Skip to main content

mz_sql/plan/statement/
acl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Access control list (ACL).
11//!
12//! This module houses the handlers for statements that modify privileges in the catalog, like
13//! `GRANT`, `REVOKE`, and `REASSIGN OWNED`.
14
15use std::collections::BTreeSet;
16
17use itertools::Itertools;
18use mz_sql_parser::ast::display::AstDisplay;
19
20use crate::ast::{Ident, UnresolvedDatabaseName};
21use crate::catalog::{
22    DefaultPrivilegeAclItem, DefaultPrivilegeObject, ErrorMessageObjectDescription, ObjectType,
23    SystemObjectType,
24};
25use crate::names::{
26    Aug, ObjectId, ResolvedDatabaseSpecifier, ResolvedRoleName, SchemaSpecifier, SystemObjectId,
27};
28use crate::plan::error::PlanError;
29use crate::plan::statement::ddl::{
30    resolve_cluster, resolve_database, resolve_item_or_type, resolve_network_policy, resolve_schema,
31};
32use crate::plan::statement::{StatementContext, StatementDesc};
33use crate::plan::{
34    AlterDefaultPrivilegesPlan, AlterNoopPlan, AlterOwnerPlan, GrantPrivilegesPlan, GrantRolePlan,
35    Plan, PlanNotice, ReassignOwnedPlan, RevokePrivilegesPlan, RevokeRolePlan, UpdatePrivilege,
36};
37use crate::session::user::SYSTEM_USER;
38use mz_ore::str::StrExt;
39use mz_repr::adt::mz_acl_item::AclMode;
40use mz_repr::role_id::RoleId;
41use mz_sql_parser::ast::{
42    AbbreviatedGrantOrRevokeStatement, AlterDefaultPrivilegesStatement, AlterOwnerStatement,
43    GrantPrivilegesStatement, GrantRoleStatement, GrantTargetAllSpecification,
44    GrantTargetSpecification, GrantTargetSpecificationInner, Privilege, PrivilegeSpecification,
45    ReassignOwnedStatement, RevokePrivilegesStatement, RevokeRoleStatement,
46    TargetRoleSpecification, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName,
47};
48
49pub fn describe_alter_owner(
50    _: &StatementContext,
51    _: AlterOwnerStatement<Aug>,
52) -> Result<StatementDesc, PlanError> {
53    Ok(StatementDesc::new(None))
54}
55
56pub fn plan_alter_owner(
57    scx: &StatementContext,
58    AlterOwnerStatement {
59        object_type,
60        if_exists,
61        name,
62        new_owner,
63    }: AlterOwnerStatement<Aug>,
64) -> Result<Plan, PlanError> {
65    let object_type = object_type.into();
66    match (object_type, name) {
67        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
68            plan_alter_cluster_owner(scx, if_exists, name, new_owner.id)
69        }
70        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(_)) => {
71            bail_never_supported!("altering the owner of a cluster replica");
72        }
73        (ObjectType::Database, UnresolvedObjectName::Database(name)) => {
74            plan_alter_database_owner(scx, if_exists, name, new_owner.id)
75        }
76        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
77            plan_alter_schema_owner(scx, if_exists, name, new_owner.id)
78        }
79        (ObjectType::NetworkPolicy, UnresolvedObjectName::NetworkPolicy(name)) => {
80            plan_alter_network_policy_owner(scx, if_exists, name, new_owner.id)
81        }
82        // The parser should have rejected this.
83        (ObjectType::Role, UnresolvedObjectName::Role(_)) => {
84            bail_internal!("cannot ALTER OWNER of a role")
85        }
86        (
87            object_type @ ObjectType::Cluster
88            | object_type @ ObjectType::ClusterReplica
89            | object_type @ ObjectType::Database
90            | object_type @ ObjectType::Schema
91            | object_type @ ObjectType::Role,
92            name,
93        )
94        | (
95            object_type,
96            name @ UnresolvedObjectName::Cluster(_)
97            | name @ UnresolvedObjectName::ClusterReplica(_)
98            | name @ UnresolvedObjectName::Database(_)
99            | name @ UnresolvedObjectName::Schema(_)
100            | name @ UnresolvedObjectName::NetworkPolicy(_)
101            | name @ UnresolvedObjectName::Role(_),
102        ) => {
103            // The parser should not have produced this combination.
104            bail_internal!("invalid object type '{object_type}' for ALTER OWNER with name {name}")
105        }
106        (object_type, UnresolvedObjectName::Item(name)) => {
107            plan_alter_item_owner(scx, object_type, if_exists, name, new_owner.id)
108        }
109    }
110}
111
112fn plan_alter_cluster_owner(
113    scx: &StatementContext,
114    if_exists: bool,
115    name: Ident,
116    new_owner: RoleId,
117) -> Result<Plan, PlanError> {
118    match resolve_cluster(scx, &name, if_exists)? {
119        Some(cluster) => Ok(Plan::AlterOwner(AlterOwnerPlan {
120            id: ObjectId::Cluster(cluster.id()),
121            object_type: ObjectType::Cluster,
122            new_owner,
123        })),
124        None => {
125            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
126                name: name.to_ast_string_simple(),
127                object_type: ObjectType::Cluster,
128            });
129            Ok(Plan::AlterNoop(AlterNoopPlan {
130                object_type: ObjectType::Cluster,
131            }))
132        }
133    }
134}
135
136fn plan_alter_database_owner(
137    scx: &StatementContext,
138    if_exists: bool,
139    name: UnresolvedDatabaseName,
140    new_owner: RoleId,
141) -> Result<Plan, PlanError> {
142    match resolve_database(scx, &name, if_exists)? {
143        Some(database) => Ok(Plan::AlterOwner(AlterOwnerPlan {
144            id: ObjectId::Database(database.id()),
145            object_type: ObjectType::Database,
146            new_owner,
147        })),
148        None => {
149            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
150                name: name.to_ast_string_simple(),
151                object_type: ObjectType::Database,
152            });
153
154            Ok(Plan::AlterNoop(AlterNoopPlan {
155                object_type: ObjectType::Database,
156            }))
157        }
158    }
159}
160
161fn plan_alter_schema_owner(
162    scx: &StatementContext,
163    if_exists: bool,
164    name: UnresolvedSchemaName,
165    new_owner: RoleId,
166) -> Result<Plan, PlanError> {
167    // Special case for mz_temp: with lazy temporary schema creation, the temp
168    // schema may not exist yet, but we still need to return the correct error.
169    // Check the schema name directly against MZ_TEMP_SCHEMA.
170    let normalized = crate::normalize::unresolved_schema_name(name.clone())?;
171    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
172        sql_bail!("cannot alter schema {name} because it is a temporary schema",)
173    }
174
175    match resolve_schema(scx, name.clone(), if_exists)? {
176        Some((database_spec, schema_spec)) => {
177            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
178                sql_bail!(
179                    "cannot alter schema {name} because it is required by the database system",
180                );
181            }
182            if let SchemaSpecifier::Temporary = schema_spec {
183                sql_bail!("cannot alter schema {name} because it is a temporary schema",)
184            }
185            Ok(Plan::AlterOwner(AlterOwnerPlan {
186                id: ObjectId::Schema((database_spec, schema_spec)),
187                object_type: ObjectType::Schema,
188                new_owner,
189            }))
190        }
191        None => {
192            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
193                name: name.to_ast_string_simple(),
194                object_type: ObjectType::Schema,
195            });
196
197            Ok(Plan::AlterNoop(AlterNoopPlan {
198                object_type: ObjectType::Schema,
199            }))
200        }
201    }
202}
203
204fn plan_alter_item_owner(
205    scx: &StatementContext,
206    object_type: ObjectType,
207    if_exists: bool,
208    name: UnresolvedItemName,
209    new_owner: RoleId,
210) -> Result<Plan, PlanError> {
211    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
212        Ok(r) => r,
213        // Return a more helpful error on `DROP VIEW <materialized-view>`.
214        Err(PlanError::MismatchedObjectType {
215            name,
216            is_type: ObjectType::MaterializedView,
217            expected_type: ObjectType::View,
218        }) => {
219            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
220        }
221        e => e?,
222    };
223
224    match resolved {
225        Some(item) => {
226            if item.id().is_system() {
227                sql_bail!(
228                    "cannot alter item {} because it is required by the database system",
229                    scx.catalog.resolve_full_name(item.name()),
230                );
231            }
232
233            Ok(Plan::AlterOwner(AlterOwnerPlan {
234                id: ObjectId::Item(item.id()),
235                object_type,
236                new_owner,
237            }))
238        }
239        None => {
240            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
241                name: name.to_ast_string_simple(),
242                object_type,
243            });
244
245            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
246        }
247    }
248}
249
250fn plan_alter_network_policy_owner(
251    scx: &StatementContext,
252    if_exists: bool,
253    name: Ident,
254    new_owner: RoleId,
255) -> Result<Plan, PlanError> {
256    match resolve_network_policy(scx, name.clone(), if_exists)? {
257        Some(policy_id) => Ok(Plan::AlterOwner(AlterOwnerPlan {
258            id: ObjectId::NetworkPolicy(policy_id.id),
259            object_type: ObjectType::NetworkPolicy,
260            new_owner,
261        })),
262        None => {
263            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
264                name: name.to_ast_string_simple(),
265                object_type: ObjectType::NetworkPolicy,
266            });
267
268            Ok(Plan::AlterNoop(AlterNoopPlan {
269                object_type: ObjectType::NetworkPolicy,
270            }))
271        }
272    }
273}
274
275pub fn describe_grant_role(
276    _: &StatementContext,
277    _: GrantRoleStatement<Aug>,
278) -> Result<StatementDesc, PlanError> {
279    Ok(StatementDesc::new(None))
280}
281
282pub fn plan_grant_role(
283    scx: &StatementContext,
284    GrantRoleStatement {
285        role_names,
286        member_names,
287    }: GrantRoleStatement<Aug>,
288) -> Result<Plan, PlanError> {
289    // In PostgreSQL, the grantor must either be a role with ADMIN OPTION on the role being granted,
290    // or the bootstrap superuser. We do not have ADMIN OPTION implemented and 'mz_system' is our
291    // equivalent of the bootstrap superuser. Therefore the grantor is always 'mz_system'.
292    // For more details see:
293    // https://github.com/postgres/postgres/blob/064eb89e83ea0f59426c92906329f1e6c423dfa4/src/backend/commands/user.c#L2180-L2238
294    let grantor_id = scx
295        .catalog
296        .resolve_role(&SYSTEM_USER.name)
297        .expect("system user must exist")
298        .id();
299    Ok(Plan::GrantRole(GrantRolePlan {
300        role_ids: role_names
301            .into_iter()
302            .map(|role_name| role_name.id)
303            .collect(),
304        member_ids: member_names
305            .into_iter()
306            .map(|member_name| member_name.id)
307            .collect(),
308        grantor_id,
309    }))
310}
311
312pub fn describe_revoke_role(
313    _: &StatementContext,
314    _: RevokeRoleStatement<Aug>,
315) -> Result<StatementDesc, PlanError> {
316    Ok(StatementDesc::new(None))
317}
318
319pub fn plan_revoke_role(
320    scx: &StatementContext,
321    RevokeRoleStatement {
322        role_names,
323        member_names,
324    }: RevokeRoleStatement<Aug>,
325) -> Result<Plan, PlanError> {
326    // In PostgreSQL, the same role membership can be granted multiple times by different grantors.
327    // When revoking a role membership, only the membership granted by the specified grantor is
328    // revoked. The grantor must either be a role with ADMIN OPTION on the role being granted,
329    // or the bootstrap superuser. We do not have ADMIN OPTION implemented and 'mz_system' is our
330    // equivalent of the bootstrap superuser. Therefore the grantor is always 'mz_system'.
331    // For more details see:
332    // https://github.com/postgres/postgres/blob/064eb89e83ea0f59426c92906329f1e6c423dfa4/src/backend/commands/user.c#L2180-L2238
333    let grantor_id = scx
334        .catalog
335        .resolve_role(&SYSTEM_USER.name)
336        .expect("system user must exist")
337        .id();
338    Ok(Plan::RevokeRole(RevokeRolePlan {
339        role_ids: role_names
340            .into_iter()
341            .map(|role_name| role_name.id)
342            .collect(),
343        member_ids: member_names
344            .into_iter()
345            .map(|member_name| member_name.id)
346            .collect(),
347        grantor_id,
348    }))
349}
350
351pub fn describe_grant_privileges(
352    _: &StatementContext,
353    _: GrantPrivilegesStatement<Aug>,
354) -> Result<StatementDesc, PlanError> {
355    Ok(StatementDesc::new(None))
356}
357
358pub fn plan_grant_privileges(
359    scx: &StatementContext,
360    GrantPrivilegesStatement {
361        privileges,
362        target,
363        roles,
364    }: GrantPrivilegesStatement<Aug>,
365) -> Result<Plan, PlanError> {
366    let plan = plan_update_privilege(scx, privileges, target, roles)?;
367    Ok(Plan::GrantPrivileges(plan.into()))
368}
369
370pub fn describe_revoke_privileges(
371    _: &StatementContext,
372    _: RevokePrivilegesStatement<Aug>,
373) -> Result<StatementDesc, PlanError> {
374    Ok(StatementDesc::new(None))
375}
376
377pub fn plan_revoke_privileges(
378    scx: &StatementContext,
379    RevokePrivilegesStatement {
380        privileges,
381        target,
382        roles,
383    }: RevokePrivilegesStatement<Aug>,
384) -> Result<Plan, PlanError> {
385    let plan = plan_update_privilege(scx, privileges, target, roles)?;
386    Ok(Plan::RevokePrivileges(plan.into()))
387}
388
/// Intermediate result of `plan_update_privilege`, shared by the `GRANT` and
/// `REVOKE` planners and convertible into either a [`GrantPrivilegesPlan`] or
/// a [`RevokePrivilegesPlan`].
struct UpdatePrivilegesPlan {
    /// One entry per targeted object: the ACL mode to apply, the target, and
    /// the grantor.
    update_privileges: Vec<UpdatePrivilege>,
    /// IDs of the roles named in the statement. For a revoke these become the
    /// `revokees` of the resulting plan.
    grantees: Vec<RoleId>,
}
393
394impl From<UpdatePrivilegesPlan> for GrantPrivilegesPlan {
395    fn from(
396        UpdatePrivilegesPlan {
397            update_privileges,
398            grantees,
399        }: UpdatePrivilegesPlan,
400    ) -> GrantPrivilegesPlan {
401        GrantPrivilegesPlan {
402            update_privileges,
403            grantees,
404        }
405    }
406}
407
408impl From<UpdatePrivilegesPlan> for RevokePrivilegesPlan {
409    fn from(
410        UpdatePrivilegesPlan {
411            update_privileges,
412            grantees,
413        }: UpdatePrivilegesPlan,
414    ) -> RevokePrivilegesPlan {
415        RevokePrivilegesPlan {
416            update_privileges,
417            revokees: grantees,
418        }
419    }
420}
421
422fn plan_update_privilege(
423    scx: &StatementContext,
424    privileges: PrivilegeSpecification,
425    target: GrantTargetSpecification<Aug>,
426    roles: Vec<ResolvedRoleName>,
427) -> Result<UpdatePrivilegesPlan, PlanError> {
428    let (object_type, target_ids) = match target {
429        GrantTargetSpecification::Object {
430            object_type,
431            object_spec_inner,
432        } => {
433            fn object_type_filter(
434                object_id: &ObjectId,
435                object_type: &ObjectType,
436                scx: &StatementContext,
437            ) -> bool {
438                if object_type == &ObjectType::Table {
439                    scx.get_object_type(object_id).is_relation()
440                } else {
441                    object_type == &scx.get_object_type(object_id)
442                }
443            }
444            let object_type = object_type.into();
445            let object_ids: Vec<ObjectId> = match object_spec_inner {
446                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::All) => {
447                    let cluster_ids = scx
448                        .catalog
449                        .get_clusters()
450                        .into_iter()
451                        .map(|cluster| cluster.id().into());
452                    let database_ids = scx
453                        .catalog
454                        .get_databases()
455                        .into_iter()
456                        .map(|database| database.id().into());
457                    let schema_ids = scx
458                        .catalog
459                        .get_schemas()
460                        .into_iter()
461                        .filter(|schema| !schema.id().is_temporary())
462                        .map(|schema| (schema.database().clone(), schema.id().clone()).into());
463                    let item_ids = scx
464                        .catalog
465                        .get_items()
466                        .into_iter()
467                        .map(|item| item.id().into());
468                    cluster_ids
469                        .chain(database_ids)
470                        .chain(schema_ids)
471                        .chain(item_ids)
472                        .filter(|object_id| object_type_filter(object_id, &object_type, scx))
473                        .filter(|object_id| object_id.is_user())
474                        .collect()
475                }
476                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllDatabases {
477                    databases,
478                }) => {
479                    let schema_ids = databases
480                        .iter()
481                        .map(|database| scx.get_database(database.database_id()))
482                        .flat_map(|database| database.schemas().into_iter())
483                        .filter(|schema| !schema.id().is_temporary())
484                        .map(|schema| (schema.database().clone(), schema.id().clone()).into());
485
486                    let item_ids = databases
487                        .iter()
488                        .map(|database| scx.get_database(database.database_id()))
489                        .flat_map(|database| database.schemas().into_iter())
490                        .flat_map(|schema| schema.item_ids())
491                        .map(|item_id| item_id.into());
492
493                    item_ids
494                        .chain(schema_ids)
495                        .filter(|object_id| object_type_filter(object_id, &object_type, scx))
496                        .collect()
497                }
498                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllSchemas {
499                    schemas,
500                }) => schemas
501                    .into_iter()
502                    .map(|schema| scx.get_schema(schema.database_spec(), schema.schema_spec()))
503                    .flat_map(|schema| schema.item_ids())
504                    .map(|item_id| item_id.into())
505                    .filter(|object_id| object_type_filter(object_id, &object_type, scx))
506                    .collect(),
507                GrantTargetSpecificationInner::Objects { names } => {
508                    let mut ids = Vec::with_capacity(names.len());
509                    for name in names {
510                        ids.push(
511                            // Name resolution should have rejected invalid objects.
512                            name.try_into()
513                                .map_err(|e| internal_err!("invalid object name: {}", e))?,
514                        );
515                    }
516                    ids
517                }
518            };
519            let target_ids = object_ids.into_iter().map(|id| id.into()).collect();
520            (SystemObjectType::Object(object_type), target_ids)
521        }
522        GrantTargetSpecification::System => {
523            (SystemObjectType::System, vec![SystemObjectId::System])
524        }
525    };
526
527    let mut update_privileges = Vec::with_capacity(target_ids.len());
528
529    for target_id in target_ids {
530        // Temporary schemas cannot have privileges granted or revoked - they are
531        // connection-specific and transient. With lazy temporary schema creation,
532        // the temp schema may not exist yet, but we still need to return the correct error.
533        if let SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary))) =
534            &target_id
535        {
536            sql_bail!(
537                "cannot grant or revoke privileges on schema {} because it is a temporary schema",
538                mz_repr::namespaces::MZ_TEMP_SCHEMA
539            );
540        }
541
542        // The actual type of the object.
543        let actual_object_type = scx.get_system_object_type(&target_id);
544        // The type used for privileges, for example if the actual type is a view, the reference
545        // type is table.
546        let mut reference_object_type = actual_object_type.clone();
547
548        let acl_mode = privilege_spec_to_acl_mode(scx, &privileges, actual_object_type);
549
550        if let SystemObjectId::Object(ObjectId::Item(id)) = &target_id {
551            let item = scx.get_item(id);
552            let item_type: ObjectType = item.item_type().into();
553            if (item_type == ObjectType::View
554                || item_type == ObjectType::MaterializedView
555                || item_type == ObjectType::Source)
556                && object_type == SystemObjectType::Object(ObjectType::Table)
557            {
558                // This is an expected mis-match to match PostgreSQL semantics.
559                reference_object_type = SystemObjectType::Object(ObjectType::Table);
560            } else if SystemObjectType::Object(item_type) != object_type {
561                let object_name = scx.catalog.resolve_full_name(item.name()).to_string();
562                return Err(PlanError::InvalidObjectType {
563                    expected_type: object_type,
564                    actual_type: actual_object_type,
565                    object_name,
566                });
567            }
568        }
569
570        let all_object_privileges = scx.catalog.all_object_privileges(reference_object_type);
571        let invalid_privileges = acl_mode.difference(all_object_privileges);
572        if !invalid_privileges.is_empty() {
573            let object_description =
574                ErrorMessageObjectDescription::from_sys_id(&target_id, scx.catalog);
575            return Err(PlanError::InvalidPrivilegeTypes {
576                invalid_privileges,
577                object_description,
578            });
579        }
580
581        // In PostgreSQL, the grantor must always be either the object owner or some role that has been
582        // been explicitly granted grant options. In Materialize, we haven't implemented grant options
583        // so the grantor is always the object owner.
584        //
585        // For more details see:
586        // https://github.com/postgres/postgres/blob/78d5952dd0e66afc4447eec07f770991fa406cce/src/backend/utils/adt/acl.c#L5154-L5246
587        let grantor = match &target_id {
588            SystemObjectId::Object(object_id) => scx
589                .catalog
590                .get_owner_id(object_id)
591                .ok_or_else(|| sql_err!("cannot revoke privileges on objects without owners"))?,
592            SystemObjectId::System => scx.catalog.mz_system_role_id(),
593        };
594
595        update_privileges.push(UpdatePrivilege {
596            acl_mode,
597            target_id,
598            grantor,
599        });
600    }
601
602    let grantees = roles.into_iter().map(|role| role.id).collect();
603
604    Ok(UpdatePrivilegesPlan {
605        update_privileges,
606        grantees,
607    })
608}
609
610fn privilege_spec_to_acl_mode(
611    scx: &StatementContext,
612    privilege_spec: &PrivilegeSpecification,
613    object_type: SystemObjectType,
614) -> AclMode {
615    match privilege_spec {
616        PrivilegeSpecification::All => scx.catalog.all_object_privileges(object_type),
617        PrivilegeSpecification::Privileges(privileges) => privileges
618            .into_iter()
619            .map(|privilege| privilege_to_acl_mode(privilege.clone()))
620            // PostgreSQL doesn't care about duplicate privileges, so we don't either.
621            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode)),
622    }
623}
624
625fn privilege_to_acl_mode(privilege: Privilege) -> AclMode {
626    match privilege {
627        Privilege::SELECT => AclMode::SELECT,
628        Privilege::INSERT => AclMode::INSERT,
629        Privilege::UPDATE => AclMode::UPDATE,
630        Privilege::DELETE => AclMode::DELETE,
631        Privilege::USAGE => AclMode::USAGE,
632        Privilege::CREATE => AclMode::CREATE,
633        Privilege::CREATEROLE => AclMode::CREATE_ROLE,
634        Privilege::CREATEDB => AclMode::CREATE_DB,
635        Privilege::CREATECLUSTER => AclMode::CREATE_CLUSTER,
636        Privilege::CREATENETWORKPOLICY => AclMode::CREATE_NETWORK_POLICY,
637    }
638}
639
640pub fn describe_alter_default_privileges(
641    _: &StatementContext,
642    _: AlterDefaultPrivilegesStatement<Aug>,
643) -> Result<StatementDesc, PlanError> {
644    Ok(StatementDesc::new(None))
645}
646
647pub fn plan_alter_default_privileges(
648    scx: &StatementContext,
649    AlterDefaultPrivilegesStatement {
650        target_roles,
651        target_objects,
652        grant_or_revoke,
653    }: AlterDefaultPrivilegesStatement<Aug>,
654) -> Result<Plan, PlanError> {
655    let object_type: ObjectType = (*grant_or_revoke.object_type()).into();
656    match object_type {
657        ObjectType::View | ObjectType::MaterializedView | ObjectType::Source => sql_bail!(
658            "{object_type}S is not valid for ALTER DEFAULT PRIVILEGES, use TABLES instead"
659        ),
660        ObjectType::Sink | ObjectType::ClusterReplica | ObjectType::Role | ObjectType::Func => {
661            sql_bail!("{object_type}S do not have privileges")
662        }
663        ObjectType::Cluster | ObjectType::Database
664            if matches!(
665                target_objects,
666                GrantTargetAllSpecification::AllDatabases { .. }
667            ) =>
668        {
669            sql_bail!("cannot specify {object_type}S and IN DATABASE")
670        }
671
672        ObjectType::Cluster | ObjectType::Database | ObjectType::Schema
673            if matches!(
674                target_objects,
675                GrantTargetAllSpecification::AllSchemas { .. }
676            ) =>
677        {
678            sql_bail!("cannot specify {object_type}S and IN SCHEMA")
679        }
680        ObjectType::Table
681        | ObjectType::Index
682        | ObjectType::Type
683        | ObjectType::Secret
684        | ObjectType::Connection
685        | ObjectType::Cluster
686        | ObjectType::Database
687        | ObjectType::Schema
688        | ObjectType::NetworkPolicy => {}
689    }
690
691    let acl_mode = privilege_spec_to_acl_mode(
692        scx,
693        grant_or_revoke.privileges(),
694        SystemObjectType::Object(object_type),
695    );
696    let all_object_privileges = scx
697        .catalog
698        .all_object_privileges(SystemObjectType::Object(object_type));
699    let invalid_privileges = acl_mode.difference(all_object_privileges);
700    if !invalid_privileges.is_empty() {
701        let object_description =
702            ErrorMessageObjectDescription::from_object_type(SystemObjectType::Object(object_type));
703        return Err(PlanError::InvalidPrivilegeTypes {
704            invalid_privileges,
705            object_description,
706        });
707    }
708
709    let target_roles = match target_roles {
710        TargetRoleSpecification::Roles(roles) => roles.into_iter().map(|role| role.id).collect(),
711        TargetRoleSpecification::AllRoles => vec![RoleId::Public],
712    };
713    let mut privilege_objects = Vec::with_capacity(target_roles.len() * target_objects.len());
714    for target_role in target_roles {
715        match &target_objects {
716            GrantTargetAllSpecification::All => privilege_objects.push(DefaultPrivilegeObject {
717                role_id: target_role,
718                database_id: None,
719                schema_id: None,
720                object_type,
721            }),
722            GrantTargetAllSpecification::AllDatabases { databases } => {
723                for database in databases {
724                    privilege_objects.push(DefaultPrivilegeObject {
725                        role_id: target_role,
726                        database_id: Some(*database.database_id()),
727                        schema_id: None,
728                        object_type,
729                    });
730                }
731            }
732            GrantTargetAllSpecification::AllSchemas { schemas } => {
733                for schema in schemas {
734                    privilege_objects.push(DefaultPrivilegeObject {
735                        role_id: target_role,
736                        database_id: schema.database_spec().id(),
737                        schema_id: Some(schema.schema_spec().into()),
738                        object_type,
739                    });
740                }
741            }
742        }
743    }
744
745    let privilege_acl_items = grant_or_revoke
746        .roles()
747        .into_iter()
748        .map(|grantee| DefaultPrivilegeAclItem {
749            grantee: grantee.id,
750            acl_mode,
751        })
752        .collect();
753
754    let is_grant = match grant_or_revoke {
755        AbbreviatedGrantOrRevokeStatement::Grant(_) => true,
756        AbbreviatedGrantOrRevokeStatement::Revoke(_) => false,
757    };
758
759    Ok(Plan::AlterDefaultPrivileges(AlterDefaultPrivilegesPlan {
760        privilege_objects,
761        privilege_acl_items,
762        is_grant,
763    }))
764}
765
766pub fn describe_reassign_owned(
767    _: &StatementContext,
768    _: ReassignOwnedStatement<Aug>,
769) -> Result<StatementDesc, PlanError> {
770    Ok(StatementDesc::new(None))
771}
772
773pub fn plan_reassign_owned(
774    scx: &StatementContext,
775    ReassignOwnedStatement {
776        old_roles,
777        new_role,
778    }: ReassignOwnedStatement<Aug>,
779) -> Result<Plan, PlanError> {
780    let old_roles: BTreeSet<_> = old_roles.into_iter().map(|role| role.id).collect();
781    let mut reassign_ids: Vec<ObjectId> = Vec::new();
782
783    // Replicas
784    for replica in scx.catalog.get_cluster_replicas() {
785        if old_roles.contains(&replica.owner_id()) {
786            reassign_ids.push((replica.cluster_id(), replica.replica_id()).into());
787        }
788    }
789    // Clusters
790    for cluster in scx.catalog.get_clusters() {
791        if old_roles.contains(&cluster.owner_id()) {
792            reassign_ids.push(cluster.id().into());
793        }
794    }
795    // Items
796    for item in scx.catalog.get_items() {
797        if old_roles.contains(&item.owner_id()) {
798            reassign_ids.push(item.id().into());
799        }
800    }
801    // Schemas
802    for schema in scx.catalog.get_schemas() {
803        if !schema.id().is_temporary() {
804            if old_roles.contains(&schema.owner_id()) {
805                reassign_ids.push((*schema.database(), *schema.id()).into())
806            }
807        }
808    }
809    // Databases
810    for database in scx.catalog.get_databases() {
811        if old_roles.contains(&database.owner_id()) {
812            reassign_ids.push(database.id().into());
813        }
814    }
815    // Network policies
816    for network_policy in scx.catalog.get_network_policies() {
817        if old_roles.contains(&network_policy.owner_id()) {
818            reassign_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
819        }
820    }
821
822    let system_ids: Vec<_> = reassign_ids.iter().filter(|id| id.is_system()).collect();
823    if !system_ids.is_empty() {
824        let mut owners = system_ids
825            .into_iter()
826            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
827            .collect::<BTreeSet<_>>()
828            .into_iter()
829            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
830        sql_bail!(
831            "cannot reassign objects owned by role {} because they are required by the database system",
832            owners.join(", "),
833        );
834    }
835
836    Ok(Plan::ReassignOwned(ReassignOwnedPlan {
837        old_roles: old_roles.into_iter().collect(),
838        new_role: new_role.id,
839        reassign_ids,
840    }))
841}