mz_adapter/catalog/transact.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to executing catalog transactions.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
    WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use mz_audit_log::{
    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::BuiltinLog;
use mz_catalog::durable::{NetworkPolicy, Transaction};
use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
    StateUpdateKind, TemporaryItem,
};
use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::HashSet;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_persist_types::ShardId;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
use mz_sql::ast::RawDataType;
use mz_sql::catalog::{
    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
    RoleAttributesRaw, RoleMembership, RoleVars,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{NetworkPolicyRule, PlanError};
use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
use mz_sql::session::vars::OwnedVarInput;
use mz_sql::session::vars::{Value as VarValue, VarInput};
use mz_sql::{DEFAULT_SCHEMA, rbac};
use mz_sql_parser::ast::{QualifiedReplica, Value};
use mz_storage_client::storage_collections::StorageCollections;
use tracing::{info, trace};

use crate::AdapterError;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{
    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
    is_reserved_role_name, object_type_to_audit_object_type,
    system_object_type_to_audit_object_type,
};
use crate::coord::ConnMeta;
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::util::ResultExt;

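/// A single operation within a catalog transaction. [`Catalog::transact`] applies an
/// ordered batch of `Op`s atomically.
///
/// A rough sketch of assembling and applying a transaction (illustrative only;
/// `catalog`, `owner_id`, `attributes`, and `oracle_write_ts` are placeholders):
///
/// ```ignore
/// let ops = vec![
///     Op::CreateDatabase { name: "my_db".into(), owner_id },
///     Op::CreateRole { name: "analyst".into(), attributes },
/// ];
/// catalog.transact(None, oracle_write_ts, None, ops).await?;
/// ```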
#[derive(Debug, Clone)]
pub enum Op {
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    /// Performs updates to the storage usage table, which probably should be a builtin source.
    ///
    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
    /// might not work. If a process crashes after collecting storage usage events
    /// but before updating the builtin table, then another listening catalog
    /// will never know to update the builtin table.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Performs a dry run of the commit, but errors with
    /// [`AdapterError::TransactionDryRun`].
    ///
    /// When using this value, it should be included only as the last element of
    /// the transaction and should not be the only value in the transaction.
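    ///
    /// For example, a caller validating a batch of operations might append the
    /// marker last (sketch; `other_ops` is a placeholder):
    ///
    /// ```ignore
    /// let mut ops = other_ops;
    /// ops.push(Op::TransactionDryRun);
    /// ```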
    TransactionDryRun,
}

/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
/// the `Op::DropObjects`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}

impl DropObjectInfo {
    /// Creates a `DropObjectInfo` from an `ObjectId`.
    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
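    ///
    /// For example (sketch; `item_id` is a placeholder):
    ///
    /// ```ignore
    /// let info = DropObjectInfo::manual_drop_from_object_id(ObjectId::Item(item_id));
    /// ```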
    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
        match id {
            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
                cluster_id,
                replica_id,
                ReplicaCreateDropReason::Manual,
            )),
            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
        }
    }

    /// Creates an `ObjectId` from a `DropObjectInfo`.
    /// Loses the `ReplicaCreateDropReason` if there is one!
    fn to_object_id(&self) -> ObjectId {
        match &self {
            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
            }
            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
            DropObjectInfo::NetworkPolicy(network_policy_id) => {
                ObjectId::NetworkPolicy(network_policy_id.clone())
            }
        }
    }
}

/// The reason for creating or dropping a replica.
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The user initiated the replica create or drop, e.g., by
    /// - creating/dropping a cluster,
    /// - ALTERing various options on a managed cluster,
    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
    Manual,
    /// The automated cluster scheduling initiated the replica create or drop, e.g., because
    /// a materialized view needs a refresh on a SCHEDULE ON REFRESH cluster.
    ClusterScheduling(Vec<SchedulingDecision>),
}

impl ReplicaCreateDropReason {
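    /// Converts the reason into its audit-log representation: the reason variant itself
    /// and, for scheduled creates/drops, the scheduling decisions with their reasons.
    /// `Manual` carries no scheduling decisions, so the second element is `None` in that case.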
    pub fn into_audit_log(
        self,
    ) -> (
        CreateOrDropClusterReplicaReasonV1,
        Option<SchedulingDecisionsWithReasonsV2>,
    ) {
        let (reason, scheduling_policies) = match self {
            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
                CreateOrDropClusterReplicaReasonV1::Schedule,
                Some(scheduling_decisions),
            ),
        };
        (
            reason,
            scheduling_policies
                .as_ref()
                .map(SchedulingDecision::reasons_to_audit_log_reasons),
        )
    }
}

pub struct TransactionResult {
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates from which we will derive catalog implications.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    pub audit_events: Vec<VersionedEvent>,
}

impl Catalog {
    fn should_audit_log_item(item: &CatalogItem) -> bool {
        !item.is_temporary()
    }

    /// Gets the [`CatalogItemId`]s of temporary items to be created and checks for name
    /// collisions within a connection ID.
    fn temporary_ids(
        &self,
        ops: &[Op],
        temporary_drops: BTreeSet<(&ConnectionId, String)>,
    ) -> Result<BTreeSet<CatalogItemId>, Error> {
        let mut creating = BTreeSet::new();
        let mut temporary_ids = BTreeSet::new();
        for op in ops.iter() {
            if let Op::CreateItem {
                id,
                name,
                item,
                owner_id: _,
            } = op
            {
                if let Some(conn_id) = item.conn_id() {
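                    // Error if an item with this name already exists in the connection's
                    // temporary schema and is not also being dropped in this transaction,
                    // or if this transaction already creates an item with this name
                    // (note that `&&` binds tighter than `||` below).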
                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
                        || creating.contains(&(conn_id, &name.item))
                    {
                        return Err(
                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
                        );
                    } else {
                        creating.insert((conn_id, &name.item));
                        temporary_ids.insert(id.clone());
                    }
                }
            }
        }
        Ok(temporary_ids)
    }

    #[instrument(name = "catalog::transact")]
    pub async fn transact(
        &mut self,
        // n.b. this is an option to prevent us from needing to build out a
        // dummy impl of `StorageController` for tests.
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
    ) -> Result<TransactionResult, AdapterError> {
        trace!("transact: {:?}", ops);
        fail::fail_point!("catalog_transact", |arg| {
            Err(AdapterError::Unstructured(anyhow::anyhow!(
                "failpoint: {arg:?}"
            )))
        });

        let drop_ids: BTreeSet<CatalogItemId> = ops
            .iter()
            .filter_map(|op| match op {
                Op::DropObjects(drop_object_infos) => {
                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
                    let item_ids = ids.filter_map(|id| match id {
                        ObjectId::Item(id) => Some(id),
                        _ => None,
                    });
                    Some(item_ids)
                }
                _ => None,
            })
            .flatten()
            .collect();
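        // For temporary items being dropped, remember their (connection, name) pairs so
        // that `temporary_ids` below does not flag a same-named item that is re-created
        // within this transaction as a collision.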
        let temporary_drops = drop_ids
            .iter()
            .filter_map(|id| {
                let entry = self.get_entry(id);
                match entry.item().conn_id() {
                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
                    None => None,
                }
            })
            .collect();
        let dropped_global_ids = drop_ids
            .iter()
            .flat_map(|item_id| self.get_global_ids(item_id))
            .collect();

        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = storage
            .transaction()
            .await
            .unwrap_or_terminate("starting catalog transaction");

        let new_state = Self::transact_inner(
            storage_collections,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            &self.state,
        )
        .await?;

        // The operations all succeeded, so commit the durable updates. Terminate the
        // process if this fails, because we have to restart envd due to
        // indeterminate catalog state, which we only reconcile during catalog
        // init.
        tx.commit(oracle_write_ts)
            .await
            .unwrap_or_terminate("catalog storage transaction commit must succeed");

        // Dropping the storage handle here releases the mutable borrow on `self`; keeping
        // it alive until now prevented us from accidentally mutating in-memory state earlier.
        drop(storage);
        if let Some(new_state) = new_state {
            self.transient_revision += 1;
            self.state = new_state;
        }

        // Drop in-memory planning metadata.
        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
        if self.state.system_config().enable_mz_notices() {
            // Generate retractions for the Builtin tables.
            self.state().pack_optimizer_notices(
                &mut builtin_table_updates,
                dropped_notices.iter(),
                Diff::MINUS_ONE,
            );
        }

        Ok(TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        })
    }

    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
    /// it has changed. If `ops` does not result in a state change, this method returns `None`.
    ///
    /// # Panics
    /// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
    ///   final element.
    /// - If the only element of `ops` is [`Op::TransactionDryRun`].
    #[instrument(name = "catalog::transact_inner")]
    async fn transact_inner(
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        mut ops: Vec<Op>,
        temporary_ids: BTreeSet<CatalogItemId>,
        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
        audit_events: &mut Vec<VersionedEvent>,
        tx: &mut Transaction<'_>,
        state: &CatalogState,
    ) -> Result<Option<CatalogState>, AdapterError> {
        // We come up with the new catalog state, builtin table updates, and parsed
        // catalog updates (for deriving catalog implications) in two phases:
        //
        // 1. We (cow-)clone the catalog state as `preliminary_state` and apply ops
        //    one by one. This gives us the full list of updates to apply to the
        //    catalog, which allows us to apply them in one batch, which in turn
        //    allows the apply machinery to consolidate the updates.
        // 2. We do one final apply call with all updates, which gives us the
        //    final builtin table updates and parsed catalog updates.
        //
        // The reason is that the loop working off ops first does a `transact_op`
        // to derive the state updates for that op and then calls `apply_updates`
        // on the catalog state, because successive ops might expect the catalog
        // state to reflect the modified state _after_ applying previous ops.
        //
        // We want, however, one final apply call that takes all the accumulated
        // updates to derive the required controller updates and the builtin
        // table updates.
        //
        // We won't win any DDL throughput benchmarks, but so far that's not what
        // we're optimizing for, and there would probably be other bottlenecks
        // before this one.
        //
        // We could work around this by refactoring how the interplay of
        // `transact_op` and `apply_updates` works, but that's a larger undertaking.
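        //
        // In rough pseudocode (a simplified sketch; the durable transaction, temporary
        // items, and builtin table bookkeeping below are omitted here):
        //
        //     let mut preliminary_state = Cow::Borrowed(state);
        //     let mut updates = Vec::new();
        //     for op in ops {
        //         let op_updates = transact_op(op, &preliminary_state, ...)?; // phase 1
        //         preliminary_state.to_mut().apply_updates(op_updates.clone());
        //         updates.extend(op_updates);
        //     }
        //     state.to_mut().apply_updates(updates);                          // phase 2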
        let mut preliminary_state = Cow::Borrowed(state);

        // The final state that we will return, if modified.
        let mut state = Cow::Borrowed(state);

        let dry_run_ops = match ops.last() {
            Some(Op::TransactionDryRun) => {
                // Remove dry run marker.
                ops.pop();
                assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
                ops.clone()
            }
            Some(_) => vec![],
            None => return Ok(None),
        };

        let mut storage_collections_to_create = BTreeSet::new();
        let mut storage_collections_to_drop = BTreeSet::new();
        let mut storage_collections_to_register = BTreeMap::new();

        let mut updates = Vec::new();

        for op in ops {
            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
                oracle_write_ts,
                session,
                op,
                &temporary_ids,
                audit_events,
                tx,
                &*preliminary_state,
                &mut storage_collections_to_create,
                &mut storage_collections_to_drop,
                &mut storage_collections_to_register,
            )
            .await?;

            // Certain builtin tables are not derived from the durable catalog state, so they need
            // to be updated ad-hoc based on the current transaction. This is weird and will not
            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
            // this instance crashes after committing the durable catalog but before applying the
            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
            // world. Currently, this works fine and there are no correctness issues because
            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
            // state of all builtin tables to the correct values.
            if let Some(builtin_table_update) = weird_builtin_table_update {
                builtin_table_updates.push(builtin_table_update);
            }

            // Temporary items are not stored in the durable catalog, so they need to be handled
            // separately for updating state and builtin tables.
            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
            // in a multi-subscriber catalog world.
            let upper = tx.upper();
            let temporary_item_updates =
                temporary_item_updates
                    .into_iter()
                    .map(|(item, diff)| StateUpdate {
                        kind: StateUpdateKind::TemporaryItem(item),
                        ts: upper,
                        diff,
                    });

            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
            op_updates.extend(temporary_item_updates);
            if !op_updates.is_empty() {
                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
                    .to_mut()
                    .apply_updates(op_updates.clone(), &mut LocalExpressionCache::Closed)
                    .await;
            }
            updates.append(&mut op_updates);
        }

        if !updates.is_empty() {
            let (op_builtin_table_updates, op_catalog_updates) = state
                .to_mut()
                .apply_updates(updates.clone(), &mut LocalExpressionCache::Closed)
                .await;
            let op_builtin_table_updates = state
                .to_mut()
                .resolve_builtin_table_updates(op_builtin_table_updates);
            builtin_table_updates.extend(op_builtin_table_updates);
            parsed_catalog_updates.extend(op_catalog_updates);
        }

        if dry_run_ops.is_empty() {
            // `storage_collections` should only be `None` for tests.
            if let Some(c) = storage_collections {
                c.prepare_state(
                    tx,
                    storage_collections_to_create,
                    storage_collections_to_drop,
                    storage_collections_to_register,
                )
                .await?;
            }

            let updates = tx.get_and_commit_op_updates();
            if !updates.is_empty() {
                let (op_builtin_table_updates, op_catalog_updates) = state
                    .to_mut()
                    .apply_updates(updates.clone(), &mut LocalExpressionCache::Closed)
                    .await;
                let op_builtin_table_updates = state
                    .to_mut()
                    .resolve_builtin_table_updates(op_builtin_table_updates);
                builtin_table_updates.extend(op_builtin_table_updates);
                parsed_catalog_updates.extend(op_catalog_updates);
            }

            match state {
                Cow::Owned(state) => Ok(Some(state)),
                Cow::Borrowed(_) => Ok(None),
            }
        } else {
            Err(AdapterError::TransactionDryRun {
                new_ops: dry_run_ops,
                new_state: state.into_owned(),
            })
        }
    }

    /// Performs the transaction operation described by `op`. This function prepares the changes in
    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
    /// changes.
    ///
    /// Optionally returns a builtin table update for any builtin table updates that cannot be
    /// derived from the durable catalog state, as well as temporary item diffs. These are all very
    /// weird scenarios that ideally will not exist in the future.
    #[instrument]
    async fn transact_op(
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        op: Op,
        temporary_ids: &BTreeSet<CatalogItemId>,
        audit_events: &mut Vec<VersionedEvent>,
        tx: &mut Transaction<'_>,
        state: &CatalogState,
        storage_collections_to_create: &mut BTreeSet<GlobalId>,
        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
        let mut weird_builtin_table_update = None;
        let mut temporary_item_updates = Vec::new();

        match op {
            Op::TransactionDryRun => {
                unreachable!("TransactionDryRun can only be used as the final element of ops")
            }
            Op::AlterRetainHistory { id, value, window } => {
                let entry = state.get_entry(&id);
                if id.is_system() {
                    let name = entry.name();
                    let full_name =
                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
                        full_name.to_string(),
                    ))));
                }

                let mut new_entry = entry.clone();
                let previous = new_entry
                    .item
                    .update_retain_history(value.clone(), window)
                    .map_err(|_| {
                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
                            "planner should have rejected invalid alter retain history item type"
                                .to_string(),
                        )))
                    })?;

                if Self::should_audit_log_item(new_entry.item()) {
                    let details =
                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
                            id: id.to_string(),
                            old_history: previous.map(|previous| previous.to_string()),
                            new_history: value.map(|v| v.to_string()),
                        });
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Alter,
                        catalog_type_to_audit_object_type(new_entry.item().typ()),
                        details,
                    )?;
                }

                tx.update_item(id, new_entry.into())?;

                Self::log_update(state, &id);
            }
            Op::AlterRole {
                id,
                name,
                attributes,
                nopassword,
                vars,
            } => {
                state.ensure_not_reserved_role(&id)?;

                let mut existing_role = state.get_role(&id).clone();
                let password = attributes.password.clone();
                let scram_iterations = attributes
                    .scram_iterations
                    .unwrap_or_else(|| state.system_config().scram_iterations());
                existing_role.attributes = attributes.into();
                existing_role.vars = vars;
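                // Decide how the stored password changes: NOPASSWORD clears it, an
                // explicitly provided password (re)sets it using the configured SCRAM
                // iteration count, and otherwise it is left unchanged.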
                let password_action = if nopassword {
                    PasswordAction::Clear
                } else if let Some(password) = password {
                    PasswordAction::Set(PasswordConfig {
                        password,
                        scram_iterations,
                    })
                } else {
                    PasswordAction::NoChange
                };
                tx.update_role(id, existing_role.into(), password_action)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    ObjectType::Role,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;

                info!("update role {name} ({id})");
            }
            Op::AlterNetworkPolicy {
                id,
                rules,
                name,
                owner_id: _owner_id,
            } => {
                let existing_policy = state.get_network_policy(&id).clone();
                let mut policy: NetworkPolicy = existing_policy.into();
                policy.rules = rules;
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedNetworkPolicyName(name),
                    )));
                }
                tx.update_network_policy(id, policy.clone())?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    ObjectType::NetworkPolicy,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;

                info!("update network policy {name} ({id})");
            }
            Op::AlterAddColumn {
                id,
                new_global_id,
                name,
                typ,
                sql,
            } => {
                let mut new_entry = state.get_entry(&id).clone();
                let version = new_entry.item.add_column(name, typ, sql)?;
                // All versions of a table share the same shard, so it shouldn't matter what
                // GlobalId we use here.
                let shard_id = state
                    .storage_metadata()
                    .get_collection_shard(new_entry.latest_global_id())?;

                // TODO(alter_table): Support adding columns to sources.
                let CatalogItem::Table(table) = &mut new_entry.item else {
                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
                };
                table.collections.insert(version, new_global_id);

                tx.update_item(id, new_entry.into())?;
                storage_collections_to_register.insert(new_global_id, shard_id);
            }
            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
                let mut new_entry = state.get_entry(&id).clone();
                let replacement = state.get_entry(&replacement_id);

                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
                    return Err(AdapterError::internal(
                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
                        "id must refer to a materialized view",
                    ));
                };
                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
                    return Err(AdapterError::internal(
                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
                        "replacement_id must refer to a materialized view",
                    ));
                };

                mv.apply_replacement(replacement_mv.clone());

                tx.remove_item(replacement_id)?;
                tx.update_item(id, new_entry.into())?;

                // TODO(alter-mv): audit logging
            }
            Op::CreateDatabase { name, owner_id } => {
                let database_owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Database,
                    owner_id,
                )];
                let database_default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::Database,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let database_privileges: Vec<_> = merge_mz_acl_items(
                    database_owner_privileges
                        .into_iter()
                        .chain(database_default_privileges),
                )
                .collect();

                let schema_owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Schema,
                    owner_id,
                )];
                let schema_default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::Schema,
                    )
                    .map(|item| item.mz_acl_item(owner_id))
                    // Special default privilege on public schemas.
                    .chain(std::iter::once(MzAclItem {
                        grantee: RoleId::Public,
                        grantor: owner_id,
                        acl_mode: AclMode::USAGE,
                    }));
                let schema_privileges: Vec<_> = merge_mz_acl_items(
                    schema_owner_privileges
                        .into_iter()
                        .chain(schema_default_privileges),
                )
                .collect();

                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
                let (database_id, _) = tx.insert_user_database(
                    &name,
                    owner_id,
                    database_privileges.clone(),
                    &temporary_oids,
                )?;
                let (schema_id, _) = tx.insert_user_schema(
                    database_id,
                    DEFAULT_SCHEMA,
                    owner_id,
                    schema_privileges.clone(),
                    &temporary_oids,
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Database,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: database_id.to_string(),
                        name: name.clone(),
                    }),
                )?;
                info!("create database {}", name);

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Schema,
                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
                        id: schema_id.to_string(),
                        name: DEFAULT_SCHEMA.to_string(),
                        database_name: Some(name),
                    }),
                )?;
            }
            Op::CreateSchema {
                database_id,
                schema_name,
                owner_id,
            } => {
                if is_reserved_name(&schema_name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedSchemaName(schema_name),
                    )));
                }
                let database_id = match database_id {
                    ResolvedDatabaseSpecifier::Id(id) => id,
                    ResolvedDatabaseSpecifier::Ambient => {
                        return Err(AdapterError::Catalog(Error::new(
                            ErrorKind::ReadOnlySystemSchema(schema_name),
                        )));
                    }
                };
                let owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Schema,
                    owner_id,
                )];
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        Some(database_id),
                        None,
                        mz_sql::catalog::ObjectType::Schema,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let privileges: Vec<_> =
                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
                        .collect();
                let (schema_id, _) = tx.insert_user_schema(
                    database_id,
                    &schema_name,
                    owner_id,
                    privileges.clone(),
                    &state.get_temporary_oids().collect(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Schema,
                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
                        id: schema_id.to_string(),
                        name: schema_name.clone(),
                        database_name: Some(state.database_by_id[&database_id].name.clone()),
                    }),
                )?;
            }
            Op::CreateRole { name, attributes } => {
                if is_reserved_role_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedRoleName(name),
                    )));
                }
                let membership = RoleMembership::new();
                let vars = RoleVars::default();
                let (id, _) = tx.insert_user_role(
                    name.clone(),
                    attributes.clone(),
                    membership.clone(),
                    vars.clone(),
                    &state.get_temporary_oids().collect(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Role,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;
                info!("create role {}", name);
            }
            Op::CreateCluster {
                id,
                name,
                introspection_sources,
                owner_id,
                config,
            } => {
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedClusterName(name),
                    )));
                }
                let owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Cluster,
                    owner_id,
                )];
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::Cluster,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let privileges: Vec<_> =
                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
                        .collect();
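                // Allocate an (item ID, global ID) pair for each introspection source
                // index of the new cluster, then pair each builtin log back up with its
                // allocated IDs.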
                let introspection_source_ids: Vec<_> = introspection_sources
                    .iter()
                    .map(|introspection_source| {
                        Transaction::allocate_introspection_source_index_id(
                            &id,
                            introspection_source.variant,
                        )
                    })
                    .collect();

                let introspection_sources = introspection_sources
                    .into_iter()
                    .zip_eq(introspection_source_ids)
                    .map(|(log, (item_id, gid))| (log, item_id, gid))
                    .collect();

                tx.insert_user_cluster(
                    id,
                    &name,
                    introspection_sources,
                    owner_id,
                    privileges.clone(),
                    config.clone().into(),
                    &state.get_temporary_oids().collect(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Cluster,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;
                info!("create cluster {}", name);
            }
            Op::CreateClusterReplica {
                cluster_id,
                name,
                config,
                owner_id,
                reason,
            } => {
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedReplicaName(name),
                    )));
                }
                let cluster = state.get_cluster(cluster_id);
                let id =
                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
                if let ReplicaLocation::Managed(ManagedReplicaLocation {
                    size,
                    billed_as,
                    internal,
                    ..
                }) = &config.location
                {
                    let (reason, scheduling_policies) = reason.into_audit_log();
                    let details = EventDetails::CreateClusterReplicaV4(
                        mz_audit_log::CreateClusterReplicaV4 {
                            cluster_id: cluster_id.to_string(),
                            cluster_name: cluster.name.clone(),
                            replica_id: Some(id.to_string()),
                            replica_name: name.clone(),
                            logical_size: size.clone(),
                            billed_as: billed_as.clone(),
                            internal: *internal,
                            reason,
                            scheduling_policies,
                        },
                    );
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Create,
                        ObjectType::ClusterReplica,
                        details,
                    )?;
                }
            }
            Op::CreateItem {
                id,
                name,
                item,
                owner_id,
            } => {
                state.check_unstable_dependencies(&item)?;

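                // Decide which storage-backed collections this item needs: most storage
                // objects get a brand-new collection, while a materialized view created as
                // a replacement re-registers the replaced view's existing shard.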
1156                match &item {
1157                    CatalogItem::Table(table) => {
1158                        let gids: Vec<_> = table.global_ids().collect();
1159                        assert_eq!(gids.len(), 1);
1160                        storage_collections_to_create.extend(gids);
1161                    }
1162                    CatalogItem::Source(source) => {
1163                        storage_collections_to_create.insert(source.global_id());
1164                    }
1165                    CatalogItem::MaterializedView(mv) => {
1166                        let mv_gid = mv.global_id_writes();
1167                        if let Some(target_id) = mv.replacement_target {
1168                            let target_gid = state.get_entry(&target_id).latest_global_id();
1169                            let shard_id =
1170                                state.storage_metadata().get_collection_shard(target_gid)?;
1171                            storage_collections_to_register.insert(mv_gid, shard_id);
1172                        } else {
1173                            storage_collections_to_create.insert(mv_gid);
1174                        }
1175                    }
1176                    CatalogItem::ContinualTask(ct) => {
1177                        storage_collections_to_create.insert(ct.global_id());
1178                    }
1179                    CatalogItem::Sink(sink) => {
1180                        storage_collections_to_create.insert(sink.global_id());
1181                    }
1182                    CatalogItem::Log(_)
1183                    | CatalogItem::View(_)
1184                    | CatalogItem::Index(_)
1185                    | CatalogItem::Type(_)
1186                    | CatalogItem::Func(_)
1187                    | CatalogItem::Secret(_)
1188                    | CatalogItem::Connection(_) => (),
1189                }
1190
1191                let system_user = session.map_or(false, |s| s.user().is_system_user());
1192                if !system_user {
1193                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1194                        let cluster_name = state.clusters_by_id[&id].name.clone();
1195                        return Err(AdapterError::Catalog(Error::new(
1196                            ErrorKind::ReadOnlyCluster(cluster_name),
1197                        )));
1198                    }
1199                }
1200
1201                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1202                let default_privileges = state
1203                    .default_privileges
1204                    .get_applicable_privileges(
1205                        owner_id,
1206                        name.qualifiers.database_spec.id(),
1207                        Some(name.qualifiers.schema_spec.into()),
1208                        item.typ().into(),
1209                    )
1210                    .map(|item| item.mz_acl_item(owner_id));
1211                // mz_support can read all progress sources.
1212                let progress_source_privilege = if item.is_progress_source() {
1213                    Some(MzAclItem {
1214                        grantee: MZ_SUPPORT_ROLE_ID,
1215                        grantor: owner_id,
1216                        acl_mode: AclMode::SELECT,
1217                    })
1218                } else {
1219                    None
1220                };
1221                let privileges: Vec<_> = merge_mz_acl_items(
1222                    owner_privileges
1223                        .into_iter()
1224                        .chain(default_privileges)
1225                        .chain(progress_source_privilege),
1226                )
1227                .collect();
1228
1229                let temporary_oids = state.get_temporary_oids().collect();
1230
1231                if item.is_temporary() {
1232                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1233                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1234                    {
1235                        return Err(AdapterError::Catalog(Error::new(
1236                            ErrorKind::InvalidTemporarySchema,
1237                        )));
1238                    }
1239                    let oid = tx.allocate_oid(&temporary_oids)?;
1240
1241                    let schema_id = name.qualifiers.schema_spec.clone().into();
1242                    let item_type = item.typ();
1243                    let (create_sql, global_id, versions) = item.to_serialized();
1244
1245                    let item = TemporaryItem {
1246                        id,
1247                        oid,
1248                        global_id,
1249                        schema_id,
1250                        name: name.item.clone(),
1251                        create_sql,
1252                        conn_id: item.conn_id().cloned(),
1253                        owner_id,
1254                        privileges: privileges.clone(),
1255                        extra_versions: versions,
1256                    };
1257                    temporary_item_updates.push((item, StateDiff::Addition));
1258
1259                    info!(
1260                        "create temporary {} {} ({})",
1261                        item_type,
1262                        state.resolve_full_name(&name, None),
1263                        id
1264                    );
1265                } else {
1266                    if let Some(temp_id) =
1267                        item.uses()
1268                            .iter()
1269                            .find(|id| match state.try_get_entry(*id) {
1270                                Some(entry) => entry.item().is_temporary(),
1271                                None => temporary_ids.contains(id),
1272                            })
1273                    {
1274                        let temp_item = state.get_entry(temp_id);
1275                        return Err(AdapterError::Catalog(Error::new(
1276                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1277                        )));
1278                    }
1279                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1280                        && !system_user
1281                    {
1282                        let schema_name = state
1283                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1284                            .schema;
1285                        return Err(AdapterError::Catalog(Error::new(
1286                            ErrorKind::ReadOnlySystemSchema(schema_name),
1287                        )));
1288                    }
1289                    let schema_id = name.qualifiers.schema_spec.clone().into();
1290                    let item_type = item.typ();
1291                    let (create_sql, global_id, versions) = item.to_serialized();
1292                    tx.insert_user_item(
1293                        id,
1294                        global_id,
1295                        schema_id,
1296                        &name.item,
1297                        create_sql,
1298                        owner_id,
1299                        privileges.clone(),
1300                        &temporary_oids,
1301                        versions,
1302                    )?;
1303                    info!(
1304                        "create {} {} ({})",
1305                        item_type,
1306                        state.resolve_full_name(&name, None),
1307                        id
1308                    );
1309                }
1310
1311                if Self::should_audit_log_item(&item) {
1312                    let name = Self::full_name_detail(
1313                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1314                    );
1315                    let details = match &item {
1316                        CatalogItem::Source(s) => {
1317                            let cluster_id = match s.data_source {
1318                                // Ingestion exports don't have their own cluster, but
1319                                // run on their ingestion's cluster.
1320                                DataSourceDesc::IngestionExport { ingestion_id, .. } => state
1321                                    .get_entry(&ingestion_id)
1322                                    .cluster_id()
1323                                    .map(|cluster_id| cluster_id.to_string()),
1326                                _ => item.cluster_id().map(|cluster_id| cluster_id.to_string()),
1330                            };
1331
1332                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1333                                id: id.to_string(),
1334                                cluster_id,
1335                                name,
1336                                external_type: s.source_type().to_string(),
1337                            })
1338                        }
1339                        CatalogItem::Sink(s) => {
1340                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1341                                id: id.to_string(),
1342                                cluster_id: Some(s.cluster_id.to_string()),
1343                                name,
1344                                external_type: s.sink_type().to_string(),
1345                            })
1346                        }
1347                        CatalogItem::Index(i) => {
1348                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1349                                id: id.to_string(),
1350                                name,
1351                                cluster_id: i.cluster_id.to_string(),
1352                            })
1353                        }
1354                        CatalogItem::MaterializedView(mv) => {
1355                            EventDetails::CreateMaterializedViewV1(
1356                                mz_audit_log::CreateMaterializedViewV1 {
1357                                    id: id.to_string(),
1358                                    name,
1359                                    cluster_id: mv.cluster_id.to_string(),
1360                                },
1361                            )
1362                        }
1363                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1364                            id: id.to_string(),
1365                            name,
1366                        }),
1367                    };
1368                    CatalogState::add_to_audit_log(
1369                        &state.system_configuration,
1370                        oracle_write_ts,
1371                        session,
1372                        tx,
1373                        audit_events,
1374                        EventType::Create,
1375                        catalog_type_to_audit_object_type(item.typ()),
1376                        details,
1377                    )?;
1378                }
1379            }
1380            Op::CreateNetworkPolicy {
1381                rules,
1382                name,
1383                owner_id,
1384            } => {
1385                if state.network_policies_by_name.contains_key(&name) {
1386                    return Err(AdapterError::PlanError(PlanError::Catalog(
1387                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1388                    )));
1389                }
1390                if is_reserved_name(&name) {
1391                    return Err(AdapterError::Catalog(Error::new(
1392                        ErrorKind::ReservedNetworkPolicyName(name),
1393                    )));
1394                }
1395
1396                let owner_privileges = vec![rbac::owner_privilege(
1397                    mz_sql::catalog::ObjectType::NetworkPolicy,
1398                    owner_id,
1399                )];
1400                let default_privileges = state
1401                    .default_privileges
1402                    .get_applicable_privileges(
1403                        owner_id,
1404                        None,
1405                        None,
1406                        mz_sql::catalog::ObjectType::NetworkPolicy,
1407                    )
1408                    .map(|item| item.mz_acl_item(owner_id));
1409                let privileges: Vec<_> =
1410                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1411                        .collect();
1412
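                    // Temporary items exist only in memory; their OIDs are passed along so
                    // the durable transaction can avoid allocating an OID that is already
                    // in use.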
1413                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1414                let id = tx.insert_user_network_policy(
1415                    name.clone(),
1416                    rules,
1417                    privileges,
1418                    owner_id,
1419                    &temporary_oids,
1420                )?;
1421
1422                CatalogState::add_to_audit_log(
1423                    &state.system_configuration,
1424                    oracle_write_ts,
1425                    session,
1426                    tx,
1427                    audit_events,
1428                    EventType::Create,
1429                    ObjectType::NetworkPolicy,
1430                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1431                        id: id.to_string(),
1432                        name: name.clone(),
1433                    }),
1434                )?;
1435
1436                info!("created network policy {name} ({id})");
1437            }
1438            Op::Comment {
1439                object_id,
1440                sub_component,
1441                comment,
1442            } => {
1443                tx.update_comment(object_id, sub_component, comment)?;
1444                let entry = state.get_comment_id_entry(&object_id);
1445                let should_log = entry
1446                    .map(|entry| Self::should_audit_log_item(entry.item()))
1447                    // Objects that aren't catalog entries can't be temporary, so they should be logged.
1448                    .unwrap_or(true);
1449                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1450                // comments won't be logged for now.
1451                if let (Some(conn_id), true) =
1452                    (session.map(|session| session.conn_id()), should_log)
1453                {
1454                    CatalogState::add_to_audit_log(
1455                        &state.system_configuration,
1456                        oracle_write_ts,
1457                        session,
1458                        tx,
1459                        audit_events,
1460                        EventType::Comment,
1461                        comment_id_to_audit_object_type(object_id),
1462                        EventDetails::IdNameV1(IdNameV1 {
1463                            // CommentObjectIds don't have a great string representation, but the Debug output will do for now.
1464                            id: format!("{object_id:?}"),
1465                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1466                        }),
1467                    )?;
1468                }
1469            }
1470            Op::UpdateSourceReferences {
1471                source_id,
1472                references,
1473            } => {
1474                tx.update_source_references(
1475                    source_id,
1476                    references
1477                        .references
1478                        .into_iter()
1479                        .map(|reference| reference.into())
1480                        .collect(),
1481                    references.updated_at,
1482                )?;
1483            }
1484            Op::DropObjects(drop_object_infos) => {
1485                // Generate all of the objects that need to get dropped.
1486                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1487
1488                // Drop any associated comments.
1489                tx.drop_comments(&delta.comments)?;
1490
1491                // Drop any items.
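                    // Durable items are removed via the transaction; temporary items exist
                    // only in memory and are retracted from the temporary collection instead.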
1492                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1493                    delta
1494                        .items
1495                        .iter()
1497                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1498                tx.remove_items(&durable_items_to_drop)?;
1499                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1500                    let entry = state.get_entry(&id);
1501                    (entry.clone().into(), StateDiff::Retraction)
1502                }));
1503
1504                for item_id in delta.items {
1505                    let entry = state.get_entry(&item_id);
1506
1507                    if entry.item().is_storage_collection() {
1508                        storage_collections_to_drop.extend(entry.global_ids());
1509                    }
1510
1511                    if state.source_references.contains_key(&item_id) {
1512                        tx.remove_source_references(item_id)?;
1513                    }
1514
1515                    if Self::should_audit_log_item(entry.item()) {
1516                        CatalogState::add_to_audit_log(
1517                            &state.system_configuration,
1518                            oracle_write_ts,
1519                            session,
1520                            tx,
1521                            audit_events,
1522                            EventType::Drop,
1523                            catalog_type_to_audit_object_type(entry.item().typ()),
1524                            EventDetails::IdFullNameV1(IdFullNameV1 {
1525                                id: item_id.to_string(),
1526                                name: Self::full_name_detail(&state.resolve_full_name(
1527                                    entry.name(),
1528                                    session.map(|session| session.conn_id()),
1529                                )),
1530                            }),
1531                        )?;
1532                    }
1533                    info!(
1534                        "drop {} {} ({})",
1535                        entry.item_type(),
1536                        state.resolve_full_name(entry.name(), entry.conn_id()),
1537                        item_id
1538                    );
1539                }
1540
1541                // Drop any schemas.
1542                let schemas = delta
1543                    .schemas
1544                    .iter()
1545                    .map(|(schema_spec, database_spec)| {
1546                        (SchemaId::from(schema_spec), *database_spec)
1547                    })
1548                    .collect();
1549                tx.remove_schemas(&schemas)?;
1550
1551                for (schema_spec, database_spec) in delta.schemas {
1552                    let schema = state.get_schema(
1553                        &database_spec,
1554                        &schema_spec,
1555                        session
1556                            .map(|session| session.conn_id())
1557                            .unwrap_or(&SYSTEM_CONN_ID),
1558                    );
1559
1560                    let schema_id = SchemaId::from(schema_spec);
1561                    let database_id = match database_spec {
1562                        ResolvedDatabaseSpecifier::Ambient => None,
1563                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1564                    };
1565
1566                    CatalogState::add_to_audit_log(
1567                        &state.system_configuration,
1568                        oracle_write_ts,
1569                        session,
1570                        tx,
1571                        audit_events,
1572                        EventType::Drop,
1573                        ObjectType::Schema,
1574                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1575                            id: schema_id.to_string(),
1576                            name: schema.name.schema.to_string(),
1577                            database_name: database_id
1578                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1579                        }),
1580                    )?;
1581                }
1582
1583                // Drop any databases.
1584                tx.remove_databases(&delta.databases)?;
1585
1586                for database_id in delta.databases {
1587                    let database = state.get_database(&database_id).clone();
1588
1589                    CatalogState::add_to_audit_log(
1590                        &state.system_configuration,
1591                        oracle_write_ts,
1592                        session,
1593                        tx,
1594                        audit_events,
1595                        EventType::Drop,
1596                        ObjectType::Database,
1597                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1598                            id: database_id.to_string(),
1599                            name: database.name.clone(),
1600                        }),
1601                    )?;
1602                }
1603
1604                // Drop any roles.
1605                tx.remove_user_roles(&delta.roles)?;
1606
1607                for role_id in delta.roles {
1608                    let role = state
1609                        .roles_by_id
1610                        .get(&role_id)
1611                        .expect("catalog out of sync");
1612
1613                    CatalogState::add_to_audit_log(
1614                        &state.system_configuration,
1615                        oracle_write_ts,
1616                        session,
1617                        tx,
1618                        audit_events,
1619                        EventType::Drop,
1620                        ObjectType::Role,
1621                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1622                            id: role.id.to_string(),
1623                            name: role.name.clone(),
1624                        }),
1625                    )?;
1626                    info!("drop role {}", role.name());
1627                }
1628
1629                // Drop any network policies.
1630                tx.remove_network_policies(&delta.network_policies)?;
1631
1632                for network_policy_id in delta.network_policies {
1633                    let policy = state
1634                        .network_policies_by_id
1635                        .get(&network_policy_id)
1636                        .expect("catalog out of sync");
1637
1638                    CatalogState::add_to_audit_log(
1639                        &state.system_configuration,
1640                        oracle_write_ts,
1641                        session,
1642                        tx,
1643                        audit_events,
1644                        EventType::Drop,
1645                        ObjectType::NetworkPolicy,
1646                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1647                            id: policy.id.to_string(),
1648                            name: policy.name.clone(),
1649                        }),
1650                    )?;
1651                    info!("drop network policy {}", policy.name);
1652                }
1653
1654                // Drop any replicas.
1655                let replicas = delta.replicas.keys().copied().collect();
1656                tx.remove_cluster_replicas(&replicas)?;
1657
1658                for (replica_id, (cluster_id, reason)) in delta.replicas {
1659                    let cluster = state.get_cluster(cluster_id);
1660                    let replica = cluster.replica(replica_id).expect("Must exist");
1661
1662                    let (reason, scheduling_policies) = reason.into_audit_log();
1663                    let details =
1664                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1665                            cluster_id: cluster_id.to_string(),
1666                            cluster_name: cluster.name.clone(),
1667                            replica_id: Some(replica_id.to_string()),
1668                            replica_name: replica.name.clone(),
1669                            reason,
1670                            scheduling_policies,
1671                        });
1672                    CatalogState::add_to_audit_log(
1673                        &state.system_configuration,
1674                        oracle_write_ts,
1675                        session,
1676                        tx,
1677                        audit_events,
1678                        EventType::Drop,
1679                        ObjectType::ClusterReplica,
1680                        details,
1681                    )?;
1682                }
1683
1684                // Drop any clusters.
1685                tx.remove_clusters(&delta.clusters)?;
1686
1687                for cluster_id in delta.clusters {
1688                    let cluster = state.get_cluster(cluster_id);
1689
1690                    CatalogState::add_to_audit_log(
1691                        &state.system_configuration,
1692                        oracle_write_ts,
1693                        session,
1694                        tx,
1695                        audit_events,
1696                        EventType::Drop,
1697                        ObjectType::Cluster,
1698                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1699                            id: cluster.id.to_string(),
1700                            name: cluster.name.clone(),
1701                        }),
1702                    )?;
1703                }
1704            }
1705            Op::GrantRole {
1706                role_id,
1707                member_id,
1708                grantor_id,
1709            } => {
1710                state.ensure_not_reserved_role(&member_id)?;
1711                state.ensure_grantable_role(&role_id)?;
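                    // Refuse grants that would introduce a cycle in the role membership graph.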
1712                if state.collect_role_membership(&role_id).contains(&member_id) {
1713                    let group_role = state.get_role(&role_id);
1714                    let member_role = state.get_role(&member_id);
1715                    return Err(AdapterError::Catalog(Error::new(
1716                        ErrorKind::CircularRoleMembership {
1717                            role_name: group_role.name().to_string(),
1718                            member_name: member_role.name().to_string(),
1719                        },
1720                    )));
1721                }
1722                let mut member_role = state.get_role(&member_id).clone();
1723                member_role.membership.map.insert(role_id, grantor_id);
1724                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1725
1726                CatalogState::add_to_audit_log(
1727                    &state.system_configuration,
1728                    oracle_write_ts,
1729                    session,
1730                    tx,
1731                    audit_events,
1732                    EventType::Grant,
1733                    ObjectType::Role,
1734                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1735                        role_id: role_id.to_string(),
1736                        member_id: member_id.to_string(),
1737                        grantor_id: grantor_id.to_string(),
1738                        executed_by: session
1739                            .map(|session| session.authenticated_role_id())
1740                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1741                            .to_string(),
1742                    }),
1743                )?;
1744            }
1745            Op::RevokeRole {
1746                role_id,
1747                member_id,
1748                grantor_id,
1749            } => {
1750                state.ensure_not_reserved_role(&member_id)?;
1751                state.ensure_grantable_role(&role_id)?;
1752                let mut member_role = state.get_role(&member_id).clone();
1753                member_role.membership.map.remove(&role_id);
1754                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1755
1756                CatalogState::add_to_audit_log(
1757                    &state.system_configuration,
1758                    oracle_write_ts,
1759                    session,
1760                    tx,
1761                    audit_events,
1762                    EventType::Revoke,
1763                    ObjectType::Role,
1764                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1765                        role_id: role_id.to_string(),
1766                        member_id: member_id.to_string(),
1767                        grantor_id: grantor_id.to_string(),
1768                        executed_by: session
1769                            .map(|session| session.authenticated_role_id())
1770                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1771                            .to_string(),
1772                    }),
1773                )?;
1774            }
1775            Op::UpdatePrivilege {
1776                target_id,
1777                privilege,
1778                variant,
1779            } => {
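                    // Applies the grant or revoke to a privilege map in place; reused for
                    // every object type handled below.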
1780                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1781                    UpdatePrivilegeVariant::Grant => {
1782                        privileges.grant(privilege);
1783                    }
1784                    UpdatePrivilegeVariant::Revoke => {
1785                        privileges.revoke(&privilege);
1786                    }
1787                };
1788                match &target_id {
1789                    SystemObjectId::Object(object_id) => match object_id {
1790                        ObjectId::Cluster(id) => {
1791                            let mut cluster = state.get_cluster(*id).clone();
1792                            update_privilege_fn(&mut cluster.privileges);
1793                            tx.update_cluster(*id, cluster.into())?;
1794                        }
1795                        ObjectId::Database(id) => {
1796                            let mut database = state.get_database(id).clone();
1797                            update_privilege_fn(&mut database.privileges);
1798                            tx.update_database(*id, database.into())?;
1799                        }
1800                        ObjectId::NetworkPolicy(id) => {
1801                            let mut policy = state.get_network_policy(id).clone();
1802                            update_privilege_fn(&mut policy.privileges);
1803                            tx.update_network_policy(*id, policy.into())?;
1804                        }
1805                        ObjectId::Schema((database_spec, schema_spec)) => {
1806                            let schema_id = schema_spec.clone().into();
1807                            let mut schema = state
1808                                .get_schema(
1809                                    database_spec,
1810                                    schema_spec,
1811                                    session
1812                                        .map(|session| session.conn_id())
1813                                        .unwrap_or(&SYSTEM_CONN_ID),
1814                                )
1815                                .clone();
1816                            update_privilege_fn(&mut schema.privileges);
1817                            tx.update_schema(schema_id, schema.into())?;
1818                        }
1819                        ObjectId::Item(id) => {
1820                            let entry = state.get_entry(id);
1821                            let mut new_entry = entry.clone();
1822                            update_privilege_fn(&mut new_entry.privileges);
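                                // Durable items are updated through the transaction; temporary
                                // items are swapped in memory via a retraction and re-addition.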
1823                            if !new_entry.item().is_temporary() {
1824                                tx.update_item(*id, new_entry.into())?;
1825                            } else {
1826                                temporary_item_updates
1827                                    .push((entry.clone().into(), StateDiff::Retraction));
1828                                temporary_item_updates
1829                                    .push((new_entry.into(), StateDiff::Addition));
1830                            }
1831                        }
1832                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1833                    },
1834                    SystemObjectId::System => {
1835                        let mut system_privileges = state.system_privileges.clone();
1836                        update_privilege_fn(&mut system_privileges);
1837                        let new_privilege =
1838                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1839                        tx.set_system_privilege(
1840                            privilege.grantee,
1841                            privilege.grantor,
1842                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
1843                        )?;
1844                    }
1845                }
1846                let object_type = state.get_system_object_type(&target_id);
1847                let object_id_str = match &target_id {
1848                    SystemObjectId::System => "SYSTEM".to_string(),
1849                    SystemObjectId::Object(id) => id.to_string(),
1850                };
1851                CatalogState::add_to_audit_log(
1852                    &state.system_configuration,
1853                    oracle_write_ts,
1854                    session,
1855                    tx,
1856                    audit_events,
1857                    variant.into(),
1858                    system_object_type_to_audit_object_type(&object_type),
1859                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1860                        object_id: object_id_str,
1861                        grantee_id: privilege.grantee.to_string(),
1862                        grantor_id: privilege.grantor.to_string(),
1863                        privileges: privilege.acl_mode.to_string(),
1864                    }),
1865                )?;
1866            }
1867            Op::UpdateDefaultPrivilege {
1868                privilege_object,
1869                privilege_acl_item,
1870                variant,
1871            } => {
1872                let mut default_privileges = state.default_privileges.clone();
1873                match variant {
1874                    UpdatePrivilegeVariant::Grant => default_privileges
1875                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
1876                    UpdatePrivilegeVariant::Revoke => {
1877                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
1878                    }
1879                }
1880                let new_acl_mode = default_privileges
1881                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1882                tx.set_default_privilege(
1883                    privilege_object.role_id,
1884                    privilege_object.database_id,
1885                    privilege_object.schema_id,
1886                    privilege_object.object_type,
1887                    privilege_acl_item.grantee,
1888                    new_acl_mode.cloned(),
1889                )?;
1890                CatalogState::add_to_audit_log(
1891                    &state.system_configuration,
1892                    oracle_write_ts,
1893                    session,
1894                    tx,
1895                    audit_events,
1896                    variant.into(),
1897                    object_type_to_audit_object_type(privilege_object.object_type),
1898                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1899                        role_id: privilege_object.role_id.to_string(),
1900                        database_id: privilege_object.database_id.map(|id| id.to_string()),
1901                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1902                        grantee_id: privilege_acl_item.grantee.to_string(),
1903                        privileges: privilege_acl_item.acl_mode.to_string(),
1904                    }),
1905                )?;
1906            }
1907            Op::RenameCluster {
1908                id,
1909                name,
1910                to_name,
1911                check_reserved_names,
1912            } => {
1913                if id.is_system() {
1914                    return Err(AdapterError::Catalog(Error::new(
1915                        ErrorKind::ReadOnlyCluster(name.clone()),
1916                    )));
1917                }
1918                if check_reserved_names && is_reserved_name(&to_name) {
1919                    return Err(AdapterError::Catalog(Error::new(
1920                        ErrorKind::ReservedClusterName(to_name),
1921                    )));
1922                }
1923                tx.rename_cluster(id, &name, &to_name)?;
1924                CatalogState::add_to_audit_log(
1925                    &state.system_configuration,
1926                    oracle_write_ts,
1927                    session,
1928                    tx,
1929                    audit_events,
1930                    EventType::Alter,
1931                    ObjectType::Cluster,
1932                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1933                        id: id.to_string(),
1934                        old_name: name.clone(),
1935                        new_name: to_name.clone(),
1936                    }),
1937                )?;
1938                info!("rename cluster {name} to {to_name}");
1939            }
1940            Op::RenameClusterReplica {
1941                cluster_id,
1942                replica_id,
1943                name,
1944                to_name,
1945            } => {
1946                if is_reserved_name(&to_name) {
1947                    return Err(AdapterError::Catalog(Error::new(
1948                        ErrorKind::ReservedReplicaName(to_name),
1949                    )));
1950                }
1951                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1952                CatalogState::add_to_audit_log(
1953                    &state.system_configuration,
1954                    oracle_write_ts,
1955                    session,
1956                    tx,
1957                    audit_events,
1958                    EventType::Alter,
1959                    ObjectType::ClusterReplica,
1960                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1961                        cluster_id: cluster_id.to_string(),
1962                        replica_id: replica_id.to_string(),
1963                        old_name: name.replica.as_str().to_string(),
1964                        new_name: to_name.clone(),
1965                    }),
1966                )?;
1967                info!("rename cluster replica {name} to {to_name}");
1968            }
1969            Op::RenameItem {
1970                id,
1971                to_name,
1972                current_full_name,
1973            } => {
1974                let mut updates = Vec::new();
1975
1976                let entry = state.get_entry(&id);
1977                if let CatalogItem::Type(_) = entry.item() {
1978                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1979                        current_full_name.to_string(),
1980                    ))));
1981                }
1982
1983                if entry.id().is_system() {
1984                    let name = state
1985                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1986                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1987                        name.to_string(),
1988                    ))));
1989                }
1990
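                    // Compute the item's new full name, which feeds both the audit log entry
                    // and the rewriting of references to the item.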
1991                let mut to_full_name = current_full_name.clone();
1992                to_full_name.item.clone_from(&to_name);
1993
1994                let mut to_qualified_name = entry.name().clone();
1995                to_qualified_name.item.clone_from(&to_name);
1996
1997                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1998                    id: id.to_string(),
1999                    old_name: Self::full_name_detail(&current_full_name),
2000                    new_name: Self::full_name_detail(&to_full_name),
2001                });
2002                if Self::should_audit_log_item(entry.item()) {
2003                    CatalogState::add_to_audit_log(
2004                        &state.system_configuration,
2005                        oracle_write_ts,
2006                        session,
2007                        tx,
2008                        audit_events,
2009                        EventType::Alter,
2010                        catalog_type_to_audit_object_type(entry.item().typ()),
2011                        details,
2012                    )?;
2013                }
2014
2015                // Rename item itself.
2016                let mut new_entry = entry.clone();
2017                new_entry.name.item.clone_from(&to_name);
2018                new_entry.item = entry
2019                    .item()
2020                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2021                    .map_err(|e| {
2022                        Error::new(ErrorKind::from(AmbiguousRename {
2023                            depender: state
2024                                .resolve_full_name(entry.name(), entry.conn_id())
2025                                .to_string(),
2026                            dependee: state
2027                                .resolve_full_name(entry.name(), entry.conn_id())
2028                                .to_string(),
2029                            message: e,
2030                        }))
2031                    })?;
2032
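                    // Rewrite references to the renamed item in each of its dependents.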
2033                for id in entry.referenced_by() {
2034                    let dependent_item = state.get_entry(id);
2035                    let mut to_entry = dependent_item.clone();
2036                    to_entry.item = dependent_item
2037                        .item()
2038                        .rename_item_refs(
2039                            current_full_name.clone(),
2040                            to_full_name.item.clone(),
2041                            false,
2042                        )
2043                        .map_err(|e| {
2044                            Error::new(ErrorKind::from(AmbiguousRename {
2045                                depender: state
2046                                    .resolve_full_name(
2047                                        dependent_item.name(),
2048                                        dependent_item.conn_id(),
2049                                    )
2050                                    .to_string(),
2051                                dependee: state
2052                                    .resolve_full_name(entry.name(), entry.conn_id())
2053                                    .to_string(),
2054                                message: e,
2055                            }))
2056                        })?;
2057
2058                    if !to_entry.item().is_temporary() {
2059                        tx.update_item(*id, to_entry.into())?;
2060                    } else {
2061                        temporary_item_updates
2062                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2063                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2064                    }
2065                    updates.push(*id);
2066                }
2067                if !new_entry.item().is_temporary() {
2068                    tx.update_item(id, new_entry.into())?;
2069                } else {
2070                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2071                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2072                }
2073
2074                updates.push(id);
2075                for id in updates {
2076                    Self::log_update(state, &id);
2077                }
2078            }
2079            Op::RenameSchema {
2080                database_spec,
2081                schema_spec,
2082                new_name,
2083                check_reserved_names,
2084            } => {
2085                if check_reserved_names && is_reserved_name(&new_name) {
2086                    return Err(AdapterError::Catalog(Error::new(
2087                        ErrorKind::ReservedSchemaName(new_name),
2088                    )));
2089                }
2090
2091                let conn_id = session
2092                    .map(|session| session.conn_id())
2093                    .unwrap_or(&SYSTEM_CONN_ID);
2094
2095                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2096                let cur_name = schema.name().schema.clone();
2097
2098                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2099                    return Err(AdapterError::Catalog(Error::new(
2100                        ErrorKind::AmbientSchemaRename(cur_name),
2101                    )));
2102                };
2103                let database = state.get_database(&database_id);
2104                let database_name = &database.name;
2105
2106                let mut updates = Vec::new();
2107                let mut items_to_update = BTreeMap::new();
2108
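                    // Rewrites a single item's schema references and queues the update; the
                    // map ensures each item is processed at most once.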
2109                let mut update_item = |id| {
2110                    if items_to_update.contains_key(id) {
2111                        return Ok(());
2112                    }
2113
2114                    let entry = state.get_entry(id);
2115
2116                    // Update our item.
2117                    let mut new_entry = entry.clone();
2118                    new_entry.item = entry
2119                        .item
2120                        .rename_schema_refs(database_name, &cur_name, &new_name)
2121                        .map_err(|(s, _i)| {
2122                            Error::new(ErrorKind::from(AmbiguousRename {
2123                                depender: state
2124                                    .resolve_full_name(entry.name(), entry.conn_id())
2125                                    .to_string(),
2126                                dependee: format!("{database_name}.{cur_name}"),
2127                                message: format!("ambiguous reference to schema named {s}"),
2128                            }))
2129                        })?;
2130
2131                    // Queue updates for Catalog storage and Builtin Tables.
2132                    if !new_entry.item().is_temporary() {
2133                        items_to_update.insert(*id, new_entry.into());
2134                    } else {
2135                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2136                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2137                    }
2138                    updates.push(id);
2139
2140                    Ok::<_, AdapterError>(())
2141                };
2142
2143                // Update all of the items in the schema.
2144                for (_name, item_id) in &schema.items {
2145                    // Update the item itself.
2146                    update_item(item_id)?;
2147
2148                    // Update everything that depends on this item.
2149                    for id in state.get_entry(item_id).referenced_by() {
2150                        update_item(id)?;
2151                    }
2152                }
2153                // Note: When updating the transaction it's very important that we update the
2154                // items as a whole group; otherwise we exhibit quadratic behavior.
2155                tx.update_items(items_to_update)?;
2156
2157                // Renaming temporary schemas is not supported.
2158                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2159                    let schema_name = schema.name().schema.clone();
2160                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2161                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2162                    )));
2163                };
2164
2165                // Add an entry to the audit log.
2166                let database_name = database_spec
2167                    .id()
2168                    .map(|id| state.get_database(&id).name.clone());
2169                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2170                    id: schema_id.to_string(),
2171                    old_name: schema.name().schema.clone(),
2172                    new_name: new_name.clone(),
2173                    database_name,
2174                });
2175                CatalogState::add_to_audit_log(
2176                    &state.system_configuration,
2177                    oracle_write_ts,
2178                    session,
2179                    tx,
2180                    audit_events,
2181                    EventType::Alter,
2182                    mz_audit_log::ObjectType::Schema,
2183                    details,
2184                )?;
2185
2186                // Update the schema itself.
2187                let mut new_schema = schema.clone();
2188                new_schema.name.schema.clone_from(&new_name);
2189                tx.update_schema(schema_id, new_schema.into())?;
2190
2191                for id in updates {
2192                    Self::log_update(state, id);
2193                }
2194            }
2195            Op::UpdateOwner { id, new_owner } => {
2196                let conn_id = session
2197                    .map(|session| session.conn_id())
2198                    .unwrap_or(&SYSTEM_CONN_ID);
2199                let old_owner = state
2200                    .get_owner_id(&id, conn_id)
2201                    .expect("cannot update the owner of an object without an owner");
2202                match &id {
2203                    ObjectId::Cluster(id) => {
2204                        let mut cluster = state.get_cluster(*id).clone();
2205                        if id.is_system() {
2206                            return Err(AdapterError::Catalog(Error::new(
2207                                ErrorKind::ReadOnlyCluster(cluster.name),
2208                            )));
2209                        }
2210                        Self::update_privilege_owners(
2211                            &mut cluster.privileges,
2212                            cluster.owner_id,
2213                            new_owner,
2214                        );
2215                        cluster.owner_id = new_owner;
2216                        tx.update_cluster(*id, cluster.into())?;
2217                    }
2218                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2219                        let cluster = state.get_cluster(*cluster_id);
2220                        let mut replica = cluster
2221                            .replica(*replica_id)
2222                            .expect("catalog out of sync")
2223                            .clone();
2224                        if replica_id.is_system() {
2225                            return Err(AdapterError::Catalog(Error::new(
2226                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2227                            )));
2228                        }
2229                        replica.owner_id = new_owner;
2230                        tx.update_cluster_replica(*replica_id, replica.into())?;
2231                    }
2232                    ObjectId::Database(id) => {
2233                        let mut database = state.get_database(id).clone();
2234                        if id.is_system() {
2235                            return Err(AdapterError::Catalog(Error::new(
2236                                ErrorKind::ReadOnlyDatabase(database.name),
2237                            )));
2238                        }
2239                        Self::update_privilege_owners(
2240                            &mut database.privileges,
2241                            database.owner_id,
2242                            new_owner,
2243                        );
2244                        database.owner_id = new_owner;
2245                        tx.update_database(*id, database.clone().into())?;
2246                    }
2247                    ObjectId::Schema((database_spec, schema_spec)) => {
2248                        let schema_id: SchemaId = schema_spec.clone().into();
2249                        let mut schema = state
2250                            .get_schema(database_spec, schema_spec, conn_id)
2251                            .clone();
2252                        if schema_id.is_system() {
2253                            let name = schema.name();
2254                            let full_name = state.resolve_full_schema_name(name);
2255                            return Err(AdapterError::Catalog(Error::new(
2256                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2257                            )));
2258                        }
2259                        Self::update_privilege_owners(
2260                            &mut schema.privileges,
2261                            schema.owner_id,
2262                            new_owner,
2263                        );
2264                        schema.owner_id = new_owner;
2265                        tx.update_schema(schema_id, schema.into())?;
2266                    }
2267                    ObjectId::Item(id) => {
2268                        let entry = state.get_entry(id);
2269                        let mut new_entry = entry.clone();
2270                        if id.is_system() {
2271                            let full_name = state.resolve_full_name(
2272                                new_entry.name(),
2273                                session.map(|session| session.conn_id()),
2274                            );
2275                            return Err(AdapterError::Catalog(Error::new(
2276                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2277                            )));
2278                        }
2279                        Self::update_privilege_owners(
2280                            &mut new_entry.privileges,
2281                            new_entry.owner_id,
2282                            new_owner,
2283                        );
2284                        new_entry.owner_id = new_owner;
2285                        if !new_entry.item().is_temporary() {
2286                            tx.update_item(*id, new_entry.into())?;
2287                        } else {
2288                            temporary_item_updates
2289                                .push((entry.clone().into(), StateDiff::Retraction));
2290                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2291                        }
2292                    }
2293                    ObjectId::NetworkPolicy(id) => {
2294                        let mut policy = state.get_network_policy(id).clone();
2295                        if id.is_system() {
2296                            return Err(AdapterError::Catalog(Error::new(
2297                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2298                            )));
2299                        }
2300                        Self::update_privilege_owners(
2301                            &mut policy.privileges,
2302                            policy.owner_id,
2303                            new_owner,
2304                        );
2305                        policy.owner_id = new_owner;
2306                        tx.update_network_policy(*id, policy.into())?;
2307                    }
2308                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2309                }
2310                let object_type = state.get_object_type(&id);
2311                CatalogState::add_to_audit_log(
2312                    &state.system_configuration,
2313                    oracle_write_ts,
2314                    session,
2315                    tx,
2316                    audit_events,
2317                    EventType::Alter,
2318                    object_type_to_audit_object_type(object_type),
2319                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2320                        object_id: id.to_string(),
2321                        old_owner_id: old_owner.to_string(),
2322                        new_owner_id: new_owner.to_string(),
2323                    }),
2324                )?;
2325            }
2326            Op::UpdateClusterConfig { id, name, config } => {
2327                let mut cluster = state.get_cluster(id).clone();
2328                cluster.config = config;
2329                tx.update_cluster(id, cluster.into())?;
2330                info!("update cluster {}", name);
2331
2332                CatalogState::add_to_audit_log(
2333                    &state.system_configuration,
2334                    oracle_write_ts,
2335                    session,
2336                    tx,
2337                    audit_events,
2338                    EventType::Alter,
2339                    ObjectType::Cluster,
2340                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2341                        id: id.to_string(),
2342                        name,
2343                    }),
2344                )?;
2345            }
2346            Op::UpdateClusterReplicaConfig {
2347                replica_id,
2348                cluster_id,
2349                config,
2350            } => {
2351                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2352                info!("update replica {}", replica.name);
2353                tx.update_cluster_replica(
2354                    replica_id,
2355                    mz_catalog::durable::ClusterReplica {
2356                        cluster_id,
2357                        replica_id,
2358                        name: replica.name.clone(),
2359                        config: config.clone().into(),
2360                        owner_id: replica.owner_id,
2361                    },
2362                )?;
2363            }
2364            Op::UpdateItem { id, name, to_item } => {
2365                let mut entry = state.get_entry(&id).clone();
2366                entry.name = name.clone();
2367                entry.item = to_item.clone();
2368                tx.update_item(id, entry.into())?;
2369
2370                if Self::should_audit_log_item(&to_item) {
2371                    let mut full_name = Self::full_name_detail(
2372                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2373                    );
2374                    full_name.item = name.item;
2375
2376                    CatalogState::add_to_audit_log(
2377                        &state.system_configuration,
2378                        oracle_write_ts,
2379                        session,
2380                        tx,
2381                        audit_events,
2382                        EventType::Alter,
2383                        catalog_type_to_audit_object_type(to_item.typ()),
2384                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2385                            id: id.to_string(),
2386                            name: full_name,
2387                        }),
2388                    )?;
2389                }
2390
2391                Self::log_update(state, &id);
2392            }
2393            Op::UpdateSystemConfiguration { name, value } => {
2394                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2395                tx.upsert_system_config(&name, parsed_value.clone())?;
2396                // This mirrors some "system vars" into the catalog storage
2397                // "config" collection so that we can toggle the flag with
2398                // Launch Darkly, but still use it during boot, before Launch Darkly
2399                // is available.
2400                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2401                    let with_0dt_deployment_max_wait =
2402                        Duration::parse(VarInput::Flat(&parsed_value))
2403                            .expect("parsing succeeded above");
2404                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2405                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2406                    let with_0dt_deployment_ddl_check_interval =
2407                        Duration::parse(VarInput::Flat(&parsed_value))
2408                            .expect("parsing succeeded above");
2409                    tx.set_0dt_deployment_ddl_check_interval(
2410                        with_0dt_deployment_ddl_check_interval,
2411                    )?;
2412                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2413                    let panic_after_timeout =
2414                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2415                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2416                }
2417
2418                CatalogState::add_to_audit_log(
2419                    &state.system_configuration,
2420                    oracle_write_ts,
2421                    session,
2422                    tx,
2423                    audit_events,
2424                    EventType::Alter,
2425                    ObjectType::System,
2426                    EventDetails::SetV1(mz_audit_log::SetV1 {
2427                        name,
2428                        value: Some(value.borrow().to_vec().join(", ")),
2429                    }),
2430                )?;
2431            }
2432            Op::ResetSystemConfiguration { name } => {
2433                tx.remove_system_config(&name);
2434                // This mirrors some "system vars" into the catalog storage
2435                // "config" collection so that we can toggle the flag with
2436                // Launch Darkly, but still use it during boot, before Launch Darkly
2437                // is available.
2438                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2439                    tx.reset_0dt_deployment_max_wait()?;
2440                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2441                    tx.reset_0dt_deployment_ddl_check_interval()?;
2442                }
2443
2444                CatalogState::add_to_audit_log(
2445                    &state.system_configuration,
2446                    oracle_write_ts,
2447                    session,
2448                    tx,
2449                    audit_events,
2450                    EventType::Alter,
2451                    ObjectType::System,
2452                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2453                )?;
2454            }
2455            Op::ResetAllSystemConfiguration => {
2456                tx.clear_system_configs();
2457                tx.reset_0dt_deployment_max_wait()?;
2458                tx.reset_0dt_deployment_ddl_check_interval()?;
2459
2460                CatalogState::add_to_audit_log(
2461                    &state.system_configuration,
2462                    oracle_write_ts,
2463                    session,
2464                    tx,
2465                    audit_events,
2466                    EventType::Alter,
2467                    ObjectType::System,
2468                    EventDetails::ResetAllV1,
2469                )?;
2470            }
2471            Op::WeirdStorageUsageUpdates {
2472                object_id,
2473                size_bytes,
2474                collection_timestamp,
2475            } => {
2476                let id = tx.allocate_storage_usage_ids()?;
2477                let metric =
2478                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2479                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2480                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2481                weird_builtin_table_update = Some(builtin_table_update);
2482            }
2483        };
2484        Ok((weird_builtin_table_update, temporary_item_updates))
2485    }
2486
2487    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2488        let entry = state.get_entry(id);
2489        info!(
2490            "update {} {} ({})",
2491            entry.item_type(),
2492            state.resolve_full_name(entry.name(), entry.conn_id()),
2493            id
2494        );
2495    }
2496
2497    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2498    /// implementation:
2499    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
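    ///
    /// Privileges granted by the old owner are re-granted by the new owner, and
    /// privileges held by the old owner are transferred to the new owner. If
    /// this rewrite produces duplicate (grantee, grantor) pairs, they are merged
    /// by unioning their ACL modes.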
    fn update_privilege_owners(
        privileges: &mut PrivilegeMap,
        old_owner: RoleId,
        new_owner: RoleId,
    ) {
        // TODO(jkosh44) Would be nice not to clone every privilege.
        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();

        let mut new_present = false;
        for privilege in flat_privileges.iter_mut() {
            // Old owner's granted privileges are updated to be granted by the
            // new owner.
            if privilege.grantor == old_owner {
                privilege.grantor = new_owner;
            } else if privilege.grantor == new_owner {
                new_present = true;
            }
            // Old owner's privileges are given to the new owner.
            if privilege.grantee == old_owner {
                privilege.grantee = new_owner;
            } else if privilege.grantee == new_owner {
                new_present = true;
            }
        }

        // If the old privilege list contained references to the new owner, we may
        // have created duplicate entries. Here we try to consolidate them. This
        // is inspired by PostgreSQL's algorithm but not identical.
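        // For example, if `old_owner` had granted itself UPDATE and `new_owner`
        // had granted itself SELECT, the rewrite above leaves two entries keyed
        // by (new_owner, new_owner); they are merged below into a single item
        // whose acl_mode is SELECT | UPDATE.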
        if new_present {
            // Group privileges by (grantee, grantor).
            let privilege_map: BTreeMap<_, Vec<_>> =
                flat_privileges
                    .into_iter()
                    .fold(BTreeMap::new(), |mut accum, privilege| {
                        accum
                            .entry((privilege.grantee, privilege.grantor))
                            .or_default()
                            .push(privilege);
                        accum
                    });

            // Consolidate and update all privileges.
            flat_privileges = privilege_map
                .into_iter()
                .map(|((grantee, grantor), values)|
                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
                    values.into_iter().fold(
                        MzAclItem::empty(grantee, grantor),
                        |mut accum, mz_aclitem| {
                            accum.acl_mode =
                                accum.acl_mode.union(mz_aclitem.acl_mode);
                            accum
                        },
                    ))
                .collect();
        }

        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
    }
}

/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
///
/// Note: previously we emitted a single `Op::DropObject` for every object we
/// needed to drop. But removing a batch of objects from a durable Catalog
/// Transaction is `O(n)`, where `n` is the number of objects that exist in the
/// Catalog, so dropping `m` objects one at a time resulted in an unacceptable
/// `O(m * n)` performance for a `DROP ... CASCADE` statement.
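/// Collecting everything into a single [`ObjectsToDrop`] batch lets the durable
/// transaction remove all of the dropped objects in one pass, paying that cost once.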
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    pub comments: BTreeSet<CommentObjectId>,
    pub databases: BTreeSet<DatabaseId>,
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    pub clusters: BTreeSet<ClusterId>,
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    pub roles: BTreeSet<RoleId>,
    pub items: Vec<CatalogItemId>,
    pub network_policies: BTreeSet<NetworkPolicyId>,
}

impl ObjectsToDrop {
    pub fn generate(
        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
        state: &CatalogState,
        session: Option<&ConnMeta>,
    ) -> Result<Self, AdapterError> {
        let mut delta = ObjectsToDrop::default();

        for drop_object_info in drop_object_infos {
            delta.add_item(drop_object_info, state, session)?;
        }

        Ok(delta)
    }

    fn add_item(
        &mut self,
        drop_object_info: DropObjectInfo,
        state: &CatalogState,
        session: Option<&ConnMeta>,
    ) -> Result<(), AdapterError> {
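        // Recording the comment ID up front ensures that any comments attached
        // to the dropped object are cleaned up along with it, whatever its type.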
        self.comments
            .insert(state.get_comment_id(drop_object_info.to_object_id()));

        match drop_object_info {
            DropObjectInfo::Database(database_id) => {
                let database = &state.database_by_id[&database_id];
                if database_id.is_system() {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
                    )));
                }

                self.databases.insert(database_id);
            }
            DropObjectInfo::Schema((database_spec, schema_spec)) => {
                let schema = state.get_schema(
                    &database_spec,
                    &schema_spec,
                    session
                        .map(|session| session.conn_id())
                        .unwrap_or(&SYSTEM_CONN_ID),
                );
                let schema_id: SchemaId = schema_spec.into();
                if schema_id.is_system() {
                    let name = schema.name();
                    let full_name = state.resolve_full_schema_name(name);
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
                    )));
                }

                self.schemas.insert(schema_spec, database_spec);
            }
            DropObjectInfo::Role(role_id) => {
                let name = state.get_role(&role_id).name().to_string();
                if role_id.is_system() || role_id.is_predefined() {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedRoleName(name.clone()),
                    )));
                }
                state.ensure_not_reserved_role(&role_id)?;

                self.roles.insert(role_id);
            }
            DropObjectInfo::Cluster(cluster_id) => {
                let cluster = state.get_cluster(cluster_id);
                let name = &cluster.name;
                if cluster_id.is_system() {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReadOnlyCluster(name.clone()),
                    )));
                }

                self.clusters.insert(cluster_id);
            }
            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
                let cluster = state.get_cluster(cluster_id);
                let replica = cluster.replica(replica_id).expect("Must exist");

                self.replicas
                    .insert(replica.replica_id, (cluster.id, reason));
            }
            DropObjectInfo::Item(item_id) => {
                let entry = state.get_entry(&item_id);
                if item_id.is_system() {
                    let name = entry.name();
                    let full_name =
                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
                        full_name.to_string(),
                    ))));
                }

                self.items.push(item_id);
            }
            DropObjectInfo::NetworkPolicy(network_policy_id) => {
                let policy = state.get_network_policy(&network_policy_id);
                let name = &policy.name;
                if network_policy_id.is_system() {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
                    )));
                }

                self.network_policies.insert(network_policy_id);
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;

    use crate::catalog::Catalog;

    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        // Old owner exists as grantor.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: other_role,
                grantor: old_owner,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: other_role,
                grantor: new_owner,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: other_role,
                grantor: new_owner,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );

        // Old owner exists as grantee.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: old_owner,
                grantor: other_role,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: new_owner,
                grantor: other_role,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: new_owner,
                grantor: other_role,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );

        // Old owner exists as both grantee and grantor.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: old_owner,
                grantor: old_owner,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: new_owner,
                grantor: new_owner,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: new_owner,
                grantor: new_owner,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );
    }
}