mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
34    StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
50    RoleAttributesRaw, RoleMembership, RoleVars,
51};
52use mz_sql::names::{
53    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::{
67    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
68    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
69    is_reserved_role_name, object_type_to_audit_object_type,
70    system_object_type_to_audit_object_type,
71};
72use crate::coord::ConnMeta;
73use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
74use crate::coord::cluster_scheduling::SchedulingDecision;
75use crate::util::ResultExt;
76
77#[derive(Debug, Clone)]
78pub enum Op {
79    AlterRetainHistory {
80        id: CatalogItemId,
81        value: Option<Value>,
82        window: CompactionWindow,
83    },
84    AlterRole {
85        id: RoleId,
86        name: String,
87        attributes: RoleAttributesRaw,
88        nopassword: bool,
89        vars: RoleVars,
90    },
91    AlterNetworkPolicy {
92        id: NetworkPolicyId,
93        rules: Vec<NetworkPolicyRule>,
94        name: String,
95        owner_id: RoleId,
96    },
97    AlterAddColumn {
98        id: CatalogItemId,
99        new_global_id: GlobalId,
100        name: ColumnName,
101        typ: SqlColumnType,
102        sql: RawDataType,
103    },
104    AlterMaterializedViewApplyReplacement {
105        id: CatalogItemId,
106        replacement_id: CatalogItemId,
107    },
108    CreateDatabase {
109        name: String,
110        owner_id: RoleId,
111    },
112    CreateSchema {
113        database_id: ResolvedDatabaseSpecifier,
114        schema_name: String,
115        owner_id: RoleId,
116    },
117    CreateRole {
118        name: String,
119        attributes: RoleAttributesRaw,
120    },
121    CreateCluster {
122        id: ClusterId,
123        name: String,
124        introspection_sources: Vec<&'static BuiltinLog>,
125        owner_id: RoleId,
126        config: ClusterConfig,
127    },
128    CreateClusterReplica {
129        cluster_id: ClusterId,
130        name: String,
131        config: ReplicaConfig,
132        owner_id: RoleId,
133        reason: ReplicaCreateDropReason,
134    },
135    CreateItem {
136        id: CatalogItemId,
137        name: QualifiedItemName,
138        item: CatalogItem,
139        owner_id: RoleId,
140    },
141    CreateNetworkPolicy {
142        rules: Vec<NetworkPolicyRule>,
143        name: String,
144        owner_id: RoleId,
145    },
146    Comment {
147        object_id: CommentObjectId,
148        sub_component: Option<usize>,
149        comment: Option<String>,
150    },
151    DropObjects(Vec<DropObjectInfo>),
152    GrantRole {
153        role_id: RoleId,
154        member_id: RoleId,
155        grantor_id: RoleId,
156    },
157    RenameCluster {
158        id: ClusterId,
159        name: String,
160        to_name: String,
161        check_reserved_names: bool,
162    },
163    RenameClusterReplica {
164        cluster_id: ClusterId,
165        replica_id: ReplicaId,
166        name: QualifiedReplica,
167        to_name: String,
168    },
169    RenameItem {
170        id: CatalogItemId,
171        current_full_name: FullItemName,
172        to_name: String,
173    },
174    RenameSchema {
175        database_spec: ResolvedDatabaseSpecifier,
176        schema_spec: SchemaSpecifier,
177        new_name: String,
178        check_reserved_names: bool,
179    },
180    UpdateOwner {
181        id: ObjectId,
182        new_owner: RoleId,
183    },
184    UpdatePrivilege {
185        target_id: SystemObjectId,
186        privilege: MzAclItem,
187        variant: UpdatePrivilegeVariant,
188    },
189    UpdateDefaultPrivilege {
190        privilege_object: DefaultPrivilegeObject,
191        privilege_acl_item: DefaultPrivilegeAclItem,
192        variant: UpdatePrivilegeVariant,
193    },
194    RevokeRole {
195        role_id: RoleId,
196        member_id: RoleId,
197        grantor_id: RoleId,
198    },
199    UpdateClusterConfig {
200        id: ClusterId,
201        name: String,
202        config: ClusterConfig,
203    },
204    UpdateClusterReplicaConfig {
205        cluster_id: ClusterId,
206        replica_id: ReplicaId,
207        config: ReplicaConfig,
208    },
209    UpdateItem {
210        id: CatalogItemId,
211        name: QualifiedItemName,
212        to_item: CatalogItem,
213    },
214    UpdateSourceReferences {
215        source_id: CatalogItemId,
216        references: SourceReferences,
217    },
218    UpdateSystemConfiguration {
219        name: String,
220        value: OwnedVarInput,
221    },
222    ResetSystemConfiguration {
223        name: String,
224    },
225    ResetAllSystemConfiguration,
226    /// Performs updates to the storage usage table, which probably should be a builtin source.
227    ///
228    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
229    /// might not work. If a process crashes after collecting storage usage events
230    /// but before updating the builtin table, then another listening catalog
231    /// will never know to update the builtin table.
232    WeirdStorageUsageUpdates {
233        object_id: Option<String>,
234        size_bytes: u64,
235        collection_timestamp: EpochMillis,
236    },
237    /// Performs a dry run of the commit, but errors with
238    /// [`AdapterError::TransactionDryRun`].
239    ///
240    /// When using this value, it should be included only as the last element of
241    /// the transaction and should not be the only value in the transaction.
242    TransactionDryRun,
243}
244
245/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
246/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
247/// the `Op::DropObjects`.
248#[derive(Debug, Clone)]
249pub enum DropObjectInfo {
250    Cluster(ClusterId),
251    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
252    Database(DatabaseId),
253    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
254    Role(RoleId),
255    Item(CatalogItemId),
256    NetworkPolicy(NetworkPolicyId),
257}
258
259impl DropObjectInfo {
260    /// Creates a `DropObjectInfo` from an `ObjectId`.
261    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
262    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
263        match id {
264            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
265            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
266                cluster_id,
267                replica_id,
268                ReplicaCreateDropReason::Manual,
269            )),
270            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
271            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
272            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
273            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
274            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
275        }
276    }
277
278    /// Creates an `ObjectId` from a `DropObjectInfo`.
279    /// Loses the `ReplicaCreateDropReason` if there is one!
280    fn to_object_id(&self) -> ObjectId {
281        match &self {
282            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
283            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
284                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
285            }
286            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
287            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
288            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
289            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
290            DropObjectInfo::NetworkPolicy(network_policy_id) => {
291                ObjectId::NetworkPolicy(network_policy_id.clone())
292            }
293        }
294    }
295}
296
297/// The reason for creating or dropping a replica.
298#[derive(Debug, Clone)]
299pub enum ReplicaCreateDropReason {
300    /// The user initiated the replica create or drop, e.g., by
301    /// - creating/dropping a cluster,
302    /// - ALTERing various options on a managed cluster,
303    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
304    Manual,
305    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
306    /// materialized view is needing a refresh on a SCHEDULE ON REFRESH cluster.
307    ClusterScheduling(Vec<SchedulingDecision>),
308}
309
310impl ReplicaCreateDropReason {
311    pub fn into_audit_log(
312        self,
313    ) -> (
314        CreateOrDropClusterReplicaReasonV1,
315        Option<SchedulingDecisionsWithReasonsV2>,
316    ) {
317        let (reason, scheduling_policies) = match self {
318            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
319            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
320                CreateOrDropClusterReplicaReasonV1::Schedule,
321                Some(scheduling_decisions),
322            ),
323        };
324        (
325            reason,
326            scheduling_policies
327                .as_ref()
328                .map(SchedulingDecision::reasons_to_audit_log_reasons),
329        )
330    }
331}
332
333pub struct TransactionResult {
334    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
335    /// Parsed catalog updates from which we will derive catalog implications.
336    pub catalog_updates: Vec<ParsedStateUpdate>,
337    pub audit_events: Vec<VersionedEvent>,
338}
339
340impl Catalog {
341    fn should_audit_log_item(item: &CatalogItem) -> bool {
342        !item.is_temporary()
343    }
344
345    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
346    /// within a connection id.
347    fn temporary_ids(
348        &self,
349        ops: &[Op],
350        temporary_drops: BTreeSet<(&ConnectionId, String)>,
351    ) -> Result<BTreeSet<CatalogItemId>, Error> {
352        let mut creating = BTreeSet::new();
353        let mut temporary_ids = BTreeSet::new();
354        for op in ops.iter() {
355            if let Op::CreateItem {
356                id,
357                name,
358                item,
359                owner_id: _,
360            } = op
361            {
362                if let Some(conn_id) = item.conn_id() {
363                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
364                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
365                        || creating.contains(&(conn_id, &name.item))
366                    {
367                        return Err(
368                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
369                        );
370                    } else {
371                        creating.insert((conn_id, &name.item));
372                        temporary_ids.insert(id.clone());
373                    }
374                }
375            }
376        }
377        Ok(temporary_ids)
378    }
379
380    #[instrument(name = "catalog::transact")]
381    pub async fn transact(
382        &mut self,
383        // n.b. this is an option to prevent us from needing to build out a
384        // dummy impl of `StorageController` for tests.
385        storage_collections: Option<
386            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
387        >,
388        oracle_write_ts: mz_repr::Timestamp,
389        session: Option<&ConnMeta>,
390        ops: Vec<Op>,
391    ) -> Result<TransactionResult, AdapterError> {
392        trace!("transact: {:?}", ops);
393        fail::fail_point!("catalog_transact", |arg| {
394            Err(AdapterError::Unstructured(anyhow::anyhow!(
395                "failpoint: {arg:?}"
396            )))
397        });
398
399        let drop_ids: BTreeSet<CatalogItemId> = ops
400            .iter()
401            .filter_map(|op| match op {
402                Op::DropObjects(drop_object_infos) => {
403                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
404                    let item_ids = ids.filter_map(|id| match id {
405                        ObjectId::Item(id) => Some(id),
406                        _ => None,
407                    });
408                    Some(item_ids)
409                }
410                _ => None,
411            })
412            .flatten()
413            .collect();
414        let temporary_drops = drop_ids
415            .iter()
416            .filter_map(|id| {
417                let entry = self.get_entry(id);
418                match entry.item().conn_id() {
419                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
420                    None => None,
421                }
422            })
423            .collect();
424        let dropped_global_ids = drop_ids
425            .iter()
426            .flat_map(|item_id| self.get_global_ids(item_id))
427            .collect();
428
429        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
430        let mut builtin_table_updates = vec![];
431        let mut catalog_updates = vec![];
432        let mut audit_events = vec![];
433        let mut storage = self.storage().await;
434        let mut tx = storage
435            .transaction()
436            .await
437            .unwrap_or_terminate("starting catalog transaction");
438
439        let new_state = Self::transact_inner(
440            storage_collections,
441            oracle_write_ts,
442            session,
443            ops,
444            temporary_ids,
445            &mut builtin_table_updates,
446            &mut catalog_updates,
447            &mut audit_events,
448            &mut tx,
449            &self.state,
450        )
451        .await?;
452
453        // The user closure was successful, apply the updates. Terminate the
454        // process if this fails, because we have to restart envd due to
455        // indeterminate catalog state, which we only reconcile during catalog
456        // init.
457        tx.commit(oracle_write_ts)
458            .await
459            .unwrap_or_terminate("catalog storage transaction commit must succeed");
460
461        // Dropping here keeps the mutable borrow on self, preventing us accidentally
462        // mutating anything until after f is executed.
463        drop(storage);
464        if let Some(new_state) = new_state {
465            self.transient_revision += 1;
466            self.state = new_state;
467        }
468
469        // Drop in-memory planning metadata.
470        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
471        if self.state.system_config().enable_mz_notices() {
472            // Generate retractions for the Builtin tables.
473            self.state().pack_optimizer_notices(
474                &mut builtin_table_updates,
475                dropped_notices.iter(),
476                Diff::MINUS_ONE,
477            );
478        }
479
480        Ok(TransactionResult {
481            builtin_table_updates,
482            catalog_updates,
483            audit_events,
484        })
485    }
486
487    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
488    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
489    ///
490    /// # Panics
491    /// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
492    ///   final element.
493    /// - If the only element of `ops` is [`Op::TransactionDryRun`].
494    #[instrument(name = "catalog::transact_inner")]
495    async fn transact_inner(
496        storage_collections: Option<
497            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
498        >,
499        oracle_write_ts: mz_repr::Timestamp,
500        session: Option<&ConnMeta>,
501        mut ops: Vec<Op>,
502        temporary_ids: BTreeSet<CatalogItemId>,
503        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
504        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
505        audit_events: &mut Vec<VersionedEvent>,
506        tx: &mut Transaction<'_>,
507        state: &CatalogState,
508    ) -> Result<Option<CatalogState>, AdapterError> {
509        // We come up with new catalog state, builtin state updates, and parsed
510        // catalog updates (for deriving catalog implications) in two phases:
511        //
512        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
513        //    one-by-one. This will give us the full list of updates to apply to
514        //    the catalog, which will allow us to apply it in one batch, which
515        //    in turn will allow the apply machinery to consolidate the updates.
516        // 2. We do one final apply call with all updates, which gives us the
517        //    final builtin table updates and parsed catalog updates.
518        //
519        // The reason is that the loop that is working off ops first does a
520        // transact_op to derive the state updates for that op, and then calls
521        // apply_updates on the catalog state. And successive ops might expect
522        // the catalog state to reflect the modified state _after_ applying
523        // previous ops.
524        //
525        // We want to, however, have one final apply_state that takes all the
526        // accumulated updates to derive the required controller updates and the
527        // builtin table updates.
528        //
529        // We won't win any DDL throughput benchmarks, but so far that's not
530        // what we're optimizing for and there would probably be other
531        // bottlenecks before we hit this one as a bottleneck.
532        //
533        // We could work around this by refactoring how the interplay of
534        // transact_op and apply_updates works, but that's a larger undertaking.
535        let mut preliminary_state = Cow::Borrowed(state);
536
537        // The final state that we will return, if modified.
538        let mut state = Cow::Borrowed(state);
539
540        let dry_run_ops = match ops.last() {
541            Some(Op::TransactionDryRun) => {
542                // Remove dry run marker.
543                ops.pop();
544                assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
545                ops.clone()
546            }
547            Some(_) => vec![],
548            None => return Ok(None),
549        };
550
551        let mut storage_collections_to_create = BTreeSet::new();
552        let mut storage_collections_to_drop = BTreeSet::new();
553        let mut storage_collections_to_register = BTreeMap::new();
554
555        let mut updates = Vec::new();
556
557        for op in ops {
558            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
559                oracle_write_ts,
560                session,
561                op,
562                &temporary_ids,
563                audit_events,
564                tx,
565                &*preliminary_state,
566                &mut storage_collections_to_create,
567                &mut storage_collections_to_drop,
568                &mut storage_collections_to_register,
569            )
570            .await?;
571
572            // Certain builtin tables are not derived from the durable catalog state, so they need
573            // to be updated ad-hoc based on the current transaction. This is weird and will not
574            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
575            // this instance crashes after committing the durable catalog but before applying the
576            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
577            // world. Currently, this works fine and there are no correctness issues because
578            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
579            // state of all builtin tables to the correct values.
580            if let Some(builtin_table_update) = weird_builtin_table_update {
581                builtin_table_updates.push(builtin_table_update);
582            }
583
584            // Temporary items are not stored in the durable catalog, so they need to be handled
585            // separately for updating state and builtin tables.
586            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
587            // in a multi-subscriber catalog world.
588            let upper = tx.upper();
589            let temporary_item_updates =
590                temporary_item_updates
591                    .into_iter()
592                    .map(|(item, diff)| StateUpdate {
593                        kind: StateUpdateKind::TemporaryItem(item),
594                        ts: upper,
595                        diff,
596                    });
597
598            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
599            op_updates.extend(temporary_item_updates);
600            if !op_updates.is_empty() {
601                let (_op_builtin_table_updates, _op_catalog_updates) =
602                    preliminary_state
603                        .to_mut()
604                        .apply_updates(op_updates.clone())?;
605            }
606            updates.append(&mut op_updates);
607        }
608
609        if !updates.is_empty() {
610            let (op_builtin_table_updates, op_catalog_updates) =
611                state.to_mut().apply_updates(updates.clone())?;
612            let op_builtin_table_updates = state
613                .to_mut()
614                .resolve_builtin_table_updates(op_builtin_table_updates);
615            builtin_table_updates.extend(op_builtin_table_updates);
616            parsed_catalog_updates.extend(op_catalog_updates);
617        }
618
619        if dry_run_ops.is_empty() {
620            // `storage_collections` should only be `None` for tests.
621            if let Some(c) = storage_collections {
622                c.prepare_state(
623                    tx,
624                    storage_collections_to_create,
625                    storage_collections_to_drop,
626                    storage_collections_to_register,
627                )
628                .await?;
629            }
630
631            let updates = tx.get_and_commit_op_updates();
632            if !updates.is_empty() {
633                let (op_builtin_table_updates, op_catalog_updates) =
634                    state.to_mut().apply_updates(updates.clone())?;
635                let op_builtin_table_updates = state
636                    .to_mut()
637                    .resolve_builtin_table_updates(op_builtin_table_updates);
638                builtin_table_updates.extend(op_builtin_table_updates);
639                parsed_catalog_updates.extend(op_catalog_updates);
640            }
641
642            match state {
643                Cow::Owned(state) => Ok(Some(state)),
644                Cow::Borrowed(_) => Ok(None),
645            }
646        } else {
647            Err(AdapterError::TransactionDryRun {
648                new_ops: dry_run_ops,
649                new_state: state.into_owned(),
650            })
651        }
652    }
653
654    /// Performs the transaction operation described by `op`. This function prepares the changes in
655    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
656    /// changes.
657    ///
658    /// Optionally returns a builtin table update for any builtin table updates than cannot be
659    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
660    /// scenarios and ideally in the future don't exist.
661    #[instrument]
662    async fn transact_op(
663        oracle_write_ts: mz_repr::Timestamp,
664        session: Option<&ConnMeta>,
665        op: Op,
666        temporary_ids: &BTreeSet<CatalogItemId>,
667        audit_events: &mut Vec<VersionedEvent>,
668        tx: &mut Transaction<'_>,
669        state: &CatalogState,
670        storage_collections_to_create: &mut BTreeSet<GlobalId>,
671        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
672        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
673    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
674        let mut weird_builtin_table_update = None;
675        let mut temporary_item_updates = Vec::new();
676
677        match op {
678            Op::TransactionDryRun => {
679                unreachable!("TransactionDryRun can only be used a final element of ops")
680            }
681            Op::AlterRetainHistory { id, value, window } => {
682                let entry = state.get_entry(&id);
683                if id.is_system() {
684                    let name = entry.name();
685                    let full_name =
686                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
687                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
688                        full_name.to_string(),
689                    ))));
690                }
691
692                let mut new_entry = entry.clone();
693                let previous = new_entry
694                    .item
695                    .update_retain_history(value.clone(), window)
696                    .map_err(|_| {
697                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
698                            "planner should have rejected invalid alter retain history item type"
699                                .to_string(),
700                        )))
701                    })?;
702
703                if Self::should_audit_log_item(new_entry.item()) {
704                    let details =
705                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
706                            id: id.to_string(),
707                            old_history: previous.map(|previous| previous.to_string()),
708                            new_history: value.map(|v| v.to_string()),
709                        });
710                    CatalogState::add_to_audit_log(
711                        &state.system_configuration,
712                        oracle_write_ts,
713                        session,
714                        tx,
715                        audit_events,
716                        EventType::Alter,
717                        catalog_type_to_audit_object_type(new_entry.item().typ()),
718                        details,
719                    )?;
720                }
721
722                tx.update_item(id, new_entry.into())?;
723
724                Self::log_update(state, &id);
725            }
726            Op::AlterRole {
727                id,
728                name,
729                attributes,
730                nopassword,
731                vars,
732            } => {
733                state.ensure_not_reserved_role(&id)?;
734
735                let mut existing_role = state.get_role(&id).clone();
736                let password = attributes.password.clone();
737                let scram_iterations = attributes
738                    .scram_iterations
739                    .unwrap_or_else(|| state.system_config().scram_iterations());
740                existing_role.attributes = attributes.into();
741                existing_role.vars = vars;
742                let password_action = if nopassword {
743                    PasswordAction::Clear
744                } else if let Some(password) = password {
745                    PasswordAction::Set(PasswordConfig {
746                        password,
747                        scram_iterations,
748                    })
749                } else {
750                    PasswordAction::NoChange
751                };
752                tx.update_role(id, existing_role.into(), password_action)?;
753
754                CatalogState::add_to_audit_log(
755                    &state.system_configuration,
756                    oracle_write_ts,
757                    session,
758                    tx,
759                    audit_events,
760                    EventType::Alter,
761                    ObjectType::Role,
762                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
763                        id: id.to_string(),
764                        name: name.clone(),
765                    }),
766                )?;
767
768                info!("update role {name} ({id})");
769            }
770            Op::AlterNetworkPolicy {
771                id,
772                rules,
773                name,
774                owner_id: _owner_id,
775            } => {
776                let existing_policy = state.get_network_policy(&id).clone();
777                let mut policy: NetworkPolicy = existing_policy.into();
778                policy.rules = rules;
779                if is_reserved_name(&name) {
780                    return Err(AdapterError::Catalog(Error::new(
781                        ErrorKind::ReservedNetworkPolicyName(name),
782                    )));
783                }
784                tx.update_network_policy(id, policy.clone())?;
785
786                CatalogState::add_to_audit_log(
787                    &state.system_configuration,
788                    oracle_write_ts,
789                    session,
790                    tx,
791                    audit_events,
792                    EventType::Alter,
793                    ObjectType::NetworkPolicy,
794                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
795                        id: id.to_string(),
796                        name: name.clone(),
797                    }),
798                )?;
799
800                info!("update network policy {name} ({id})");
801            }
802            Op::AlterAddColumn {
803                id,
804                new_global_id,
805                name,
806                typ,
807                sql,
808            } => {
809                let mut new_entry = state.get_entry(&id).clone();
810                let version = new_entry.item.add_column(name, typ, sql)?;
811                // All versions of a table share the same shard, so it shouldn't matter what
812                // GlobalId we use here.
813                let shard_id = state
814                    .storage_metadata()
815                    .get_collection_shard(new_entry.latest_global_id())?;
816
817                // TODO(alter_table): Support adding columns to sources.
818                let CatalogItem::Table(table) = &mut new_entry.item else {
819                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
820                };
821                table.collections.insert(version, new_global_id);
822
823                tx.update_item(id, new_entry.into())?;
824                storage_collections_to_register.insert(new_global_id, shard_id);
825            }
826            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
827                let mut new_entry = state.get_entry(&id).clone();
828                let replacement = state.get_entry(&replacement_id);
829
830                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
831                    return Err(AdapterError::internal(
832                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
833                        "id must refer to a materialized view",
834                    ));
835                };
836                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
837                    return Err(AdapterError::internal(
838                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
839                        "replacement_id must refer to a materialized view",
840                    ));
841                };
842
843                mv.apply_replacement(replacement_mv.clone());
844
845                tx.remove_item(replacement_id)?;
846                tx.update_item(id, new_entry.into())?;
847
848                // TODO(alter-mv): audit logging
849            }
850            Op::CreateDatabase { name, owner_id } => {
851                let database_owner_privileges = vec![rbac::owner_privilege(
852                    mz_sql::catalog::ObjectType::Database,
853                    owner_id,
854                )];
855                let database_default_privileges = state
856                    .default_privileges
857                    .get_applicable_privileges(
858                        owner_id,
859                        None,
860                        None,
861                        mz_sql::catalog::ObjectType::Database,
862                    )
863                    .map(|item| item.mz_acl_item(owner_id));
864                let database_privileges: Vec<_> = merge_mz_acl_items(
865                    database_owner_privileges
866                        .into_iter()
867                        .chain(database_default_privileges),
868                )
869                .collect();
870
871                let schema_owner_privileges = vec![rbac::owner_privilege(
872                    mz_sql::catalog::ObjectType::Schema,
873                    owner_id,
874                )];
875                let schema_default_privileges = state
876                    .default_privileges
877                    .get_applicable_privileges(
878                        owner_id,
879                        None,
880                        None,
881                        mz_sql::catalog::ObjectType::Schema,
882                    )
883                    .map(|item| item.mz_acl_item(owner_id))
884                    // Special default privilege on public schemas.
885                    .chain(std::iter::once(MzAclItem {
886                        grantee: RoleId::Public,
887                        grantor: owner_id,
888                        acl_mode: AclMode::USAGE,
889                    }));
890                let schema_privileges: Vec<_> = merge_mz_acl_items(
891                    schema_owner_privileges
892                        .into_iter()
893                        .chain(schema_default_privileges),
894                )
895                .collect();
896
897                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
898                let (database_id, _) = tx.insert_user_database(
899                    &name,
900                    owner_id,
901                    database_privileges.clone(),
902                    &temporary_oids,
903                )?;
904                let (schema_id, _) = tx.insert_user_schema(
905                    database_id,
906                    DEFAULT_SCHEMA,
907                    owner_id,
908                    schema_privileges.clone(),
909                    &temporary_oids,
910                )?;
911                CatalogState::add_to_audit_log(
912                    &state.system_configuration,
913                    oracle_write_ts,
914                    session,
915                    tx,
916                    audit_events,
917                    EventType::Create,
918                    ObjectType::Database,
919                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
920                        id: database_id.to_string(),
921                        name: name.clone(),
922                    }),
923                )?;
924                info!("create database {}", name);
925
926                CatalogState::add_to_audit_log(
927                    &state.system_configuration,
928                    oracle_write_ts,
929                    session,
930                    tx,
931                    audit_events,
932                    EventType::Create,
933                    ObjectType::Schema,
934                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
935                        id: schema_id.to_string(),
936                        name: DEFAULT_SCHEMA.to_string(),
937                        database_name: Some(name),
938                    }),
939                )?;
940            }
941            Op::CreateSchema {
942                database_id,
943                schema_name,
944                owner_id,
945            } => {
946                if is_reserved_name(&schema_name) {
947                    return Err(AdapterError::Catalog(Error::new(
948                        ErrorKind::ReservedSchemaName(schema_name),
949                    )));
950                }
951                let database_id = match database_id {
952                    ResolvedDatabaseSpecifier::Id(id) => id,
953                    ResolvedDatabaseSpecifier::Ambient => {
954                        return Err(AdapterError::Catalog(Error::new(
955                            ErrorKind::ReadOnlySystemSchema(schema_name),
956                        )));
957                    }
958                };
959                let owner_privileges = vec![rbac::owner_privilege(
960                    mz_sql::catalog::ObjectType::Schema,
961                    owner_id,
962                )];
963                let default_privileges = state
964                    .default_privileges
965                    .get_applicable_privileges(
966                        owner_id,
967                        Some(database_id),
968                        None,
969                        mz_sql::catalog::ObjectType::Schema,
970                    )
971                    .map(|item| item.mz_acl_item(owner_id));
972                let privileges: Vec<_> =
973                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
974                        .collect();
975                let (schema_id, _) = tx.insert_user_schema(
976                    database_id,
977                    &schema_name,
978                    owner_id,
979                    privileges.clone(),
980                    &state.get_temporary_oids().collect(),
981                )?;
982                CatalogState::add_to_audit_log(
983                    &state.system_configuration,
984                    oracle_write_ts,
985                    session,
986                    tx,
987                    audit_events,
988                    EventType::Create,
989                    ObjectType::Schema,
990                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
991                        id: schema_id.to_string(),
992                        name: schema_name.clone(),
993                        database_name: Some(state.database_by_id[&database_id].name.clone()),
994                    }),
995                )?;
996            }
997            Op::CreateRole { name, attributes } => {
998                if is_reserved_role_name(&name) {
999                    return Err(AdapterError::Catalog(Error::new(
1000                        ErrorKind::ReservedRoleName(name),
1001                    )));
1002                }
1003                let membership = RoleMembership::new();
1004                let vars = RoleVars::default();
1005                let (id, _) = tx.insert_user_role(
1006                    name.clone(),
1007                    attributes.clone(),
1008                    membership.clone(),
1009                    vars.clone(),
1010                    &state.get_temporary_oids().collect(),
1011                )?;
1012                CatalogState::add_to_audit_log(
1013                    &state.system_configuration,
1014                    oracle_write_ts,
1015                    session,
1016                    tx,
1017                    audit_events,
1018                    EventType::Create,
1019                    ObjectType::Role,
1020                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1021                        id: id.to_string(),
1022                        name: name.clone(),
1023                    }),
1024                )?;
1025                info!("create role {}", name);
1026            }
1027            Op::CreateCluster {
1028                id,
1029                name,
1030                introspection_sources,
1031                owner_id,
1032                config,
1033            } => {
1034                if is_reserved_name(&name) {
1035                    return Err(AdapterError::Catalog(Error::new(
1036                        ErrorKind::ReservedClusterName(name),
1037                    )));
1038                }
1039                let owner_privileges = vec![rbac::owner_privilege(
1040                    mz_sql::catalog::ObjectType::Cluster,
1041                    owner_id,
1042                )];
1043                let default_privileges = state
1044                    .default_privileges
1045                    .get_applicable_privileges(
1046                        owner_id,
1047                        None,
1048                        None,
1049                        mz_sql::catalog::ObjectType::Cluster,
1050                    )
1051                    .map(|item| item.mz_acl_item(owner_id));
1052                let privileges: Vec<_> =
1053                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1054                        .collect();
1055                let introspection_source_ids: Vec<_> = introspection_sources
1056                    .iter()
1057                    .map(|introspection_source| {
1058                        Transaction::allocate_introspection_source_index_id(
1059                            &id,
1060                            introspection_source.variant,
1061                        )
1062                    })
1063                    .collect();
1064
1065                let introspection_sources = introspection_sources
1066                    .into_iter()
1067                    .zip_eq(introspection_source_ids)
1068                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1069                    .collect();
1070
1071                tx.insert_user_cluster(
1072                    id,
1073                    &name,
1074                    introspection_sources,
1075                    owner_id,
1076                    privileges.clone(),
1077                    config.clone().into(),
1078                    &state.get_temporary_oids().collect(),
1079                )?;
1080                CatalogState::add_to_audit_log(
1081                    &state.system_configuration,
1082                    oracle_write_ts,
1083                    session,
1084                    tx,
1085                    audit_events,
1086                    EventType::Create,
1087                    ObjectType::Cluster,
1088                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1089                        id: id.to_string(),
1090                        name: name.clone(),
1091                    }),
1092                )?;
1093                info!("create cluster {}", name);
1094            }
1095            Op::CreateClusterReplica {
1096                cluster_id,
1097                name,
1098                config,
1099                owner_id,
1100                reason,
1101            } => {
1102                if is_reserved_name(&name) {
1103                    return Err(AdapterError::Catalog(Error::new(
1104                        ErrorKind::ReservedReplicaName(name),
1105                    )));
1106                }
1107                let cluster = state.get_cluster(cluster_id);
1108                let id =
1109                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1110                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1111                    size,
1112                    billed_as,
1113                    internal,
1114                    ..
1115                }) = &config.location
1116                {
1117                    let (reason, scheduling_policies) = reason.into_audit_log();
1118                    let details = EventDetails::CreateClusterReplicaV4(
1119                        mz_audit_log::CreateClusterReplicaV4 {
1120                            cluster_id: cluster_id.to_string(),
1121                            cluster_name: cluster.name.clone(),
1122                            replica_id: Some(id.to_string()),
1123                            replica_name: name.clone(),
1124                            logical_size: size.clone(),
1125                            billed_as: billed_as.clone(),
1126                            internal: *internal,
1127                            reason,
1128                            scheduling_policies,
1129                        },
1130                    );
1131                    CatalogState::add_to_audit_log(
1132                        &state.system_configuration,
1133                        oracle_write_ts,
1134                        session,
1135                        tx,
1136                        audit_events,
1137                        EventType::Create,
1138                        ObjectType::ClusterReplica,
1139                        details,
1140                    )?;
1141                }
1142            }
1143            Op::CreateItem {
1144                id,
1145                name,
1146                item,
1147                owner_id,
1148            } => {
1149                state.check_unstable_dependencies(&item)?;
1150
1151                match &item {
1152                    CatalogItem::Table(table) => {
1153                        let gids: Vec<_> = table.global_ids().collect();
1154                        assert_eq!(gids.len(), 1);
1155                        storage_collections_to_create.extend(gids);
1156                    }
1157                    CatalogItem::Source(source) => {
1158                        storage_collections_to_create.insert(source.global_id());
1159                    }
1160                    CatalogItem::MaterializedView(mv) => {
1161                        let mv_gid = mv.global_id_writes();
1162                        if let Some(target_id) = mv.replacement_target {
1163                            let target_gid = state.get_entry(&target_id).latest_global_id();
1164                            let shard_id =
1165                                state.storage_metadata().get_collection_shard(target_gid)?;
1166                            storage_collections_to_register.insert(mv_gid, shard_id);
1167                        } else {
1168                            storage_collections_to_create.insert(mv_gid);
1169                        }
1170                    }
1171                    CatalogItem::ContinualTask(ct) => {
1172                        storage_collections_to_create.insert(ct.global_id());
1173                    }
1174                    CatalogItem::Sink(sink) => {
1175                        storage_collections_to_create.insert(sink.global_id());
1176                    }
1177                    CatalogItem::Log(_)
1178                    | CatalogItem::View(_)
1179                    | CatalogItem::Index(_)
1180                    | CatalogItem::Type(_)
1181                    | CatalogItem::Func(_)
1182                    | CatalogItem::Secret(_)
1183                    | CatalogItem::Connection(_) => (),
1184                }
1185
1186                let system_user = session.map_or(false, |s| s.user().is_system_user());
1187                if !system_user {
1188                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1189                        let cluster_name = state.clusters_by_id[&id].name.clone();
1190                        return Err(AdapterError::Catalog(Error::new(
1191                            ErrorKind::ReadOnlyCluster(cluster_name),
1192                        )));
1193                    }
1194                }
1195
1196                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1197                let default_privileges = state
1198                    .default_privileges
1199                    .get_applicable_privileges(
1200                        owner_id,
1201                        name.qualifiers.database_spec.id(),
1202                        Some(name.qualifiers.schema_spec.into()),
1203                        item.typ().into(),
1204                    )
1205                    .map(|item| item.mz_acl_item(owner_id));
1206                // mz_support can read all progress sources.
1207                let progress_source_privilege = if item.is_progress_source() {
1208                    Some(MzAclItem {
1209                        grantee: MZ_SUPPORT_ROLE_ID,
1210                        grantor: owner_id,
1211                        acl_mode: AclMode::SELECT,
1212                    })
1213                } else {
1214                    None
1215                };
1216                let privileges: Vec<_> = merge_mz_acl_items(
1217                    owner_privileges
1218                        .into_iter()
1219                        .chain(default_privileges)
1220                        .chain(progress_source_privilege),
1221                )
1222                .collect();
1223
1224                let temporary_oids = state.get_temporary_oids().collect();
1225
1226                if item.is_temporary() {
1227                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1228                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1229                    {
1230                        return Err(AdapterError::Catalog(Error::new(
1231                            ErrorKind::InvalidTemporarySchema,
1232                        )));
1233                    }
1234                    let oid = tx.allocate_oid(&temporary_oids)?;
1235
1236                    let schema_id = name.qualifiers.schema_spec.clone().into();
1237                    let item_type = item.typ();
1238                    let (create_sql, global_id, versions) = item.to_serialized();
1239
1240                    let item = TemporaryItem {
1241                        id,
1242                        oid,
1243                        global_id,
1244                        schema_id,
1245                        name: name.item.clone(),
1246                        create_sql,
1247                        conn_id: item.conn_id().cloned(),
1248                        owner_id,
1249                        privileges: privileges.clone(),
1250                        extra_versions: versions,
1251                    };
1252                    temporary_item_updates.push((item, StateDiff::Addition));
1253
1254                    info!(
1255                        "create temporary {} {} ({})",
1256                        item_type,
1257                        state.resolve_full_name(&name, None),
1258                        id
1259                    );
1260                } else {
1261                    if let Some(temp_id) =
1262                        item.uses()
1263                            .iter()
1264                            .find(|id| match state.try_get_entry(*id) {
1265                                Some(entry) => entry.item().is_temporary(),
1266                                None => temporary_ids.contains(id),
1267                            })
1268                    {
1269                        let temp_item = state.get_entry(temp_id);
1270                        return Err(AdapterError::Catalog(Error::new(
1271                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1272                        )));
1273                    }
1274                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1275                        && !system_user
1276                    {
1277                        let schema_name = state
1278                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1279                            .schema;
1280                        return Err(AdapterError::Catalog(Error::new(
1281                            ErrorKind::ReadOnlySystemSchema(schema_name),
1282                        )));
1283                    }
1284                    let schema_id = name.qualifiers.schema_spec.clone().into();
1285                    let item_type = item.typ();
1286                    let (create_sql, global_id, versions) = item.to_serialized();
1287                    tx.insert_user_item(
1288                        id,
1289                        global_id,
1290                        schema_id,
1291                        &name.item,
1292                        create_sql,
1293                        owner_id,
1294                        privileges.clone(),
1295                        &temporary_oids,
1296                        versions,
1297                    )?;
1298                    info!(
1299                        "create {} {} ({})",
1300                        item_type,
1301                        state.resolve_full_name(&name, None),
1302                        id
1303                    );
1304                }
1305
1306                if Self::should_audit_log_item(&item) {
1307                    let name = Self::full_name_detail(
1308                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1309                    );
1310                    let details = match &item {
1311                        CatalogItem::Source(s) => {
1312                            let cluster_id = match s.data_source {
1313                                // Ingestion exports don't have their own cluster, but
1314                                // run on their ingestion's cluster.
1315                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1316                                    match state.get_entry(&ingestion_id).cluster_id() {
1317                                        Some(cluster_id) => Some(cluster_id.to_string()),
1318                                        None => None,
1319                                    }
1320                                }
1321                                _ => match item.cluster_id() {
1322                                    Some(cluster_id) => Some(cluster_id.to_string()),
1323                                    None => None,
1324                                },
1325                            };
1326
1327                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1328                                id: id.to_string(),
1329                                cluster_id,
1330                                name,
1331                                external_type: s.source_type().to_string(),
1332                            })
1333                        }
1334                        CatalogItem::Sink(s) => {
1335                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1336                                id: id.to_string(),
1337                                cluster_id: Some(s.cluster_id.to_string()),
1338                                name,
1339                                external_type: s.sink_type().to_string(),
1340                            })
1341                        }
1342                        CatalogItem::Index(i) => {
1343                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1344                                id: id.to_string(),
1345                                name,
1346                                cluster_id: i.cluster_id.to_string(),
1347                            })
1348                        }
1349                        CatalogItem::MaterializedView(mv) => {
1350                            EventDetails::CreateMaterializedViewV1(
1351                                mz_audit_log::CreateMaterializedViewV1 {
1352                                    id: id.to_string(),
1353                                    name,
1354                                    cluster_id: mv.cluster_id.to_string(),
1355                                },
1356                            )
1357                        }
1358                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1359                            id: id.to_string(),
1360                            name,
1361                        }),
1362                    };
1363                    CatalogState::add_to_audit_log(
1364                        &state.system_configuration,
1365                        oracle_write_ts,
1366                        session,
1367                        tx,
1368                        audit_events,
1369                        EventType::Create,
1370                        catalog_type_to_audit_object_type(item.typ()),
1371                        details,
1372                    )?;
1373                }
1374            }
1375            Op::CreateNetworkPolicy {
1376                rules,
1377                name,
1378                owner_id,
1379            } => {
1380                if state.network_policies_by_name.contains_key(&name) {
1381                    return Err(AdapterError::PlanError(PlanError::Catalog(
1382                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1383                    )));
1384                }
1385                if is_reserved_name(&name) {
1386                    return Err(AdapterError::Catalog(Error::new(
1387                        ErrorKind::ReservedNetworkPolicyName(name),
1388                    )));
1389                }
1390
1391                let owner_privileges = vec![rbac::owner_privilege(
1392                    mz_sql::catalog::ObjectType::NetworkPolicy,
1393                    owner_id,
1394                )];
1395                let default_privileges = state
1396                    .default_privileges
1397                    .get_applicable_privileges(
1398                        owner_id,
1399                        None,
1400                        None,
1401                        mz_sql::catalog::ObjectType::NetworkPolicy,
1402                    )
1403                    .map(|item| item.mz_acl_item(owner_id));
1404                let privileges: Vec<_> =
1405                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1406                        .collect();
1407
1408                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1409                let id = tx.insert_user_network_policy(
1410                    name.clone(),
1411                    rules,
1412                    privileges,
1413                    owner_id,
1414                    &temporary_oids,
1415                )?;
1416
1417                CatalogState::add_to_audit_log(
1418                    &state.system_configuration,
1419                    oracle_write_ts,
1420                    session,
1421                    tx,
1422                    audit_events,
1423                    EventType::Create,
1424                    ObjectType::NetworkPolicy,
1425                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1426                        id: id.to_string(),
1427                        name: name.clone(),
1428                    }),
1429                )?;
1430
1431                info!("created network policy {name} ({id})");
1432            }
1433            Op::Comment {
1434                object_id,
1435                sub_component,
1436                comment,
1437            } => {
1438                tx.update_comment(object_id, sub_component, comment)?;
1439                let entry = state.get_comment_id_entry(&object_id);
1440                let should_log = entry
1441                    .map(|entry| Self::should_audit_log_item(entry.item()))
1442                    // Things that aren't catalog entries can't be temp, so should be logged.
1443                    .unwrap_or(true);
1444                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1445                // comments won't be logged for now.
1446                if let (Some(conn_id), true) =
1447                    (session.map(|session| session.conn_id()), should_log)
1448                {
1449                    CatalogState::add_to_audit_log(
1450                        &state.system_configuration,
1451                        oracle_write_ts,
1452                        session,
1453                        tx,
1454                        audit_events,
1455                        EventType::Comment,
1456                        comment_id_to_audit_object_type(object_id),
1457                        EventDetails::IdNameV1(IdNameV1 {
1458                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1459                            id: format!("{object_id:?}"),
1460                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1461                        }),
1462                    )?;
1463                }
1464            }
1465            Op::UpdateSourceReferences {
1466                source_id,
1467                references,
1468            } => {
1469                tx.update_source_references(
1470                    source_id,
1471                    references
1472                        .references
1473                        .into_iter()
1474                        .map(|reference| reference.into())
1475                        .collect(),
1476                    references.updated_at,
1477                )?;
1478            }
1479            Op::DropObjects(drop_object_infos) => {
1480                // Generate all of the objects that need to get dropped.
1481                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1482
1483                // Drop any associated comments.
1484                tx.drop_comments(&delta.comments)?;
1485
1486                // Drop any items.
1487                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1488                    delta
1489                        .items
1490                        .iter()
1491                        .map(|id| id)
1492                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1493                tx.remove_items(&durable_items_to_drop)?;
1494                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1495                    let entry = state.get_entry(&id);
1496                    (entry.clone().into(), StateDiff::Retraction)
1497                }));
1498
1499                for item_id in delta.items {
1500                    let entry = state.get_entry(&item_id);
1501
1502                    if entry.item().is_storage_collection() {
1503                        storage_collections_to_drop.extend(entry.global_ids());
1504                    }
1505
1506                    if state.source_references.contains_key(&item_id) {
1507                        tx.remove_source_references(item_id)?;
1508                    }
1509
1510                    if Self::should_audit_log_item(entry.item()) {
1511                        CatalogState::add_to_audit_log(
1512                            &state.system_configuration,
1513                            oracle_write_ts,
1514                            session,
1515                            tx,
1516                            audit_events,
1517                            EventType::Drop,
1518                            catalog_type_to_audit_object_type(entry.item().typ()),
1519                            EventDetails::IdFullNameV1(IdFullNameV1 {
1520                                id: item_id.to_string(),
1521                                name: Self::full_name_detail(&state.resolve_full_name(
1522                                    entry.name(),
1523                                    session.map(|session| session.conn_id()),
1524                                )),
1525                            }),
1526                        )?;
1527                    }
1528                    info!(
1529                        "drop {} {} ({})",
1530                        entry.item_type(),
1531                        state.resolve_full_name(entry.name(), entry.conn_id()),
1532                        item_id
1533                    );
1534                }
1535
1536                // Drop any schemas.
1537                let schemas = delta
1538                    .schemas
1539                    .iter()
1540                    .map(|(schema_spec, database_spec)| {
1541                        (SchemaId::from(schema_spec), *database_spec)
1542                    })
1543                    .collect();
1544                tx.remove_schemas(&schemas)?;
1545
1546                for (schema_spec, database_spec) in delta.schemas {
1547                    let schema = state.get_schema(
1548                        &database_spec,
1549                        &schema_spec,
1550                        session
1551                            .map(|session| session.conn_id())
1552                            .unwrap_or(&SYSTEM_CONN_ID),
1553                    );
1554
1555                    let schema_id = SchemaId::from(schema_spec);
1556                    let database_id = match database_spec {
1557                        ResolvedDatabaseSpecifier::Ambient => None,
1558                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1559                    };
1560
1561                    CatalogState::add_to_audit_log(
1562                        &state.system_configuration,
1563                        oracle_write_ts,
1564                        session,
1565                        tx,
1566                        audit_events,
1567                        EventType::Drop,
1568                        ObjectType::Schema,
1569                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1570                            id: schema_id.to_string(),
1571                            name: schema.name.schema.to_string(),
1572                            database_name: database_id
1573                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1574                        }),
1575                    )?;
1576                }
1577
1578                // Drop any databases.
1579                tx.remove_databases(&delta.databases)?;
1580
1581                for database_id in delta.databases {
1582                    let database = state.get_database(&database_id).clone();
1583
1584                    CatalogState::add_to_audit_log(
1585                        &state.system_configuration,
1586                        oracle_write_ts,
1587                        session,
1588                        tx,
1589                        audit_events,
1590                        EventType::Drop,
1591                        ObjectType::Database,
1592                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1593                            id: database_id.to_string(),
1594                            name: database.name.clone(),
1595                        }),
1596                    )?;
1597                }
1598
1599                // Drop any roles.
1600                tx.remove_user_roles(&delta.roles)?;
1601
1602                for role_id in delta.roles {
1603                    let role = state
1604                        .roles_by_id
1605                        .get(&role_id)
1606                        .expect("catalog out of sync");
1607
1608                    CatalogState::add_to_audit_log(
1609                        &state.system_configuration,
1610                        oracle_write_ts,
1611                        session,
1612                        tx,
1613                        audit_events,
1614                        EventType::Drop,
1615                        ObjectType::Role,
1616                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1617                            id: role.id.to_string(),
1618                            name: role.name.clone(),
1619                        }),
1620                    )?;
1621                    info!("drop role {}", role.name());
1622                }
1623
1624                // Drop any network policies.
1625                tx.remove_network_policies(&delta.network_policies)?;
1626
1627                for network_policy_id in delta.network_policies {
1628                    let policy = state
1629                        .network_policies_by_id
1630                        .get(&network_policy_id)
1631                        .expect("catalog out of sync");
1632
1633                    CatalogState::add_to_audit_log(
1634                        &state.system_configuration,
1635                        oracle_write_ts,
1636                        session,
1637                        tx,
1638                        audit_events,
1639                        EventType::Drop,
1640                        ObjectType::NetworkPolicy,
1641                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1642                            id: policy.id.to_string(),
1643                            name: policy.name.clone(),
1644                        }),
1645                    )?;
1646                    info!("drop network policy {}", policy.name.clone());
1647                }
1648
1649                // Drop any replicas.
1650                let replicas = delta.replicas.keys().copied().collect();
1651                tx.remove_cluster_replicas(&replicas)?;
1652
1653                for (replica_id, (cluster_id, reason)) in delta.replicas {
1654                    let cluster = state.get_cluster(cluster_id);
1655                    let replica = cluster.replica(replica_id).expect("Must exist");
1656
1657                    let (reason, scheduling_policies) = reason.into_audit_log();
1658                    let details =
1659                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1660                            cluster_id: cluster_id.to_string(),
1661                            cluster_name: cluster.name.clone(),
1662                            replica_id: Some(replica_id.to_string()),
1663                            replica_name: replica.name.clone(),
1664                            reason,
1665                            scheduling_policies,
1666                        });
1667                    CatalogState::add_to_audit_log(
1668                        &state.system_configuration,
1669                        oracle_write_ts,
1670                        session,
1671                        tx,
1672                        audit_events,
1673                        EventType::Drop,
1674                        ObjectType::ClusterReplica,
1675                        details,
1676                    )?;
1677                }
1678
1679                // Drop any clusters.
1680                tx.remove_clusters(&delta.clusters)?;
1681
1682                for cluster_id in delta.clusters {
1683                    let cluster = state.get_cluster(cluster_id);
1684
1685                    CatalogState::add_to_audit_log(
1686                        &state.system_configuration,
1687                        oracle_write_ts,
1688                        session,
1689                        tx,
1690                        audit_events,
1691                        EventType::Drop,
1692                        ObjectType::Cluster,
1693                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1694                            id: cluster.id.to_string(),
1695                            name: cluster.name.clone(),
1696                        }),
1697                    )?;
1698                }
1699            }
1700            Op::GrantRole {
1701                role_id,
1702                member_id,
1703                grantor_id,
1704            } => {
1705                state.ensure_not_reserved_role(&member_id)?;
1706                state.ensure_grantable_role(&role_id)?;
1707                if state.collect_role_membership(&role_id).contains(&member_id) {
1708                    let group_role = state.get_role(&role_id);
1709                    let member_role = state.get_role(&member_id);
1710                    return Err(AdapterError::Catalog(Error::new(
1711                        ErrorKind::CircularRoleMembership {
1712                            role_name: group_role.name().to_string(),
1713                            member_name: member_role.name().to_string(),
1714                        },
1715                    )));
1716                }
1717                let mut member_role = state.get_role(&member_id).clone();
1718                member_role.membership.map.insert(role_id, grantor_id);
1719                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1720
1721                CatalogState::add_to_audit_log(
1722                    &state.system_configuration,
1723                    oracle_write_ts,
1724                    session,
1725                    tx,
1726                    audit_events,
1727                    EventType::Grant,
1728                    ObjectType::Role,
1729                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1730                        role_id: role_id.to_string(),
1731                        member_id: member_id.to_string(),
1732                        grantor_id: grantor_id.to_string(),
1733                        executed_by: session
1734                            .map(|session| session.authenticated_role_id())
1735                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1736                            .to_string(),
1737                    }),
1738                )?;
1739            }
1740            Op::RevokeRole {
1741                role_id,
1742                member_id,
1743                grantor_id,
1744            } => {
1745                state.ensure_not_reserved_role(&member_id)?;
1746                state.ensure_grantable_role(&role_id)?;
1747                let mut member_role = state.get_role(&member_id).clone();
1748                member_role.membership.map.remove(&role_id);
1749                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1750
1751                CatalogState::add_to_audit_log(
1752                    &state.system_configuration,
1753                    oracle_write_ts,
1754                    session,
1755                    tx,
1756                    audit_events,
1757                    EventType::Revoke,
1758                    ObjectType::Role,
1759                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1760                        role_id: role_id.to_string(),
1761                        member_id: member_id.to_string(),
1762                        grantor_id: grantor_id.to_string(),
1763                        executed_by: session
1764                            .map(|session| session.authenticated_role_id())
1765                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1766                            .to_string(),
1767                    }),
1768                )?;
1769            }
1770            Op::UpdatePrivilege {
1771                target_id,
1772                privilege,
1773                variant,
1774            } => {
1775                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1776                    UpdatePrivilegeVariant::Grant => {
1777                        privileges.grant(privilege);
1778                    }
1779                    UpdatePrivilegeVariant::Revoke => {
1780                        privileges.revoke(&privilege);
1781                    }
1782                };
1783                match &target_id {
1784                    SystemObjectId::Object(object_id) => match object_id {
1785                        ObjectId::Cluster(id) => {
1786                            let mut cluster = state.get_cluster(*id).clone();
1787                            update_privilege_fn(&mut cluster.privileges);
1788                            tx.update_cluster(*id, cluster.into())?;
1789                        }
1790                        ObjectId::Database(id) => {
1791                            let mut database = state.get_database(id).clone();
1792                            update_privilege_fn(&mut database.privileges);
1793                            tx.update_database(*id, database.into())?;
1794                        }
1795                        ObjectId::NetworkPolicy(id) => {
1796                            let mut policy = state.get_network_policy(id).clone();
1797                            update_privilege_fn(&mut policy.privileges);
1798                            tx.update_network_policy(*id, policy.into())?;
1799                        }
1800                        ObjectId::Schema((database_spec, schema_spec)) => {
1801                            let schema_id = schema_spec.clone().into();
1802                            let mut schema = state
1803                                .get_schema(
1804                                    database_spec,
1805                                    schema_spec,
1806                                    session
1807                                        .map(|session| session.conn_id())
1808                                        .unwrap_or(&SYSTEM_CONN_ID),
1809                                )
1810                                .clone();
1811                            update_privilege_fn(&mut schema.privileges);
1812                            tx.update_schema(schema_id, schema.into())?;
1813                        }
1814                        ObjectId::Item(id) => {
1815                            let entry = state.get_entry(id);
1816                            let mut new_entry = entry.clone();
1817                            update_privilege_fn(&mut new_entry.privileges);
1818                            if !new_entry.item().is_temporary() {
1819                                tx.update_item(*id, new_entry.into())?;
1820                            } else {
1821                                temporary_item_updates
1822                                    .push((entry.clone().into(), StateDiff::Retraction));
1823                                temporary_item_updates
1824                                    .push((new_entry.into(), StateDiff::Addition));
1825                            }
1826                        }
1827                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1828                    },
1829                    SystemObjectId::System => {
1830                        let mut system_privileges = state.system_privileges.clone();
1831                        update_privilege_fn(&mut system_privileges);
1832                        let new_privilege =
1833                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1834                        tx.set_system_privilege(
1835                            privilege.grantee,
1836                            privilege.grantor,
1837                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
1838                        )?;
1839                    }
1840                }
1841                let object_type = state.get_system_object_type(&target_id);
1842                let object_id_str = match &target_id {
1843                    SystemObjectId::System => "SYSTEM".to_string(),
1844                    SystemObjectId::Object(id) => id.to_string(),
1845                };
1846                CatalogState::add_to_audit_log(
1847                    &state.system_configuration,
1848                    oracle_write_ts,
1849                    session,
1850                    tx,
1851                    audit_events,
1852                    variant.into(),
1853                    system_object_type_to_audit_object_type(&object_type),
1854                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1855                        object_id: object_id_str,
1856                        grantee_id: privilege.grantee.to_string(),
1857                        grantor_id: privilege.grantor.to_string(),
1858                        privileges: privilege.acl_mode.to_string(),
1859                    }),
1860                )?;
1861            }
1862            Op::UpdateDefaultPrivilege {
1863                privilege_object,
1864                privilege_acl_item,
1865                variant,
1866            } => {
1867                let mut default_privileges = state.default_privileges.clone();
1868                match variant {
1869                    UpdatePrivilegeVariant::Grant => default_privileges
1870                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
1871                    UpdatePrivilegeVariant::Revoke => {
1872                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
1873                    }
1874                }
1875                let new_acl_mode = default_privileges
1876                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1877                tx.set_default_privilege(
1878                    privilege_object.role_id,
1879                    privilege_object.database_id,
1880                    privilege_object.schema_id,
1881                    privilege_object.object_type,
1882                    privilege_acl_item.grantee,
1883                    new_acl_mode.cloned(),
1884                )?;
1885                CatalogState::add_to_audit_log(
1886                    &state.system_configuration,
1887                    oracle_write_ts,
1888                    session,
1889                    tx,
1890                    audit_events,
1891                    variant.into(),
1892                    object_type_to_audit_object_type(privilege_object.object_type),
1893                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1894                        role_id: privilege_object.role_id.to_string(),
1895                        database_id: privilege_object.database_id.map(|id| id.to_string()),
1896                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1897                        grantee_id: privilege_acl_item.grantee.to_string(),
1898                        privileges: privilege_acl_item.acl_mode.to_string(),
1899                    }),
1900                )?;
1901            }
1902            Op::RenameCluster {
1903                id,
1904                name,
1905                to_name,
1906                check_reserved_names,
1907            } => {
1908                if id.is_system() {
1909                    return Err(AdapterError::Catalog(Error::new(
1910                        ErrorKind::ReadOnlyCluster(name.clone()),
1911                    )));
1912                }
1913                if check_reserved_names && is_reserved_name(&to_name) {
1914                    return Err(AdapterError::Catalog(Error::new(
1915                        ErrorKind::ReservedClusterName(to_name),
1916                    )));
1917                }
1918                tx.rename_cluster(id, &name, &to_name)?;
1919                CatalogState::add_to_audit_log(
1920                    &state.system_configuration,
1921                    oracle_write_ts,
1922                    session,
1923                    tx,
1924                    audit_events,
1925                    EventType::Alter,
1926                    ObjectType::Cluster,
1927                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1928                        id: id.to_string(),
1929                        old_name: name.clone(),
1930                        new_name: to_name.clone(),
1931                    }),
1932                )?;
1933                info!("rename cluster {name} to {to_name}");
1934            }
1935            Op::RenameClusterReplica {
1936                cluster_id,
1937                replica_id,
1938                name,
1939                to_name,
1940            } => {
1941                if is_reserved_name(&to_name) {
1942                    return Err(AdapterError::Catalog(Error::new(
1943                        ErrorKind::ReservedReplicaName(to_name),
1944                    )));
1945                }
1946                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1947                CatalogState::add_to_audit_log(
1948                    &state.system_configuration,
1949                    oracle_write_ts,
1950                    session,
1951                    tx,
1952                    audit_events,
1953                    EventType::Alter,
1954                    ObjectType::ClusterReplica,
1955                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1956                        cluster_id: cluster_id.to_string(),
1957                        replica_id: replica_id.to_string(),
1958                        old_name: name.replica.as_str().to_string(),
1959                        new_name: to_name.clone(),
1960                    }),
1961                )?;
1962                info!("rename cluster replica {name} to {to_name}");
1963            }
1964            Op::RenameItem {
1965                id,
1966                to_name,
1967                current_full_name,
1968            } => {
1969                let mut updates = Vec::new();
1970
1971                let entry = state.get_entry(&id);
1972                if let CatalogItem::Type(_) = entry.item() {
1973                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1974                        current_full_name.to_string(),
1975                    ))));
1976                }
1977
1978                if entry.id().is_system() {
1979                    let name = state
1980                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1981                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1982                        name.to_string(),
1983                    ))));
1984                }
1985
1986                let mut to_full_name = current_full_name.clone();
1987                to_full_name.item.clone_from(&to_name);
1988
1989                let mut to_qualified_name = entry.name().clone();
1990                to_qualified_name.item.clone_from(&to_name);
1991
1992                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1993                    id: id.to_string(),
1994                    old_name: Self::full_name_detail(&current_full_name),
1995                    new_name: Self::full_name_detail(&to_full_name),
1996                });
1997                if Self::should_audit_log_item(entry.item()) {
1998                    CatalogState::add_to_audit_log(
1999                        &state.system_configuration,
2000                        oracle_write_ts,
2001                        session,
2002                        tx,
2003                        audit_events,
2004                        EventType::Alter,
2005                        catalog_type_to_audit_object_type(entry.item().typ()),
2006                        details,
2007                    )?;
2008                }
2009
2010                // Rename item itself.
2011                let mut new_entry = entry.clone();
2012                new_entry.name.item.clone_from(&to_name);
2013                new_entry.item = entry
2014                    .item()
2015                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2016                    .map_err(|e| {
2017                        Error::new(ErrorKind::from(AmbiguousRename {
2018                            depender: state
2019                                .resolve_full_name(entry.name(), entry.conn_id())
2020                                .to_string(),
2021                            dependee: state
2022                                .resolve_full_name(entry.name(), entry.conn_id())
2023                                .to_string(),
2024                            message: e,
2025                        }))
2026                    })?;
2027
2028                for id in entry.referenced_by() {
2029                    let dependent_item = state.get_entry(id);
2030                    let mut to_entry = dependent_item.clone();
2031                    to_entry.item = dependent_item
2032                        .item()
2033                        .rename_item_refs(
2034                            current_full_name.clone(),
2035                            to_full_name.item.clone(),
2036                            false,
2037                        )
2038                        .map_err(|e| {
2039                            Error::new(ErrorKind::from(AmbiguousRename {
2040                                depender: state
2041                                    .resolve_full_name(
2042                                        dependent_item.name(),
2043                                        dependent_item.conn_id(),
2044                                    )
2045                                    .to_string(),
2046                                dependee: state
2047                                    .resolve_full_name(entry.name(), entry.conn_id())
2048                                    .to_string(),
2049                                message: e,
2050                            }))
2051                        })?;
2052
2053                    if !to_entry.item().is_temporary() {
2054                        tx.update_item(*id, to_entry.into())?;
2055                    } else {
2056                        temporary_item_updates
2057                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2058                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2059                    }
2060                    updates.push(*id);
2061                }
2062                if !new_entry.item().is_temporary() {
2063                    tx.update_item(id, new_entry.into())?;
2064                } else {
2065                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2066                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2067                }
2068
2069                updates.push(id);
2070                for id in updates {
2071                    Self::log_update(state, &id);
2072                }
2073            }
2074            Op::RenameSchema {
2075                database_spec,
2076                schema_spec,
2077                new_name,
2078                check_reserved_names,
2079            } => {
2080                if check_reserved_names && is_reserved_name(&new_name) {
2081                    return Err(AdapterError::Catalog(Error::new(
2082                        ErrorKind::ReservedSchemaName(new_name),
2083                    )));
2084                }
2085
2086                let conn_id = session
2087                    .map(|session| session.conn_id())
2088                    .unwrap_or(&SYSTEM_CONN_ID);
2089
2090                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2091                let cur_name = schema.name().schema.clone();
2092
2093                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2094                    return Err(AdapterError::Catalog(Error::new(
2095                        ErrorKind::AmbientSchemaRename(cur_name),
2096                    )));
2097                };
2098                let database = state.get_database(&database_id);
2099                let database_name = &database.name;
2100
2101                let mut updates = Vec::new();
2102                let mut items_to_update = BTreeMap::new();
2103
2104                let mut update_item = |id| {
2105                    if items_to_update.contains_key(id) {
2106                        return Ok(());
2107                    }
2108
2109                    let entry = state.get_entry(id);
2110
2111                    // Update our item.
2112                    let mut new_entry = entry.clone();
2113                    new_entry.item = entry
2114                        .item
2115                        .rename_schema_refs(database_name, &cur_name, &new_name)
2116                        .map_err(|(s, _i)| {
2117                            Error::new(ErrorKind::from(AmbiguousRename {
2118                                depender: state
2119                                    .resolve_full_name(entry.name(), entry.conn_id())
2120                                    .to_string(),
2121                                dependee: format!("{database_name}.{cur_name}"),
2122                                message: format!("ambiguous reference to schema named {s}"),
2123                            }))
2124                        })?;
2125
2126                    // Queue updates for Catalog storage and Builtin Tables.
2127                    if !new_entry.item().is_temporary() {
2128                        items_to_update.insert(*id, new_entry.into());
2129                    } else {
2130                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2131                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2132                    }
2133                    updates.push(id);
2134
2135                    Ok::<_, AdapterError>(())
2136                };
2137
2138                // Update all of the items in the schema.
2139                for (_name, item_id) in &schema.items {
2140                    // Update the item itself.
2141                    update_item(item_id)?;
2142
2143                    // Update everything that depends on this item.
2144                    for id in state.get_entry(item_id).referenced_by() {
2145                        update_item(id)?;
2146                    }
2147                }
2148                // Note: When updating the transaction it's very important that we update the
2149                // items as a whole group, otherwise we exhibit quadratic behavior.
2150                tx.update_items(items_to_update)?;
2151
2152                // Renaming temporary schemas is not supported.
2153                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2154                    let schema_name = schema.name().schema.clone();
2155                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2156                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2157                    )));
2158                };
2159
2160                // Add an entry to the audit log.
2161                let database_name = database_spec
2162                    .id()
2163                    .map(|id| state.get_database(&id).name.clone());
2164                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2165                    id: schema_id.to_string(),
2166                    old_name: schema.name().schema.clone(),
2167                    new_name: new_name.clone(),
2168                    database_name,
2169                });
2170                CatalogState::add_to_audit_log(
2171                    &state.system_configuration,
2172                    oracle_write_ts,
2173                    session,
2174                    tx,
2175                    audit_events,
2176                    EventType::Alter,
2177                    mz_audit_log::ObjectType::Schema,
2178                    details,
2179                )?;
2180
2181                // Update the schema itself.
2182                let mut new_schema = schema.clone();
2183                new_schema.name.schema.clone_from(&new_name);
2184                tx.update_schema(schema_id, new_schema.into())?;
2185
2186                for id in updates {
2187                    Self::log_update(state, id);
2188                }
2189            }
2190            Op::UpdateOwner { id, new_owner } => {
2191                let conn_id = session
2192                    .map(|session| session.conn_id())
2193                    .unwrap_or(&SYSTEM_CONN_ID);
2194                let old_owner = state
2195                    .get_owner_id(&id, conn_id)
2196                    .expect("cannot update the owner of an object without an owner");
2197                match &id {
2198                    ObjectId::Cluster(id) => {
2199                        let mut cluster = state.get_cluster(*id).clone();
2200                        if id.is_system() {
2201                            return Err(AdapterError::Catalog(Error::new(
2202                                ErrorKind::ReadOnlyCluster(cluster.name),
2203                            )));
2204                        }
2205                        Self::update_privilege_owners(
2206                            &mut cluster.privileges,
2207                            cluster.owner_id,
2208                            new_owner,
2209                        );
2210                        cluster.owner_id = new_owner;
2211                        tx.update_cluster(*id, cluster.into())?;
2212                    }
2213                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2214                        let cluster = state.get_cluster(*cluster_id);
2215                        let mut replica = cluster
2216                            .replica(*replica_id)
2217                            .expect("catalog out of sync")
2218                            .clone();
2219                        if replica_id.is_system() {
2220                            return Err(AdapterError::Catalog(Error::new(
2221                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2222                            )));
2223                        }
2224                        replica.owner_id = new_owner;
2225                        tx.update_cluster_replica(*replica_id, replica.into())?;
2226                    }
2227                    ObjectId::Database(id) => {
2228                        let mut database = state.get_database(id).clone();
2229                        if id.is_system() {
2230                            return Err(AdapterError::Catalog(Error::new(
2231                                ErrorKind::ReadOnlyDatabase(database.name),
2232                            )));
2233                        }
2234                        Self::update_privilege_owners(
2235                            &mut database.privileges,
2236                            database.owner_id,
2237                            new_owner,
2238                        );
2239                        database.owner_id = new_owner;
2240                        tx.update_database(*id, database.clone().into())?;
2241                    }
2242                    ObjectId::Schema((database_spec, schema_spec)) => {
2243                        let schema_id: SchemaId = schema_spec.clone().into();
2244                        let mut schema = state
2245                            .get_schema(database_spec, schema_spec, conn_id)
2246                            .clone();
2247                        if schema_id.is_system() {
2248                            let name = schema.name();
2249                            let full_name = state.resolve_full_schema_name(name);
2250                            return Err(AdapterError::Catalog(Error::new(
2251                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2252                            )));
2253                        }
2254                        Self::update_privilege_owners(
2255                            &mut schema.privileges,
2256                            schema.owner_id,
2257                            new_owner,
2258                        );
2259                        schema.owner_id = new_owner;
2260                        tx.update_schema(schema_id, schema.into())?;
2261                    }
2262                    ObjectId::Item(id) => {
2263                        let entry = state.get_entry(id);
2264                        let mut new_entry = entry.clone();
2265                        if id.is_system() {
2266                            let full_name = state.resolve_full_name(
2267                                new_entry.name(),
2268                                session.map(|session| session.conn_id()),
2269                            );
2270                            return Err(AdapterError::Catalog(Error::new(
2271                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2272                            )));
2273                        }
2274                        Self::update_privilege_owners(
2275                            &mut new_entry.privileges,
2276                            new_entry.owner_id,
2277                            new_owner,
2278                        );
2279                        new_entry.owner_id = new_owner;
2280                        if !new_entry.item().is_temporary() {
2281                            tx.update_item(*id, new_entry.into())?;
2282                        } else {
2283                            temporary_item_updates
2284                                .push((entry.clone().into(), StateDiff::Retraction));
2285                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2286                        }
2287                    }
2288                    ObjectId::NetworkPolicy(id) => {
2289                        let mut policy = state.get_network_policy(id).clone();
2290                        if id.is_system() {
2291                            return Err(AdapterError::Catalog(Error::new(
2292                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2293                            )));
2294                        }
2295                        Self::update_privilege_owners(
2296                            &mut policy.privileges,
2297                            policy.owner_id,
2298                            new_owner,
2299                        );
2300                        policy.owner_id = new_owner;
2301                        tx.update_network_policy(*id, policy.into())?;
2302                    }
2303                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2304                }
2305                let object_type = state.get_object_type(&id);
2306                CatalogState::add_to_audit_log(
2307                    &state.system_configuration,
2308                    oracle_write_ts,
2309                    session,
2310                    tx,
2311                    audit_events,
2312                    EventType::Alter,
2313                    object_type_to_audit_object_type(object_type),
2314                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2315                        object_id: id.to_string(),
2316                        old_owner_id: old_owner.to_string(),
2317                        new_owner_id: new_owner.to_string(),
2318                    }),
2319                )?;
2320            }
2321            Op::UpdateClusterConfig { id, name, config } => {
2322                let mut cluster = state.get_cluster(id).clone();
2323                cluster.config = config;
2324                tx.update_cluster(id, cluster.into())?;
2325                info!("update cluster {}", name);
2326
2327                CatalogState::add_to_audit_log(
2328                    &state.system_configuration,
2329                    oracle_write_ts,
2330                    session,
2331                    tx,
2332                    audit_events,
2333                    EventType::Alter,
2334                    ObjectType::Cluster,
2335                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2336                        id: id.to_string(),
2337                        name,
2338                    }),
2339                )?;
2340            }
2341            Op::UpdateClusterReplicaConfig {
2342                replica_id,
2343                cluster_id,
2344                config,
2345            } => {
2346                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2347                info!("update replica {}", replica.name);
2348                tx.update_cluster_replica(
2349                    replica_id,
2350                    mz_catalog::durable::ClusterReplica {
2351                        cluster_id,
2352                        replica_id,
2353                        name: replica.name.clone(),
2354                        config: config.clone().into(),
2355                        owner_id: replica.owner_id,
2356                    },
2357                )?;
2358            }
2359            Op::UpdateItem { id, name, to_item } => {
2360                let mut entry = state.get_entry(&id).clone();
2361                entry.name = name.clone();
2362                entry.item = to_item.clone();
2363                tx.update_item(id, entry.into())?;
2364
2365                if Self::should_audit_log_item(&to_item) {
2366                    let mut full_name = Self::full_name_detail(
2367                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2368                    );
2369                    full_name.item = name.item;
2370
2371                    CatalogState::add_to_audit_log(
2372                        &state.system_configuration,
2373                        oracle_write_ts,
2374                        session,
2375                        tx,
2376                        audit_events,
2377                        EventType::Alter,
2378                        catalog_type_to_audit_object_type(to_item.typ()),
2379                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2380                            id: id.to_string(),
2381                            name: full_name,
2382                        }),
2383                    )?;
2384                }
2385
2386                Self::log_update(state, &id);
2387            }
2388            Op::UpdateSystemConfiguration { name, value } => {
2389                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2390                tx.upsert_system_config(&name, parsed_value.clone())?;
2391                // This mirrors some "system vars" into the catalog storage
2392                // "config" collection so that we can toggle the flag with
2393                // Launch Darkly, but use it in boot before Launch Darkly is
2394                // available.
2395                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2396                    let with_0dt_deployment_max_wait =
2397                        Duration::parse(VarInput::Flat(&parsed_value))
2398                            .expect("parsing succeeded above");
2399                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2400                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2401                    let with_0dt_deployment_ddl_check_interval =
2402                        Duration::parse(VarInput::Flat(&parsed_value))
2403                            .expect("parsing succeeded above");
2404                    tx.set_0dt_deployment_ddl_check_interval(
2405                        with_0dt_deployment_ddl_check_interval,
2406                    )?;
2407                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2408                    let panic_after_timeout =
2409                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2410                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2411                }
2412
2413                CatalogState::add_to_audit_log(
2414                    &state.system_configuration,
2415                    oracle_write_ts,
2416                    session,
2417                    tx,
2418                    audit_events,
2419                    EventType::Alter,
2420                    ObjectType::System,
2421                    EventDetails::SetV1(mz_audit_log::SetV1 {
2422                        name,
2423                        value: Some(value.borrow().to_vec().join(", ")),
2424                    }),
2425                )?;
2426            }
2427            Op::ResetSystemConfiguration { name } => {
2428                tx.remove_system_config(&name);
2429                // This mirrors some "system vars" into the catalog storage
2430                // "config" collection so that we can toggle the flag with
2431                // Launch Darkly, but use it in boot before Launch Darkly is
2432                // available.
2433                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2434                    tx.reset_0dt_deployment_max_wait()?;
2435                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2436                    tx.reset_0dt_deployment_ddl_check_interval()?;
2437                }
2438
2439                CatalogState::add_to_audit_log(
2440                    &state.system_configuration,
2441                    oracle_write_ts,
2442                    session,
2443                    tx,
2444                    audit_events,
2445                    EventType::Alter,
2446                    ObjectType::System,
2447                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2448                )?;
2449            }
2450            Op::ResetAllSystemConfiguration => {
2451                tx.clear_system_configs();
2452                tx.reset_0dt_deployment_max_wait()?;
2453                tx.reset_0dt_deployment_ddl_check_interval()?;
2454
2455                CatalogState::add_to_audit_log(
2456                    &state.system_configuration,
2457                    oracle_write_ts,
2458                    session,
2459                    tx,
2460                    audit_events,
2461                    EventType::Alter,
2462                    ObjectType::System,
2463                    EventDetails::ResetAllV1,
2464                )?;
2465            }
2466            Op::WeirdStorageUsageUpdates {
2467                object_id,
2468                size_bytes,
2469                collection_timestamp,
2470            } => {
2471                let id = tx.allocate_storage_usage_ids()?;
2472                let metric =
2473                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2474                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2475                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2476                weird_builtin_table_update = Some(builtin_table_update);
2477            }
2478        };
2479        Ok((weird_builtin_table_update, temporary_item_updates))
2480    }
2481
2482    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2483        let entry = state.get_entry(id);
2484        info!(
2485            "update {} {} ({})",
2486            entry.item_type(),
2487            state.resolve_full_name(entry.name(), entry.conn_id()),
2488            id
2489        );
2490    }
2491
2492    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2493    /// implementation:
2494    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2495    fn update_privilege_owners(
2496        privileges: &mut PrivilegeMap,
2497        old_owner: RoleId,
2498        new_owner: RoleId,
2499    ) {
2500        // TODO(jkosh44) Would be nice not to clone every privilege.
2501        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2502
2503        let mut new_present = false;
2504        for privilege in flat_privileges.iter_mut() {
2505            // Old owner's granted privilege are updated to be granted by the new
2506            // owner.
2507            if privilege.grantor == old_owner {
2508                privilege.grantor = new_owner;
2509            } else if privilege.grantor == new_owner {
2510                new_present = true;
2511            }
2512            // Old owner's privileges is given to the new owner.
2513            if privilege.grantee == old_owner {
2514                privilege.grantee = new_owner;
2515            } else if privilege.grantee == new_owner {
2516                new_present = true;
2517            }
2518        }
2519
2520        // If the old privilege list contained references to the new owner, we may
2521        // have created duplicate entries. Here we try and consolidate them. This
2522        // is inspired by PostgreSQL's algorithm but not identical.
2523        if new_present {
2524            // Group privileges by (grantee, grantor).
2525            let privilege_map: BTreeMap<_, Vec<_>> =
2526                flat_privileges
2527                    .into_iter()
2528                    .fold(BTreeMap::new(), |mut accum, privilege| {
2529                        accum
2530                            .entry((privilege.grantee, privilege.grantor))
2531                            .or_default()
2532                            .push(privilege);
2533                        accum
2534                    });
2535
2536            // Consolidate and update all privileges.
2537            flat_privileges = privilege_map
2538                .into_iter()
2539                .map(|((grantee, grantor), values)|
2540                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2541                    values.into_iter().fold(
2542                        MzAclItem::empty(grantee, grantor),
2543                        |mut accum, mz_aclitem| {
2544                            accum.acl_mode =
2545                                accum.acl_mode.union(mz_aclitem.acl_mode);
2546                            accum
2547                        },
2548                    ))
2549                .collect();
2550        }
2551
2552        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2553    }
2554}
2555
2556/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2557///
2558/// Note: Previously we used to omit a single `Op::DropObject` for every object
2559/// we needed to drop. But removing a batch of objects from a durable Catalog
2560/// Transaction is O(n) where `n` is the number of objects that exist in the
2561/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
2562/// `DROP ... CASCADE` statement.
2563#[derive(Debug, Default)]
2564pub(crate) struct ObjectsToDrop {
2565    pub comments: BTreeSet<CommentObjectId>,
2566    pub databases: BTreeSet<DatabaseId>,
2567    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2568    pub clusters: BTreeSet<ClusterId>,
2569    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2570    pub roles: BTreeSet<RoleId>,
2571    pub items: Vec<CatalogItemId>,
2572    pub network_policies: BTreeSet<NetworkPolicyId>,
2573}
2574
2575impl ObjectsToDrop {
2576    pub fn generate(
2577        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2578        state: &CatalogState,
2579        session: Option<&ConnMeta>,
2580    ) -> Result<Self, AdapterError> {
2581        let mut delta = ObjectsToDrop::default();
2582
2583        for drop_object_info in drop_object_infos {
2584            delta.add_item(drop_object_info, state, session)?;
2585        }
2586
2587        Ok(delta)
2588    }
2589
2590    fn add_item(
2591        &mut self,
2592        drop_object_info: DropObjectInfo,
2593        state: &CatalogState,
2594        session: Option<&ConnMeta>,
2595    ) -> Result<(), AdapterError> {
2596        self.comments
2597            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2598
2599        match drop_object_info {
2600            DropObjectInfo::Database(database_id) => {
2601                let database = &state.database_by_id[&database_id];
2602                if database_id.is_system() {
2603                    return Err(AdapterError::Catalog(Error::new(
2604                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2605                    )));
2606                }
2607
2608                self.databases.insert(database_id);
2609            }
2610            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2611                let schema = state.get_schema(
2612                    &database_spec,
2613                    &schema_spec,
2614                    session
2615                        .map(|session| session.conn_id())
2616                        .unwrap_or(&SYSTEM_CONN_ID),
2617                );
2618                let schema_id: SchemaId = schema_spec.into();
2619                if schema_id.is_system() {
2620                    let name = schema.name();
2621                    let full_name = state.resolve_full_schema_name(name);
2622                    return Err(AdapterError::Catalog(Error::new(
2623                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2624                    )));
2625                }
2626
2627                self.schemas.insert(schema_spec, database_spec);
2628            }
2629            DropObjectInfo::Role(role_id) => {
2630                let name = state.get_role(&role_id).name().to_string();
2631                if role_id.is_system() || role_id.is_predefined() {
2632                    return Err(AdapterError::Catalog(Error::new(
2633                        ErrorKind::ReservedRoleName(name.clone()),
2634                    )));
2635                }
2636                state.ensure_not_reserved_role(&role_id)?;
2637
2638                self.roles.insert(role_id);
2639            }
2640            DropObjectInfo::Cluster(cluster_id) => {
2641                let cluster = state.get_cluster(cluster_id);
2642                let name = &cluster.name;
2643                if cluster_id.is_system() {
2644                    return Err(AdapterError::Catalog(Error::new(
2645                        ErrorKind::ReadOnlyCluster(name.clone()),
2646                    )));
2647                }
2648
2649                self.clusters.insert(cluster_id);
2650            }
2651            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2652                let cluster = state.get_cluster(cluster_id);
2653                let replica = cluster.replica(replica_id).expect("Must exist");
2654
2655                self.replicas
2656                    .insert(replica.replica_id, (cluster.id, reason));
2657            }
2658            DropObjectInfo::Item(item_id) => {
2659                let entry = state.get_entry(&item_id);
2660                if item_id.is_system() {
2661                    let name = entry.name();
2662                    let full_name =
2663                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
2664                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2665                        full_name.to_string(),
2666                    ))));
2667                }
2668
2669                self.items.push(item_id);
2670            }
2671            DropObjectInfo::NetworkPolicy(network_policy_id) => {
2672                let policy = state.get_network_policy(&network_policy_id);
2673                let name = &policy.name;
2674                if network_policy_id.is_system() {
2675                    return Err(AdapterError::Catalog(Error::new(
2676                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2677                    )));
2678                }
2679
2680                self.network_policies.insert(network_policy_id);
2681            }
2682        }
2683
2684        Ok(())
2685    }
2686}
2687
2688#[cfg(test)]
2689mod tests {
2690    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
2691    use mz_repr::role_id::RoleId;
2692
2693    use crate::catalog::Catalog;
2694
2695    #[mz_ore::test]
2696    fn test_update_privilege_owners() {
2697        let old_owner = RoleId::User(1);
2698        let new_owner = RoleId::User(2);
2699        let other_role = RoleId::User(3);
2700
2701        // older owner exists as grantor.
2702        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2703            MzAclItem {
2704                grantee: other_role,
2705                grantor: old_owner,
2706                acl_mode: AclMode::UPDATE,
2707            },
2708            MzAclItem {
2709                grantee: other_role,
2710                grantor: new_owner,
2711                acl_mode: AclMode::SELECT,
2712            },
2713        ]);
2714        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2715        assert_eq!(1, privileges.all_values().count());
2716        assert_eq!(
2717            vec![MzAclItem {
2718                grantee: other_role,
2719                grantor: new_owner,
2720                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2721            }],
2722            privileges.all_values_owned().collect::<Vec<_>>()
2723        );
2724
2725        // older owner exists as grantee.
2726        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2727            MzAclItem {
2728                grantee: old_owner,
2729                grantor: other_role,
2730                acl_mode: AclMode::UPDATE,
2731            },
2732            MzAclItem {
2733                grantee: new_owner,
2734                grantor: other_role,
2735                acl_mode: AclMode::SELECT,
2736            },
2737        ]);
2738        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2739        assert_eq!(1, privileges.all_values().count());
2740        assert_eq!(
2741            vec![MzAclItem {
2742                grantee: new_owner,
2743                grantor: other_role,
2744                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2745            }],
2746            privileges.all_values_owned().collect::<Vec<_>>()
2747        );
2748
2749        // older owner exists as grantee and grantor.
2750        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2751            MzAclItem {
2752                grantee: old_owner,
2753                grantor: old_owner,
2754                acl_mode: AclMode::UPDATE,
2755            },
2756            MzAclItem {
2757                grantee: new_owner,
2758                grantor: new_owner,
2759                acl_mode: AclMode::SELECT,
2760            },
2761        ]);
2762        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2763        assert_eq!(1, privileges.all_values().count());
2764        assert_eq!(
2765            vec![MzAclItem {
2766                grantee: new_owner,
2767                grantor: new_owner,
2768                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2769            }],
2770            privileges.all_values_owned().collect::<Vec<_>>()
2771        );
2772    }
2773}