mz_adapter/catalog/transact.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to executing catalog transactions.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
    WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use mz_audit_log::{
    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::BuiltinLog;
use mz_catalog::durable::{NetworkPolicy, Transaction};
use mz_catalog::expr_cache::LocalExpressions;
use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff,
    StateUpdate, StateUpdateKind, TemporaryItem,
};
use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::HashSet;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_persist_types::ShardId;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
use mz_sql::ast::RawDataType;
use mz_sql::catalog::{
    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
    RoleAttributesRaw, RoleMembership, RoleVars,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{NetworkPolicyRule, PlanError};
use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
use mz_sql::session::vars::OwnedVarInput;
use mz_sql::session::vars::{Value as VarValue, VarInput};
use mz_sql::{DEFAULT_SCHEMA, rbac};
use mz_sql_parser::ast::{QualifiedReplica, Value};
use mz_storage_client::storage_collections::StorageCollections;
use tracing::{info, trace};

use crate::AdapterError;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{
    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
    is_reserved_role_name, object_type_to_audit_object_type,
    system_object_type_to_audit_object_type,
};
use crate::coord::ConnMeta;
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::util::ResultExt;

#[derive(Debug, Clone)]
pub enum Op {
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    /// Performs updates to the storage usage table, which probably should be a builtin source.
    ///
    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
    /// might not work. If a process crashes after collecting storage usage events
    /// but before updating the builtin table, then another listening catalog
    /// will never know to update the builtin table.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Performs a dry run of the commit, but errors with
    /// [`AdapterError::TransactionDryRun`].
    ///
    /// When using this value, it should be included only as the last element of
    /// the transaction and should not be the only value in the transaction.
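    ///
    /// A hedged, illustrative sketch of intended caller usage (hypothetical
    /// variables, not part of this module):
    ///
    /// ```ignore
    /// // Append the dry-run marker after at least one real op, then expect
    /// // `transact` to fail with `AdapterError::TransactionDryRun`.
    /// ops.push(Op::TransactionDryRun);
    /// let res = catalog.transact(None, oracle_write_ts, None, ops).await;
    /// assert!(matches!(res, Err(AdapterError::TransactionDryRun { .. })));
    /// ```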
    TransactionDryRun,
}

/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
/// an `Op::DropObjects`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}

impl DropObjectInfo {
    /// Creates a `DropObjectInfo` from an `ObjectId`.
    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
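    ///
    /// A hedged, illustrative sketch (hypothetical `cluster_id`/`replica_id`
    /// values, not part of this module):
    ///
    /// ```ignore
    /// let info = DropObjectInfo::manual_drop_from_object_id(
    ///     ObjectId::ClusterReplica((cluster_id, replica_id)),
    /// );
    /// assert!(matches!(
    ///     info,
    ///     DropObjectInfo::ClusterReplica((_, _, ReplicaCreateDropReason::Manual))
    /// ));
    /// ```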
    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
        match id {
            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
                cluster_id,
                replica_id,
                ReplicaCreateDropReason::Manual,
            )),
            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
        }
    }

    /// Creates an `ObjectId` from a `DropObjectInfo`.
    /// Loses the `ReplicaCreateDropReason` if there is one!
    fn to_object_id(&self) -> ObjectId {
        match &self {
            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
            }
            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
            DropObjectInfo::NetworkPolicy(network_policy_id) => {
                ObjectId::NetworkPolicy(network_policy_id.clone())
            }
        }
    }
}

/// The reason for creating or dropping a replica.
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The user initiated the replica create or drop, e.g., by
    /// - creating/dropping a cluster,
    /// - ALTERing various options on a managed cluster,
    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
    Manual,
    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
    /// materialized view needs a refresh on a SCHEDULE ON REFRESH cluster.
    ClusterScheduling(Vec<SchedulingDecision>),
}

impl ReplicaCreateDropReason {
    pub fn into_audit_log(
        self,
    ) -> (
        CreateOrDropClusterReplicaReasonV1,
        Option<SchedulingDecisionsWithReasonsV2>,
    ) {
        let (reason, scheduling_policies) = match self {
            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
                CreateOrDropClusterReplicaReasonV1::Schedule,
                Some(scheduling_decisions),
            ),
        };
        (
            reason,
            scheduling_policies
                .as_ref()
                .map(SchedulingDecision::reasons_to_audit_log_reasons),
        )
    }
}

pub struct TransactionResult {
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates from which we will derive catalog implications.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    pub audit_events: Vec<VersionedEvent>,
}

impl Catalog {
    fn should_audit_log_item(item: &CatalogItem) -> bool {
        !item.is_temporary()
    }

    /// Gets the [`CatalogItemId`]s of temporary items to be created and checks for name
    /// collisions within a connection ID.
    fn temporary_ids(
        &self,
        ops: &[Op],
        temporary_drops: BTreeSet<(&ConnectionId, String)>,
    ) -> Result<BTreeSet<CatalogItemId>, Error> {
        let mut creating = BTreeSet::new();
        let mut temporary_ids = BTreeSet::new();
        for op in ops.iter() {
            if let Op::CreateItem {
                id,
                name,
                item,
                owner_id: _,
            } = op
            {
                if let Some(conn_id) = item.conn_id() {
                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
                        || creating.contains(&(conn_id, &name.item))
                    {
                        return Err(
                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
                        );
                    } else {
                        creating.insert((conn_id, &name.item));
                        temporary_ids.insert(id.clone());
                    }
                }
            }
        }
        Ok(temporary_ids)
    }

    #[instrument(name = "catalog::transact")]
    pub async fn transact(
        &mut self,
        // n.b. this is an option to prevent us from needing to build out a
        // dummy impl of `StorageController` for tests.
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
    ) -> Result<TransactionResult, AdapterError> {
        trace!("transact: {:?}", ops);
        fail::fail_point!("catalog_transact", |arg| {
            Err(AdapterError::Unstructured(anyhow::anyhow!(
                "failpoint: {arg:?}"
            )))
        });

        let drop_ids: BTreeSet<CatalogItemId> = ops
            .iter()
            .filter_map(|op| match op {
                Op::DropObjects(drop_object_infos) => {
                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
                    let item_ids = ids.filter_map(|id| match id {
                        ObjectId::Item(id) => Some(id),
                        _ => None,
                    });
                    Some(item_ids)
                }
                _ => None,
            })
            .flatten()
            .collect();
        let temporary_drops = drop_ids
            .iter()
            .filter_map(|id| {
                let entry = self.get_entry(id);
                match entry.item().conn_id() {
                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
                    None => None,
                }
            })
            .collect();
        let dropped_global_ids = drop_ids
            .iter()
            .flat_map(|item_id| self.get_global_ids(item_id))
            .collect();

        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = storage
            .transaction()
            .await
            .unwrap_or_terminate("starting catalog transaction");

        let new_state = Self::transact_inner(
            storage_collections,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            &self.state,
        )
        .await?;

        // The inner transaction logic was successful; commit the updates.
        // Terminate the process if this fails, because we have to restart envd
        // due to indeterminate catalog state, which we only reconcile during
        // catalog init.
        tx.commit(oracle_write_ts)
            .await
            .unwrap_or_terminate("catalog storage transaction commit must succeed");

        // Dropping `storage` only here keeps the mutable borrow on `self` held
        // until this point, preventing us from accidentally mutating anything
        // before the transaction has been committed.
        drop(storage);
        if let Some(new_state) = new_state {
            self.transient_revision += 1;
            self.state = new_state;
        }

        // Drop in-memory planning metadata.
        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
        if self.state.system_config().enable_mz_notices() {
            // Generate retractions for the Builtin tables.
            self.state().pack_optimizer_notices(
                &mut builtin_table_updates,
                dropped_notices.iter(),
                Diff::MINUS_ONE,
            );
        }

        Ok(TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        })
    }

    /// Extracts optimized expressions from `Op::CreateItem` operations for views
    /// and materialized views. These can be used to populate a `LocalExpressionCache`
    /// to avoid re-optimization during `apply_updates`.
    fn extract_expressions_from_ops(
        ops: &[Op],
        optimizer_features: &OptimizerFeatures,
    ) -> BTreeMap<GlobalId, LocalExpressions> {
        let mut exprs = BTreeMap::new();

        for op in ops {
            if let Op::CreateItem { item, .. } = op {
                match item {
                    CatalogItem::View(view) => {
                        exprs.insert(
                            view.global_id,
                            LocalExpressions {
                                local_mir: (*view.optimized_expr).clone(),
                                optimizer_features: optimizer_features.clone(),
                            },
                        );
                    }
                    CatalogItem::MaterializedView(mv) => {
                        exprs.insert(
                            mv.global_id_writes(),
                            LocalExpressions {
                                local_mir: (*mv.optimized_expr).clone(),
                                optimizer_features: optimizer_features.clone(),
                            },
                        );
                    }
                    _ => {}
                }
            }
        }

        exprs
    }

    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
    /// it has changed. If `ops` do not result in a change to the state, this method returns `None`.
    ///
    /// # Panics
    /// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
    ///   final element.
    /// - If the only element of `ops` is [`Op::TransactionDryRun`].
    #[instrument(name = "catalog::transact_inner")]
    async fn transact_inner(
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        mut ops: Vec<Op>,
        temporary_ids: BTreeSet<CatalogItemId>,
        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
        audit_events: &mut Vec<VersionedEvent>,
        tx: &mut Transaction<'_>,
        state: &CatalogState,
    ) -> Result<Option<CatalogState>, AdapterError> {
        // We come up with new catalog state, builtin state updates, and parsed
        // catalog updates (for deriving catalog implications) in two phases:
        //
        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
        //    one-by-one. This will give us the full list of updates to apply to
        //    the catalog, which will allow us to apply it in one batch, which
        //    in turn will allow the apply machinery to consolidate the updates.
        // 2. We do one final apply call with all updates, which gives us the
        //    final builtin table updates and parsed catalog updates.
        //
        // The reason is that the loop that is working off ops first does a
        // transact_op to derive the state updates for that op, and then calls
        // apply_updates on the catalog state. And successive ops might expect
        // the catalog state to reflect the modified state _after_ applying
        // previous ops.
        //
        // We want to, however, have one final apply_state that takes all the
        // accumulated updates to derive the required controller updates and the
        // builtin table updates.
        //
        // We won't win any DDL throughput benchmarks, but so far that's not
        // what we're optimizing for, and we would probably hit other
        // bottlenecks before this one.
        //
        // We could work around this by refactoring how the interplay of
        // transact_op and apply_updates works, but that's a larger undertaking.
        let mut preliminary_state = Cow::Borrowed(state);

        // The final state that we will return, if modified.
        let mut state = Cow::Borrowed(state);

        let dry_run_ops = match ops.last() {
            Some(Op::TransactionDryRun) => {
                // Remove dry run marker.
                ops.pop();
                assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
                ops.clone()
            }
            Some(_) => vec![],
            None => return Ok(None),
        };

        // Extract optimized expressions from CreateItem ops to avoid re-optimization
        // during apply_updates. We extract before the loop since `ops` is moved there.
        let optimizer_features = OptimizerFeatures::from(state.system_config());
        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);

        let mut storage_collections_to_create = BTreeSet::new();
        let mut storage_collections_to_drop = BTreeSet::new();
        let mut storage_collections_to_register = BTreeMap::new();

        let mut updates = Vec::new();

        for op in ops {
            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
                oracle_write_ts,
                session,
                op,
                &temporary_ids,
                audit_events,
                tx,
                &*preliminary_state,
                &mut storage_collections_to_create,
                &mut storage_collections_to_drop,
                &mut storage_collections_to_register,
            )
            .await?;

            // Certain builtin tables are not derived from the durable catalog state, so they need
            // to be updated ad-hoc based on the current transaction. This is weird and will not
            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
            // this instance crashes after committing the durable catalog but before applying the
            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
            // world. Currently, this works fine and there are no correctness issues because
            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
            // state of all builtin tables to the correct values.
            if let Some(builtin_table_update) = weird_builtin_table_update {
                builtin_table_updates.push(builtin_table_update);
            }

            // Temporary items are not stored in the durable catalog, so they need to be handled
            // separately for updating state and builtin tables.
            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
            // in a multi-subscriber catalog world.
            let upper = tx.upper();
            let temporary_item_updates =
                temporary_item_updates
                    .into_iter()
                    .map(|(item, diff)| StateUpdate {
                        kind: StateUpdateKind::TemporaryItem(item),
                        ts: upper,
                        diff,
                    });

            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
            op_updates.extend(temporary_item_updates);
            if !op_updates.is_empty() {
                // Clone the cache so each apply_updates call has access to cached expressions.
                // The cache uses `remove` semantics, so we need a fresh clone for each call.
                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
                    .to_mut()
                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
                    .await;
            }
            updates.append(&mut op_updates);
        }

        if !updates.is_empty() {
            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
            let (op_builtin_table_updates, op_catalog_updates) = state
                .to_mut()
                .apply_updates(updates.clone(), &mut local_expr_cache)
                .await;
            let op_builtin_table_updates = state
                .to_mut()
                .resolve_builtin_table_updates(op_builtin_table_updates);
            builtin_table_updates.extend(op_builtin_table_updates);
            parsed_catalog_updates.extend(op_catalog_updates);
        }

        if dry_run_ops.is_empty() {
            // `storage_collections` should only be `None` for tests.
            if let Some(c) = storage_collections {
                c.prepare_state(
                    tx,
                    storage_collections_to_create,
                    storage_collections_to_drop,
                    storage_collections_to_register,
                )
                .await?;
            }

            let updates = tx.get_and_commit_op_updates();
            if !updates.is_empty() {
                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
                let (op_builtin_table_updates, op_catalog_updates) = state
                    .to_mut()
                    .apply_updates(updates.clone(), &mut local_expr_cache)
                    .await;
                let op_builtin_table_updates = state
                    .to_mut()
                    .resolve_builtin_table_updates(op_builtin_table_updates);
                builtin_table_updates.extend(op_builtin_table_updates);
                parsed_catalog_updates.extend(op_catalog_updates);
            }

            match state {
                Cow::Owned(state) => Ok(Some(state)),
                Cow::Borrowed(_) => Ok(None),
            }
        } else {
            Err(AdapterError::TransactionDryRun {
                new_ops: dry_run_ops,
                new_state: state.into_owned(),
            })
        }
    }

    /// Performs the transaction operation described by `op`. This function prepares the changes in
    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
    /// changes.
    ///
    /// Optionally returns a builtin table update for any builtin table updates that cannot be
    /// derived from the durable catalog state, as well as temporary item diffs. These are all very
    /// weird scenarios that ideally will not exist in the future.
    #[instrument]
    async fn transact_op(
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        op: Op,
        temporary_ids: &BTreeSet<CatalogItemId>,
        audit_events: &mut Vec<VersionedEvent>,
        tx: &mut Transaction<'_>,
        state: &CatalogState,
        storage_collections_to_create: &mut BTreeSet<GlobalId>,
        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
        let mut weird_builtin_table_update = None;
        let mut temporary_item_updates = Vec::new();

        match op {
            Op::TransactionDryRun => {
                unreachable!("TransactionDryRun can only be used as the final element of ops")
            }
            Op::AlterRetainHistory { id, value, window } => {
                let entry = state.get_entry(&id);
                if id.is_system() {
                    let name = entry.name();
                    let full_name =
                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
                        full_name.to_string(),
                    ))));
                }

                let mut new_entry = entry.clone();
                let previous = new_entry
                    .item
                    .update_retain_history(value.clone(), window)
                    .map_err(|_| {
                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
                            "planner should have rejected invalid alter retain history item type"
                                .to_string(),
                        )))
                    })?;

                if Self::should_audit_log_item(new_entry.item()) {
                    let details =
                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
                            id: id.to_string(),
                            old_history: previous.map(|previous| previous.to_string()),
                            new_history: value.map(|v| v.to_string()),
                        });
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Alter,
                        catalog_type_to_audit_object_type(new_entry.item().typ()),
                        details,
                    )?;
                }

                tx.update_item(id, new_entry.into())?;

                Self::log_update(state, &id);
            }
            Op::AlterRole {
                id,
                name,
                attributes,
                nopassword,
                vars,
            } => {
                state.ensure_not_reserved_role(&id)?;

                let mut existing_role = state.get_role(&id).clone();
                let password = attributes.password.clone();
                let scram_iterations = attributes
                    .scram_iterations
                    .unwrap_or_else(|| state.system_config().scram_iterations());
                existing_role.attributes = attributes.into();
                existing_role.vars = vars;
                let password_action = if nopassword {
                    PasswordAction::Clear
                } else if let Some(password) = password {
                    PasswordAction::Set(PasswordConfig {
                        password,
                        scram_iterations,
                    })
                } else {
                    PasswordAction::NoChange
                };
                tx.update_role(id, existing_role.into(), password_action)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    ObjectType::Role,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;

                info!("update role {name} ({id})");
            }
            Op::AlterNetworkPolicy {
                id,
                rules,
                name,
                owner_id: _owner_id,
            } => {
                let existing_policy = state.get_network_policy(&id).clone();
                let mut policy: NetworkPolicy = existing_policy.into();
                policy.rules = rules;
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedNetworkPolicyName(name),
                    )));
                }
                tx.update_network_policy(id, policy.clone())?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Alter,
                    ObjectType::NetworkPolicy,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;

                info!("update network policy {name} ({id})");
            }
            Op::AlterAddColumn {
                id,
                new_global_id,
                name,
                typ,
                sql,
            } => {
                let mut new_entry = state.get_entry(&id).clone();
                let version = new_entry.item.add_column(name, typ, sql)?;
                // All versions of a table share the same shard, so it shouldn't matter what
                // GlobalId we use here.
                let shard_id = state
                    .storage_metadata()
                    .get_collection_shard(new_entry.latest_global_id())?;

                // TODO(alter_table): Support adding columns to sources.
                let CatalogItem::Table(table) = &mut new_entry.item else {
                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
                };
                table.collections.insert(version, new_global_id);

                tx.update_item(id, new_entry.into())?;
                storage_collections_to_register.insert(new_global_id, shard_id);
            }
            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
                let mut new_entry = state.get_entry(&id).clone();
                let replacement = state.get_entry(&replacement_id);

                let new_audit_events =
                    apply_replacement_audit_events(state, &new_entry, replacement);

                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
                    return Err(AdapterError::internal(
                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
                        "id must refer to a materialized view",
                    ));
                };
                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
                    return Err(AdapterError::internal(
                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
                        "replacement_id must refer to a materialized view",
                    ));
                };

                mv.apply_replacement(replacement_mv.clone());

                tx.remove_item(replacement_id)?;

                new_entry.id = replacement_id;
                tx_replace_item(tx, state, id, new_entry)?;

                let comment_id = CommentObjectId::MaterializedView(replacement_id);
                tx.drop_comments(&[comment_id].into())?;

                for (event_type, details) in new_audit_events {
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        event_type,
                        ObjectType::MaterializedView,
                        details,
                    )?;
                }
            }
            Op::CreateDatabase { name, owner_id } => {
                let database_owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Database,
                    owner_id,
                )];
                let database_default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::Database,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let database_privileges: Vec<_> = merge_mz_acl_items(
                    database_owner_privileges
                        .into_iter()
                        .chain(database_default_privileges),
                )
                .collect();

                let schema_owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Schema,
                    owner_id,
                )];
                let schema_default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::Schema,
                    )
                    .map(|item| item.mz_acl_item(owner_id))
                    // Special default privilege on public schemas.
                    .chain(std::iter::once(MzAclItem {
                        grantee: RoleId::Public,
                        grantor: owner_id,
                        acl_mode: AclMode::USAGE,
                    }));
                let schema_privileges: Vec<_> = merge_mz_acl_items(
                    schema_owner_privileges
                        .into_iter()
                        .chain(schema_default_privileges),
                )
                .collect();

                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
                let (database_id, _) = tx.insert_user_database(
                    &name,
                    owner_id,
                    database_privileges.clone(),
                    &temporary_oids,
                )?;
                let (schema_id, _) = tx.insert_user_schema(
                    database_id,
                    DEFAULT_SCHEMA,
                    owner_id,
                    schema_privileges.clone(),
                    &temporary_oids,
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Database,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: database_id.to_string(),
                        name: name.clone(),
                    }),
                )?;
                info!("create database {}", name);

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Schema,
                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
                        id: schema_id.to_string(),
                        name: DEFAULT_SCHEMA.to_string(),
                        database_name: Some(name),
                    }),
                )?;
            }
            Op::CreateSchema {
                database_id,
                schema_name,
                owner_id,
            } => {
                if is_reserved_name(&schema_name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedSchemaName(schema_name),
                    )));
                }
                let database_id = match database_id {
                    ResolvedDatabaseSpecifier::Id(id) => id,
                    ResolvedDatabaseSpecifier::Ambient => {
                        return Err(AdapterError::Catalog(Error::new(
                            ErrorKind::ReadOnlySystemSchema(schema_name),
                        )));
                    }
                };
                let owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Schema,
                    owner_id,
                )];
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        Some(database_id),
                        None,
                        mz_sql::catalog::ObjectType::Schema,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let privileges: Vec<_> =
                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
                        .collect();
                let (schema_id, _) = tx.insert_user_schema(
                    database_id,
                    &schema_name,
                    owner_id,
                    privileges.clone(),
                    &state.get_temporary_oids().collect(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Schema,
                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
                        id: schema_id.to_string(),
                        name: schema_name.clone(),
                        database_name: Some(state.database_by_id[&database_id].name.clone()),
                    }),
                )?;
            }
            Op::CreateRole { name, attributes } => {
                if is_reserved_role_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedRoleName(name),
                    )));
                }
                let membership = RoleMembership::new();
                let vars = RoleVars::default();
                let (id, _) = tx.insert_user_role(
                    name.clone(),
                    attributes.clone(),
                    membership.clone(),
                    vars.clone(),
                    &state.get_temporary_oids().collect(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Role,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;
                info!("create role {}", name);
            }
            Op::CreateCluster {
                id,
                name,
                introspection_sources,
                owner_id,
                config,
            } => {
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedClusterName(name),
                    )));
                }
                let owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Cluster,
                    owner_id,
                )];
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::Cluster,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let privileges: Vec<_> =
                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
                        .collect();
                let introspection_source_ids: Vec<_> = introspection_sources
                    .iter()
                    .map(|introspection_source| {
                        Transaction::allocate_introspection_source_index_id(
                            &id,
                            introspection_source.variant,
                        )
                    })
                    .collect();

                let introspection_sources = introspection_sources
                    .into_iter()
                    .zip_eq(introspection_source_ids)
                    .map(|(log, (item_id, gid))| (log, item_id, gid))
                    .collect();

                tx.insert_user_cluster(
                    id,
                    &name,
                    introspection_sources,
                    owner_id,
                    privileges.clone(),
                    config.clone().into(),
                    &state.get_temporary_oids().collect(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::Cluster,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;
                info!("create cluster {}", name);
            }
1169            Op::CreateClusterReplica {
1170                cluster_id,
1171                name,
1172                config,
1173                owner_id,
1174                reason,
1175            } => {
1176                if is_reserved_name(&name) {
1177                    return Err(AdapterError::Catalog(Error::new(
1178                        ErrorKind::ReservedReplicaName(name),
1179                    )));
1180                }
1181                let cluster = state.get_cluster(cluster_id);
1182                let id =
1183                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1184                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1185                    size,
1186                    billed_as,
1187                    internal,
1188                    ..
1189                }) = &config.location
1190                {
1191                    let (reason, scheduling_policies) = reason.into_audit_log();
1192                    let details = EventDetails::CreateClusterReplicaV4(
1193                        mz_audit_log::CreateClusterReplicaV4 {
1194                            cluster_id: cluster_id.to_string(),
1195                            cluster_name: cluster.name.clone(),
1196                            replica_id: Some(id.to_string()),
1197                            replica_name: name.clone(),
1198                            logical_size: size.clone(),
1199                            billed_as: billed_as.clone(),
1200                            internal: *internal,
1201                            reason,
1202                            scheduling_policies,
1203                        },
1204                    );
1205                    CatalogState::add_to_audit_log(
1206                        &state.system_configuration,
1207                        oracle_write_ts,
1208                        session,
1209                        tx,
1210                        audit_events,
1211                        EventType::Create,
1212                        ObjectType::ClusterReplica,
1213                        details,
1214                    )?;
1215                }
1216            }
1217            Op::CreateItem {
1218                id,
1219                name,
1220                item,
1221                owner_id,
1222            } => {
1223                state.check_unstable_dependencies(&item)?;
1224
1225                match &item {
1226                    CatalogItem::Table(table) => {
1227                        let gids: Vec<_> = table.global_ids().collect();
1228                        assert_eq!(gids.len(), 1);
1229                        storage_collections_to_create.extend(gids);
1230                    }
1231                    CatalogItem::Source(source) => {
1232                        storage_collections_to_create.insert(source.global_id());
1233                    }
1234                    CatalogItem::MaterializedView(mv) => {
1235                        let mv_gid = mv.global_id_writes();
1236                        if let Some(target_id) = mv.replacement_target {
1237                            let target_gid = state.get_entry(&target_id).latest_global_id();
1238                            let shard_id =
1239                                state.storage_metadata().get_collection_shard(target_gid)?;
1240                            storage_collections_to_register.insert(mv_gid, shard_id);
1241                        } else {
1242                            storage_collections_to_create.insert(mv_gid);
1243                        }
1244                    }
1245                    CatalogItem::ContinualTask(ct) => {
1246                        storage_collections_to_create.insert(ct.global_id());
1247                    }
1248                    CatalogItem::Sink(sink) => {
1249                        storage_collections_to_create.insert(sink.global_id());
1250                    }
1251                    CatalogItem::Log(_)
1252                    | CatalogItem::View(_)
1253                    | CatalogItem::Index(_)
1254                    | CatalogItem::Type(_)
1255                    | CatalogItem::Func(_)
1256                    | CatalogItem::Secret(_)
1257                    | CatalogItem::Connection(_) => (),
1258                }
1259
1260                let system_user = session.map_or(false, |s| s.user().is_system_user());
1261                if !system_user {
1262                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1263                        let cluster_name = state.clusters_by_id[&id].name.clone();
1264                        return Err(AdapterError::Catalog(Error::new(
1265                            ErrorKind::ReadOnlyCluster(cluster_name),
1266                        )));
1267                    }
1268                }
1269
1270                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1271                let default_privileges = state
1272                    .default_privileges
1273                    .get_applicable_privileges(
1274                        owner_id,
1275                        name.qualifiers.database_spec.id(),
1276                        Some(name.qualifiers.schema_spec.into()),
1277                        item.typ().into(),
1278                    )
1279                    .map(|item| item.mz_acl_item(owner_id));
1280                // mz_support can read all progress sources.
1281                let progress_source_privilege = if item.is_progress_source() {
1282                    Some(MzAclItem {
1283                        grantee: MZ_SUPPORT_ROLE_ID,
1284                        grantor: owner_id,
1285                        acl_mode: AclMode::SELECT,
1286                    })
1287                } else {
1288                    None
1289                };
1290                let privileges: Vec<_> = merge_mz_acl_items(
1291                    owner_privileges
1292                        .into_iter()
1293                        .chain(default_privileges)
1294                        .chain(progress_source_privilege),
1295                )
1296                .collect();
1297
1298                let temporary_oids = state.get_temporary_oids().collect();
1299
1300                if item.is_temporary() {
1301                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1302                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1303                    {
1304                        return Err(AdapterError::Catalog(Error::new(
1305                            ErrorKind::InvalidTemporarySchema,
1306                        )));
1307                    }
1308                    let oid = tx.allocate_oid(&temporary_oids)?;
1309
1310                    let schema_id = name.qualifiers.schema_spec.clone().into();
1311                    let item_type = item.typ();
1312                    let (create_sql, global_id, versions) = item.to_serialized();
1313
1314                    let item = TemporaryItem {
1315                        id,
1316                        oid,
1317                        global_id,
1318                        schema_id,
1319                        name: name.item.clone(),
1320                        create_sql,
1321                        conn_id: item.conn_id().cloned(),
1322                        owner_id,
1323                        privileges: privileges.clone(),
1324                        extra_versions: versions,
1325                    };
1326                    temporary_item_updates.push((item, StateDiff::Addition));
1327
1328                    info!(
1329                        "create temporary {} {} ({})",
1330                        item_type,
1331                        state.resolve_full_name(&name, None),
1332                        id
1333                    );
1334                } else {
1335                    if let Some(temp_id) =
1336                        item.uses()
1337                            .iter()
1338                            .find(|id| match state.try_get_entry(*id) {
1339                                Some(entry) => entry.item().is_temporary(),
1340                                None => temporary_ids.contains(id),
1341                            })
1342                    {
1343                        let temp_item = state.get_entry(temp_id);
1344                        return Err(AdapterError::Catalog(Error::new(
1345                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1346                        )));
1347                    }
1348                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1349                        && !system_user
1350                    {
1351                        let schema_name = state
1352                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1353                            .schema;
1354                        return Err(AdapterError::Catalog(Error::new(
1355                            ErrorKind::ReadOnlySystemSchema(schema_name),
1356                        )));
1357                    }
1358                    let schema_id = name.qualifiers.schema_spec.clone().into();
1359                    let item_type = item.typ();
1360                    let (create_sql, global_id, versions) = item.to_serialized();
1361                    tx.insert_user_item(
1362                        id,
1363                        global_id,
1364                        schema_id,
1365                        &name.item,
1366                        create_sql,
1367                        owner_id,
1368                        privileges.clone(),
1369                        &temporary_oids,
1370                        versions,
1371                    )?;
1372                    info!(
1373                        "create {} {} ({})",
1374                        item_type,
1375                        state.resolve_full_name(&name, None),
1376                        id
1377                    );
1378                }
1379
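                // Record the creation in the audit log. Sources, sinks, indexes,
                // and materialized views also record the cluster they run on.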
1380                if Self::should_audit_log_item(&item) {
1381                    let name = Self::full_name_detail(
1382                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1383                    );
1384                    let details = match &item {
1385                        CatalogItem::Source(s) => {
1386                            let cluster_id = match s.data_source {
1387                                // Ingestion exports don't have their own cluster, but
1388                                // run on their ingestion's cluster.
                                DataSourceDesc::IngestionExport { ingestion_id, .. } => state
                                    .get_entry(&ingestion_id)
                                    .cluster_id()
                                    .map(|cluster_id| cluster_id.to_string()),
                                _ => item.cluster_id().map(|cluster_id| cluster_id.to_string()),
1399                            };
1400
1401                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1402                                id: id.to_string(),
1403                                cluster_id,
1404                                name,
1405                                external_type: s.source_type().to_string(),
1406                            })
1407                        }
1408                        CatalogItem::Sink(s) => {
1409                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1410                                id: id.to_string(),
1411                                cluster_id: Some(s.cluster_id.to_string()),
1412                                name,
1413                                external_type: s.sink_type().to_string(),
1414                            })
1415                        }
1416                        CatalogItem::Index(i) => {
1417                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1418                                id: id.to_string(),
1419                                name,
1420                                cluster_id: i.cluster_id.to_string(),
1421                            })
1422                        }
1423                        CatalogItem::MaterializedView(mv) => {
1424                            EventDetails::CreateMaterializedViewV1(
1425                                mz_audit_log::CreateMaterializedViewV1 {
1426                                    id: id.to_string(),
1427                                    name,
1428                                    cluster_id: mv.cluster_id.to_string(),
1429                                    replacement_target_id: mv
1430                                        .replacement_target
1431                                        .map(|id| id.to_string()),
1432                                },
1433                            )
1434                        }
1435                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1436                            id: id.to_string(),
1437                            name,
1438                        }),
1439                    };
1440                    CatalogState::add_to_audit_log(
1441                        &state.system_configuration,
1442                        oracle_write_ts,
1443                        session,
1444                        tx,
1445                        audit_events,
1446                        EventType::Create,
1447                        catalog_type_to_audit_object_type(item.typ()),
1448                        details,
1449                    )?;
1450                }
1451            }
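            // Create a network policy: reject duplicate or reserved names,
            // compute its privileges, write it durably, and audit-log the
            // creation.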
1452            Op::CreateNetworkPolicy {
1453                rules,
1454                name,
1455                owner_id,
1456            } => {
1457                if state.network_policies_by_name.contains_key(&name) {
1458                    return Err(AdapterError::PlanError(PlanError::Catalog(
1459                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1460                    )));
1461                }
1462                if is_reserved_name(&name) {
1463                    return Err(AdapterError::Catalog(Error::new(
1464                        ErrorKind::ReservedNetworkPolicyName(name),
1465                    )));
1466                }
1467
1468                let owner_privileges = vec![rbac::owner_privilege(
1469                    mz_sql::catalog::ObjectType::NetworkPolicy,
1470                    owner_id,
1471                )];
1472                let default_privileges = state
1473                    .default_privileges
1474                    .get_applicable_privileges(
1475                        owner_id,
1476                        None,
1477                        None,
1478                        mz_sql::catalog::ObjectType::NetworkPolicy,
1479                    )
1480                    .map(|item| item.mz_acl_item(owner_id));
1481                let privileges: Vec<_> =
1482                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1483                        .collect();
1484
1485                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1486                let id = tx.insert_user_network_policy(
1487                    name.clone(),
1488                    rules,
1489                    privileges,
1490                    owner_id,
1491                    &temporary_oids,
1492                )?;
1493
1494                CatalogState::add_to_audit_log(
1495                    &state.system_configuration,
1496                    oracle_write_ts,
1497                    session,
1498                    tx,
1499                    audit_events,
1500                    EventType::Create,
1501                    ObjectType::NetworkPolicy,
1502                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1503                        id: id.to_string(),
1504                        name: name.clone(),
1505                    }),
1506                )?;
1507
1508                info!("created network policy {name} ({id})");
1509            }
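            // Update the comment on an object. The change is only audit-logged
            // when a connection is available to resolve the object's name (see
            // the TODO below).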
1510            Op::Comment {
1511                object_id,
1512                sub_component,
1513                comment,
1514            } => {
1515                tx.update_comment(object_id, sub_component, comment)?;
1516                let entry = state.get_comment_id_entry(&object_id);
1517                let should_log = entry
1518                    .map(|entry| Self::should_audit_log_item(entry.item()))
                    // Things that aren't catalog entries can't be temporary, so they should be logged.
1520                    .unwrap_or(true);
1521                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1522                // comments won't be logged for now.
1523                if let (Some(conn_id), true) =
1524                    (session.map(|session| session.conn_id()), should_log)
1525                {
1526                    CatalogState::add_to_audit_log(
1527                        &state.system_configuration,
1528                        oracle_write_ts,
1529                        session,
1530                        tx,
1531                        audit_events,
1532                        EventType::Comment,
1533                        comment_id_to_audit_object_type(object_id),
1534                        EventDetails::IdNameV1(IdNameV1 {
1535                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1536                            id: format!("{object_id:?}"),
1537                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1538                        }),
1539                    )?;
1540                }
1541            }
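            // Replace the stored set of upstream references for a source.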
1542            Op::UpdateSourceReferences {
1543                source_id,
1544                references,
1545            } => {
1546                tx.update_source_references(
1547                    source_id,
1548                    references
1549                        .references
1550                        .into_iter()
1551                        .map(|reference| reference.into())
1552                        .collect(),
1553                    references.updated_at,
1554                )?;
1555            }
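            // Drop a set of objects. Each kind of object (items, schemas,
            // databases, roles, network policies, replicas, and clusters) is
            // removed from the durable catalog and audit-logged below.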
1556            Op::DropObjects(drop_object_infos) => {
1557                // Generate all of the objects that need to get dropped.
1558                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1559
1560                // Drop any associated comments.
1561                tx.drop_comments(&delta.comments)?;
1562
1563                // Drop any items.
1564                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1565                    delta
1566                        .items
1567                        .iter()
1569                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1570                tx.remove_items(&durable_items_to_drop)?;
1571                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1572                    let entry = state.get_entry(&id);
1573                    (entry.clone().into(), StateDiff::Retraction)
1574                }));
1575
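                // For each dropped item: queue its storage collections for
                // dropping, remove any stored source references, and write an
                // audit log entry.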
1576                for item_id in delta.items {
1577                    let entry = state.get_entry(&item_id);
1578
1579                    if entry.item().is_storage_collection() {
1580                        storage_collections_to_drop.extend(entry.global_ids());
1581                    }
1582
1583                    if state.source_references.contains_key(&item_id) {
1584                        tx.remove_source_references(item_id)?;
1585                    }
1586
1587                    if Self::should_audit_log_item(entry.item()) {
1588                        CatalogState::add_to_audit_log(
1589                            &state.system_configuration,
1590                            oracle_write_ts,
1591                            session,
1592                            tx,
1593                            audit_events,
1594                            EventType::Drop,
1595                            catalog_type_to_audit_object_type(entry.item().typ()),
1596                            EventDetails::IdFullNameV1(IdFullNameV1 {
1597                                id: item_id.to_string(),
1598                                name: Self::full_name_detail(&state.resolve_full_name(
1599                                    entry.name(),
1600                                    session.map(|session| session.conn_id()),
1601                                )),
1602                            }),
1603                        )?;
1604                    }
1605                    info!(
1606                        "drop {} {} ({})",
1607                        entry.item_type(),
1608                        state.resolve_full_name(entry.name(), entry.conn_id()),
1609                        item_id
1610                    );
1611                }
1612
1613                // Drop any schemas.
1614                let schemas = delta
1615                    .schemas
1616                    .iter()
1617                    .map(|(schema_spec, database_spec)| {
1618                        (SchemaId::from(schema_spec), *database_spec)
1619                    })
1620                    .collect();
1621                tx.remove_schemas(&schemas)?;
1622
1623                for (schema_spec, database_spec) in delta.schemas {
1624                    let schema = state.get_schema(
1625                        &database_spec,
1626                        &schema_spec,
1627                        session
1628                            .map(|session| session.conn_id())
1629                            .unwrap_or(&SYSTEM_CONN_ID),
1630                    );
1631
1632                    let schema_id = SchemaId::from(schema_spec);
1633                    let database_id = match database_spec {
1634                        ResolvedDatabaseSpecifier::Ambient => None,
1635                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1636                    };
1637
1638                    CatalogState::add_to_audit_log(
1639                        &state.system_configuration,
1640                        oracle_write_ts,
1641                        session,
1642                        tx,
1643                        audit_events,
1644                        EventType::Drop,
1645                        ObjectType::Schema,
1646                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1647                            id: schema_id.to_string(),
1648                            name: schema.name.schema.to_string(),
1649                            database_name: database_id
1650                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1651                        }),
1652                    )?;
1653                }
1654
1655                // Drop any databases.
1656                tx.remove_databases(&delta.databases)?;
1657
1658                for database_id in delta.databases {
1659                    let database = state.get_database(&database_id).clone();
1660
1661                    CatalogState::add_to_audit_log(
1662                        &state.system_configuration,
1663                        oracle_write_ts,
1664                        session,
1665                        tx,
1666                        audit_events,
1667                        EventType::Drop,
1668                        ObjectType::Database,
1669                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1670                            id: database_id.to_string(),
1671                            name: database.name.clone(),
1672                        }),
1673                    )?;
1674                }
1675
1676                // Drop any roles.
1677                tx.remove_user_roles(&delta.roles)?;
1678
1679                for role_id in delta.roles {
1680                    let role = state
1681                        .roles_by_id
1682                        .get(&role_id)
1683                        .expect("catalog out of sync");
1684
1685                    CatalogState::add_to_audit_log(
1686                        &state.system_configuration,
1687                        oracle_write_ts,
1688                        session,
1689                        tx,
1690                        audit_events,
1691                        EventType::Drop,
1692                        ObjectType::Role,
1693                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1694                            id: role.id.to_string(),
1695                            name: role.name.clone(),
1696                        }),
1697                    )?;
1698                    info!("drop role {}", role.name());
1699                }
1700
1701                // Drop any network policies.
1702                tx.remove_network_policies(&delta.network_policies)?;
1703
1704                for network_policy_id in delta.network_policies {
1705                    let policy = state
1706                        .network_policies_by_id
1707                        .get(&network_policy_id)
1708                        .expect("catalog out of sync");
1709
1710                    CatalogState::add_to_audit_log(
1711                        &state.system_configuration,
1712                        oracle_write_ts,
1713                        session,
1714                        tx,
1715                        audit_events,
1716                        EventType::Drop,
1717                        ObjectType::NetworkPolicy,
1718                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1719                            id: policy.id.to_string(),
1720                            name: policy.name.clone(),
1721                        }),
1722                    )?;
                    info!("drop network policy {}", policy.name);
1724                }
1725
1726                // Drop any replicas.
1727                let replicas = delta.replicas.keys().copied().collect();
1728                tx.remove_cluster_replicas(&replicas)?;
1729
1730                for (replica_id, (cluster_id, reason)) in delta.replicas {
1731                    let cluster = state.get_cluster(cluster_id);
1732                    let replica = cluster.replica(replica_id).expect("Must exist");
1733
1734                    let (reason, scheduling_policies) = reason.into_audit_log();
1735                    let details =
1736                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1737                            cluster_id: cluster_id.to_string(),
1738                            cluster_name: cluster.name.clone(),
1739                            replica_id: Some(replica_id.to_string()),
1740                            replica_name: replica.name.clone(),
1741                            reason,
1742                            scheduling_policies,
1743                        });
1744                    CatalogState::add_to_audit_log(
1745                        &state.system_configuration,
1746                        oracle_write_ts,
1747                        session,
1748                        tx,
1749                        audit_events,
1750                        EventType::Drop,
1751                        ObjectType::ClusterReplica,
1752                        details,
1753                    )?;
1754                }
1755
1756                // Drop any clusters.
1757                tx.remove_clusters(&delta.clusters)?;
1758
1759                for cluster_id in delta.clusters {
1760                    let cluster = state.get_cluster(cluster_id);
1761
1762                    CatalogState::add_to_audit_log(
1763                        &state.system_configuration,
1764                        oracle_write_ts,
1765                        session,
1766                        tx,
1767                        audit_events,
1768                        EventType::Drop,
1769                        ObjectType::Cluster,
1770                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1771                            id: cluster.id.to_string(),
1772                            name: cluster.name.clone(),
1773                        }),
1774                    )?;
1775                }
1776            }
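            // Grant a role to a member, rejecting reserved members and
            // memberships that would introduce a cycle.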
1777            Op::GrantRole {
1778                role_id,
1779                member_id,
1780                grantor_id,
1781            } => {
1782                state.ensure_not_reserved_role(&member_id)?;
1783                state.ensure_grantable_role(&role_id)?;
1784                if state.collect_role_membership(&role_id).contains(&member_id) {
1785                    let group_role = state.get_role(&role_id);
1786                    let member_role = state.get_role(&member_id);
1787                    return Err(AdapterError::Catalog(Error::new(
1788                        ErrorKind::CircularRoleMembership {
1789                            role_name: group_role.name().to_string(),
1790                            member_name: member_role.name().to_string(),
1791                        },
1792                    )));
1793                }
1794                let mut member_role = state.get_role(&member_id).clone();
1795                member_role.membership.map.insert(role_id, grantor_id);
1796                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1797
1798                CatalogState::add_to_audit_log(
1799                    &state.system_configuration,
1800                    oracle_write_ts,
1801                    session,
1802                    tx,
1803                    audit_events,
1804                    EventType::Grant,
1805                    ObjectType::Role,
1806                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1807                        role_id: role_id.to_string(),
1808                        member_id: member_id.to_string(),
1809                        grantor_id: grantor_id.to_string(),
1810                        executed_by: session
1811                            .map(|session| session.authenticated_role_id())
1812                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1813                            .to_string(),
1814                    }),
1815                )?;
1816            }
1817            Op::RevokeRole {
1818                role_id,
1819                member_id,
1820                grantor_id,
1821            } => {
1822                state.ensure_not_reserved_role(&member_id)?;
1823                state.ensure_grantable_role(&role_id)?;
1824                let mut member_role = state.get_role(&member_id).clone();
1825                member_role.membership.map.remove(&role_id);
1826                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1827
1828                CatalogState::add_to_audit_log(
1829                    &state.system_configuration,
1830                    oracle_write_ts,
1831                    session,
1832                    tx,
1833                    audit_events,
1834                    EventType::Revoke,
1835                    ObjectType::Role,
1836                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1837                        role_id: role_id.to_string(),
1838                        member_id: member_id.to_string(),
1839                        grantor_id: grantor_id.to_string(),
1840                        executed_by: session
1841                            .map(|session| session.authenticated_role_id())
1842                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1843                            .to_string(),
1844                    }),
1845                )?;
1846            }
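            // Grant or revoke a single privilege on an object, or on the
            // system itself, and audit-log the change.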
1847            Op::UpdatePrivilege {
1848                target_id,
1849                privilege,
1850                variant,
1851            } => {
1852                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1853                    UpdatePrivilegeVariant::Grant => {
1854                        privileges.grant(privilege);
1855                    }
1856                    UpdatePrivilegeVariant::Revoke => {
1857                        privileges.revoke(&privilege);
1858                    }
1859                };
1860                match &target_id {
1861                    SystemObjectId::Object(object_id) => match object_id {
1862                        ObjectId::Cluster(id) => {
1863                            let mut cluster = state.get_cluster(*id).clone();
1864                            update_privilege_fn(&mut cluster.privileges);
1865                            tx.update_cluster(*id, cluster.into())?;
1866                        }
1867                        ObjectId::Database(id) => {
1868                            let mut database = state.get_database(id).clone();
1869                            update_privilege_fn(&mut database.privileges);
1870                            tx.update_database(*id, database.into())?;
1871                        }
1872                        ObjectId::NetworkPolicy(id) => {
1873                            let mut policy = state.get_network_policy(id).clone();
1874                            update_privilege_fn(&mut policy.privileges);
1875                            tx.update_network_policy(*id, policy.into())?;
1876                        }
1877                        ObjectId::Schema((database_spec, schema_spec)) => {
1878                            let schema_id = schema_spec.clone().into();
1879                            let mut schema = state
1880                                .get_schema(
1881                                    database_spec,
1882                                    schema_spec,
1883                                    session
1884                                        .map(|session| session.conn_id())
1885                                        .unwrap_or(&SYSTEM_CONN_ID),
1886                                )
1887                                .clone();
1888                            update_privilege_fn(&mut schema.privileges);
1889                            tx.update_schema(schema_id, schema.into())?;
1890                        }
1891                        ObjectId::Item(id) => {
1892                            let entry = state.get_entry(id);
1893                            let mut new_entry = entry.clone();
1894                            update_privilege_fn(&mut new_entry.privileges);
1895                            if !new_entry.item().is_temporary() {
1896                                tx.update_item(*id, new_entry.into())?;
1897                            } else {
1898                                temporary_item_updates
1899                                    .push((entry.clone().into(), StateDiff::Retraction));
1900                                temporary_item_updates
1901                                    .push((new_entry.into(), StateDiff::Addition));
1902                            }
1903                        }
1904                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1905                    },
1906                    SystemObjectId::System => {
1907                        let mut system_privileges = state.system_privileges.clone();
1908                        update_privilege_fn(&mut system_privileges);
1909                        let new_privilege =
1910                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1911                        tx.set_system_privilege(
1912                            privilege.grantee,
1913                            privilege.grantor,
1914                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
1915                        )?;
1916                    }
1917                }
1918                let object_type = state.get_system_object_type(&target_id);
1919                let object_id_str = match &target_id {
1920                    SystemObjectId::System => "SYSTEM".to_string(),
1921                    SystemObjectId::Object(id) => id.to_string(),
1922                };
1923                CatalogState::add_to_audit_log(
1924                    &state.system_configuration,
1925                    oracle_write_ts,
1926                    session,
1927                    tx,
1928                    audit_events,
1929                    variant.into(),
1930                    system_object_type_to_audit_object_type(&object_type),
1931                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1932                        object_id: object_id_str,
1933                        grantee_id: privilege.grantee.to_string(),
1934                        grantor_id: privilege.grantor.to_string(),
1935                        privileges: privilege.acl_mode.to_string(),
1936                    }),
1937                )?;
1938            }
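            // Alter the default privileges that will be applied to objects
            // created by a role in the future.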
1939            Op::UpdateDefaultPrivilege {
1940                privilege_object,
1941                privilege_acl_item,
1942                variant,
1943            } => {
1944                let mut default_privileges = state.default_privileges.clone();
1945                match variant {
1946                    UpdatePrivilegeVariant::Grant => default_privileges
1947                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
1948                    UpdatePrivilegeVariant::Revoke => {
1949                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
1950                    }
1951                }
1952                let new_acl_mode = default_privileges
1953                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1954                tx.set_default_privilege(
1955                    privilege_object.role_id,
1956                    privilege_object.database_id,
1957                    privilege_object.schema_id,
1958                    privilege_object.object_type,
1959                    privilege_acl_item.grantee,
1960                    new_acl_mode.cloned(),
1961                )?;
1962                CatalogState::add_to_audit_log(
1963                    &state.system_configuration,
1964                    oracle_write_ts,
1965                    session,
1966                    tx,
1967                    audit_events,
1968                    variant.into(),
1969                    object_type_to_audit_object_type(privilege_object.object_type),
1970                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1971                        role_id: privilege_object.role_id.to_string(),
1972                        database_id: privilege_object.database_id.map(|id| id.to_string()),
1973                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1974                        grantee_id: privilege_acl_item.grantee.to_string(),
1975                        privileges: privilege_acl_item.acl_mode.to_string(),
1976                    }),
1977                )?;
1978            }
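            // Rename a user cluster, rejecting system clusters and (when
            // requested) reserved names.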
1979            Op::RenameCluster {
1980                id,
1981                name,
1982                to_name,
1983                check_reserved_names,
1984            } => {
1985                if id.is_system() {
1986                    return Err(AdapterError::Catalog(Error::new(
1987                        ErrorKind::ReadOnlyCluster(name.clone()),
1988                    )));
1989                }
1990                if check_reserved_names && is_reserved_name(&to_name) {
1991                    return Err(AdapterError::Catalog(Error::new(
1992                        ErrorKind::ReservedClusterName(to_name),
1993                    )));
1994                }
1995                tx.rename_cluster(id, &name, &to_name)?;
1996                CatalogState::add_to_audit_log(
1997                    &state.system_configuration,
1998                    oracle_write_ts,
1999                    session,
2000                    tx,
2001                    audit_events,
2002                    EventType::Alter,
2003                    ObjectType::Cluster,
2004                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2005                        id: id.to_string(),
2006                        old_name: name.clone(),
2007                        new_name: to_name.clone(),
2008                    }),
2009                )?;
2010                info!("rename cluster {name} to {to_name}");
2011            }
2012            Op::RenameClusterReplica {
2013                cluster_id,
2014                replica_id,
2015                name,
2016                to_name,
2017            } => {
2018                if is_reserved_name(&to_name) {
2019                    return Err(AdapterError::Catalog(Error::new(
2020                        ErrorKind::ReservedReplicaName(to_name),
2021                    )));
2022                }
2023                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2024                CatalogState::add_to_audit_log(
2025                    &state.system_configuration,
2026                    oracle_write_ts,
2027                    session,
2028                    tx,
2029                    audit_events,
2030                    EventType::Alter,
2031                    ObjectType::ClusterReplica,
2032                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2033                        cluster_id: cluster_id.to_string(),
2034                        replica_id: replica_id.to_string(),
2035                        old_name: name.replica.as_str().to_string(),
2036                        new_name: to_name.clone(),
2037                    }),
2038                )?;
2039                info!("rename cluster replica {name} to {to_name}");
                info!("rename cluster replica {name} to {to_name}");
            }
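            // Rename an item, rewriting references to the old name both in its
            // own definition and in the definitions of everything that
            // references it.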
2041            Op::RenameItem {
2042                id,
2043                to_name,
2044                current_full_name,
2045            } => {
2046                let mut updates = Vec::new();
2047
2048                let entry = state.get_entry(&id);
2049                if let CatalogItem::Type(_) = entry.item() {
2050                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2051                        current_full_name.to_string(),
2052                    ))));
2053                }
2054
2055                if entry.id().is_system() {
2056                    let name = state
2057                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2058                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2059                        name.to_string(),
2060                    ))));
2061                }
2062
2063                let mut to_full_name = current_full_name.clone();
2064                to_full_name.item.clone_from(&to_name);
2065
2066                let mut to_qualified_name = entry.name().clone();
2067                to_qualified_name.item.clone_from(&to_name);
2068
2069                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2070                    id: id.to_string(),
2071                    old_name: Self::full_name_detail(&current_full_name),
2072                    new_name: Self::full_name_detail(&to_full_name),
2073                });
2074                if Self::should_audit_log_item(entry.item()) {
2075                    CatalogState::add_to_audit_log(
2076                        &state.system_configuration,
2077                        oracle_write_ts,
2078                        session,
2079                        tx,
2080                        audit_events,
2081                        EventType::Alter,
2082                        catalog_type_to_audit_object_type(entry.item().typ()),
2083                        details,
2084                    )?;
2085                }
2086
2087                // Rename item itself.
2088                let mut new_entry = entry.clone();
2089                new_entry.name.item.clone_from(&to_name);
2090                new_entry.item = entry
2091                    .item()
2092                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2093                    .map_err(|e| {
2094                        Error::new(ErrorKind::from(AmbiguousRename {
2095                            depender: state
2096                                .resolve_full_name(entry.name(), entry.conn_id())
2097                                .to_string(),
2098                            dependee: state
2099                                .resolve_full_name(entry.name(), entry.conn_id())
2100                                .to_string(),
2101                            message: e,
2102                        }))
2103                    })?;
2104
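                // Rewrite references to the old name in every item that
                // references the renamed item.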
2105                for id in entry.referenced_by() {
2106                    let dependent_item = state.get_entry(id);
2107                    let mut to_entry = dependent_item.clone();
2108                    to_entry.item = dependent_item
2109                        .item()
2110                        .rename_item_refs(
2111                            current_full_name.clone(),
2112                            to_full_name.item.clone(),
2113                            false,
2114                        )
2115                        .map_err(|e| {
2116                            Error::new(ErrorKind::from(AmbiguousRename {
2117                                depender: state
2118                                    .resolve_full_name(
2119                                        dependent_item.name(),
2120                                        dependent_item.conn_id(),
2121                                    )
2122                                    .to_string(),
2123                                dependee: state
2124                                    .resolve_full_name(entry.name(), entry.conn_id())
2125                                    .to_string(),
2126                                message: e,
2127                            }))
2128                        })?;
2129
2130                    if !to_entry.item().is_temporary() {
2131                        tx.update_item(*id, to_entry.into())?;
2132                    } else {
2133                        temporary_item_updates
2134                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2135                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2136                    }
2137                    updates.push(*id);
2138                }
2139                if !new_entry.item().is_temporary() {
2140                    tx.update_item(id, new_entry.into())?;
2141                } else {
2142                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2143                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2144                }
2145
2146                updates.push(id);
2147                for id in updates {
2148                    Self::log_update(state, &id);
2149                }
2150            }
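            // Rename a schema, rewriting schema-qualified references in every
            // item the schema contains and in those items' dependents.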
2151            Op::RenameSchema {
2152                database_spec,
2153                schema_spec,
2154                new_name,
2155                check_reserved_names,
2156            } => {
2157                if check_reserved_names && is_reserved_name(&new_name) {
2158                    return Err(AdapterError::Catalog(Error::new(
2159                        ErrorKind::ReservedSchemaName(new_name),
2160                    )));
2161                }
2162
2163                let conn_id = session
2164                    .map(|session| session.conn_id())
2165                    .unwrap_or(&SYSTEM_CONN_ID);
2166
2167                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2168                let cur_name = schema.name().schema.clone();
2169
2170                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2171                    return Err(AdapterError::Catalog(Error::new(
2172                        ErrorKind::AmbientSchemaRename(cur_name),
2173                    )));
2174                };
2175                let database = state.get_database(&database_id);
2176                let database_name = &database.name;
2177
2178                let mut updates = Vec::new();
2179                let mut items_to_update = BTreeMap::new();
2180
2181                let mut update_item = |id| {
2182                    if items_to_update.contains_key(id) {
2183                        return Ok(());
2184                    }
2185
2186                    let entry = state.get_entry(id);
2187
2188                    // Update our item.
2189                    let mut new_entry = entry.clone();
2190                    new_entry.item = entry
2191                        .item
2192                        .rename_schema_refs(database_name, &cur_name, &new_name)
2193                        .map_err(|(s, _i)| {
2194                            Error::new(ErrorKind::from(AmbiguousRename {
2195                                depender: state
2196                                    .resolve_full_name(entry.name(), entry.conn_id())
2197                                    .to_string(),
2198                                dependee: format!("{database_name}.{cur_name}"),
2199                                message: format!("ambiguous reference to schema named {s}"),
2200                            }))
2201                        })?;
2202
2203                    // Queue updates for Catalog storage and Builtin Tables.
2204                    if !new_entry.item().is_temporary() {
2205                        items_to_update.insert(*id, new_entry.into());
2206                    } else {
2207                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2208                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2209                    }
2210                    updates.push(id);
2211
2212                    Ok::<_, AdapterError>(())
2213                };
2214
2215                // Update all of the items in the schema.
2216                for (_name, item_id) in &schema.items {
2217                    // Update the item itself.
2218                    update_item(item_id)?;
2219
2220                    // Update everything that depends on this item.
2221                    for id in state.get_entry(item_id).referenced_by() {
2222                        update_item(id)?;
2223                    }
2224                }
2225                // Note: When updating the transaction it's very important that we update the
2226                // items as a whole group, otherwise we exhibit quadratic behavior.
2227                tx.update_items(items_to_update)?;
2228
2229                // Renaming temporary schemas is not supported.
2230                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2231                    let schema_name = schema.name().schema.clone();
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReadOnlySystemSchema(schema_name),
                    )));
2235                };
2236
2237                // Add an entry to the audit log.
2238                let database_name = database_spec
2239                    .id()
2240                    .map(|id| state.get_database(&id).name.clone());
2241                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2242                    id: schema_id.to_string(),
2243                    old_name: schema.name().schema.clone(),
2244                    new_name: new_name.clone(),
2245                    database_name,
2246                });
2247                CatalogState::add_to_audit_log(
2248                    &state.system_configuration,
2249                    oracle_write_ts,
2250                    session,
2251                    tx,
2252                    audit_events,
2253                    EventType::Alter,
2254                    mz_audit_log::ObjectType::Schema,
2255                    details,
2256                )?;
2257
2258                // Update the schema itself.
2259                let mut new_schema = schema.clone();
2260                new_schema.name.schema.clone_from(&new_name);
2261                tx.update_schema(schema_id, new_schema.into())?;
2262
2263                for id in updates {
2264                    Self::log_update(state, id);
2265                }
2266            }
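            // Transfer ownership of an object, updating its privileges to
            // reflect the new owner.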
2267            Op::UpdateOwner { id, new_owner } => {
2268                let conn_id = session
2269                    .map(|session| session.conn_id())
2270                    .unwrap_or(&SYSTEM_CONN_ID);
2271                let old_owner = state
2272                    .get_owner_id(&id, conn_id)
2273                    .expect("cannot update the owner of an object without an owner");
2274                match &id {
2275                    ObjectId::Cluster(id) => {
2276                        let mut cluster = state.get_cluster(*id).clone();
2277                        if id.is_system() {
2278                            return Err(AdapterError::Catalog(Error::new(
2279                                ErrorKind::ReadOnlyCluster(cluster.name),
2280                            )));
2281                        }
2282                        Self::update_privilege_owners(
2283                            &mut cluster.privileges,
2284                            cluster.owner_id,
2285                            new_owner,
2286                        );
2287                        cluster.owner_id = new_owner;
2288                        tx.update_cluster(*id, cluster.into())?;
2289                    }
2290                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2291                        let cluster = state.get_cluster(*cluster_id);
2292                        let mut replica = cluster
2293                            .replica(*replica_id)
2294                            .expect("catalog out of sync")
2295                            .clone();
2296                        if replica_id.is_system() {
2297                            return Err(AdapterError::Catalog(Error::new(
2298                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2299                            )));
2300                        }
2301                        replica.owner_id = new_owner;
2302                        tx.update_cluster_replica(*replica_id, replica.into())?;
2303                    }
2304                    ObjectId::Database(id) => {
2305                        let mut database = state.get_database(id).clone();
2306                        if id.is_system() {
2307                            return Err(AdapterError::Catalog(Error::new(
2308                                ErrorKind::ReadOnlyDatabase(database.name),
2309                            )));
2310                        }
2311                        Self::update_privilege_owners(
2312                            &mut database.privileges,
2313                            database.owner_id,
2314                            new_owner,
2315                        );
2316                        database.owner_id = new_owner;
                        tx.update_database(*id, database.into())?;
2318                    }
2319                    ObjectId::Schema((database_spec, schema_spec)) => {
2320                        let schema_id: SchemaId = schema_spec.clone().into();
2321                        let mut schema = state
2322                            .get_schema(database_spec, schema_spec, conn_id)
2323                            .clone();
2324                        if schema_id.is_system() {
2325                            let name = schema.name();
2326                            let full_name = state.resolve_full_schema_name(name);
2327                            return Err(AdapterError::Catalog(Error::new(
2328                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2329                            )));
2330                        }
2331                        Self::update_privilege_owners(
2332                            &mut schema.privileges,
2333                            schema.owner_id,
2334                            new_owner,
2335                        );
2336                        schema.owner_id = new_owner;
2337                        tx.update_schema(schema_id, schema.into())?;
2338                    }
2339                    ObjectId::Item(id) => {
2340                        let entry = state.get_entry(id);
2341                        let mut new_entry = entry.clone();
2342                        if id.is_system() {
2343                            let full_name = state.resolve_full_name(
2344                                new_entry.name(),
2345                                session.map(|session| session.conn_id()),
2346                            );
2347                            return Err(AdapterError::Catalog(Error::new(
2348                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2349                            )));
2350                        }
2351                        Self::update_privilege_owners(
2352                            &mut new_entry.privileges,
2353                            new_entry.owner_id,
2354                            new_owner,
2355                        );
2356                        new_entry.owner_id = new_owner;
2357                        if !new_entry.item().is_temporary() {
2358                            tx.update_item(*id, new_entry.into())?;
2359                        } else {
2360                            temporary_item_updates
2361                                .push((entry.clone().into(), StateDiff::Retraction));
2362                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2363                        }
2364                    }
2365                    ObjectId::NetworkPolicy(id) => {
2366                        let mut policy = state.get_network_policy(id).clone();
2367                        if id.is_system() {
2368                            return Err(AdapterError::Catalog(Error::new(
2369                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2370                            )));
2371                        }
2372                        Self::update_privilege_owners(
2373                            &mut policy.privileges,
2374                            policy.owner_id,
2375                            new_owner,
2376                        );
2377                        policy.owner_id = new_owner;
2378                        tx.update_network_policy(*id, policy.into())?;
2379                    }
2380                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2381                }
2382                let object_type = state.get_object_type(&id);
2383                CatalogState::add_to_audit_log(
2384                    &state.system_configuration,
2385                    oracle_write_ts,
2386                    session,
2387                    tx,
2388                    audit_events,
2389                    EventType::Alter,
2390                    object_type_to_audit_object_type(object_type),
2391                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2392                        object_id: id.to_string(),
2393                        old_owner_id: old_owner.to_string(),
2394                        new_owner_id: new_owner.to_string(),
2395                    }),
2396                )?;
2397            }
2398            Op::UpdateClusterConfig { id, name, config } => {
2399                let mut cluster = state.get_cluster(id).clone();
2400                cluster.config = config;
2401                tx.update_cluster(id, cluster.into())?;
2402                info!("update cluster {}", name);
2403
2404                CatalogState::add_to_audit_log(
2405                    &state.system_configuration,
2406                    oracle_write_ts,
2407                    session,
2408                    tx,
2409                    audit_events,
2410                    EventType::Alter,
2411                    ObjectType::Cluster,
2412                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2413                        id: id.to_string(),
2414                        name,
2415                    }),
2416                )?;
2417            }
2418            Op::UpdateClusterReplicaConfig {
2419                replica_id,
2420                cluster_id,
2421                config,
2422            } => {
2423                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2424                info!("update replica {}", replica.name);
2425                tx.update_cluster_replica(
2426                    replica_id,
2427                    mz_catalog::durable::ClusterReplica {
2428                        cluster_id,
2429                        replica_id,
2430                        name: replica.name.clone(),
2431                        config: config.clone().into(),
2432                        owner_id: replica.owner_id,
2433                    },
2434                )?;
2435            }
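            // Replace an item's name and definition in place, audit-logging
            // the change for user-visible items.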
2436            Op::UpdateItem { id, name, to_item } => {
2437                let mut entry = state.get_entry(&id).clone();
2438                entry.name = name.clone();
2439                entry.item = to_item.clone();
2440                tx.update_item(id, entry.into())?;
2441
2442                if Self::should_audit_log_item(&to_item) {
2443                    let mut full_name = Self::full_name_detail(
2444                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2445                    );
2446                    full_name.item = name.item;
2447
2448                    CatalogState::add_to_audit_log(
2449                        &state.system_configuration,
2450                        oracle_write_ts,
2451                        session,
2452                        tx,
2453                        audit_events,
2454                        EventType::Alter,
2455                        catalog_type_to_audit_object_type(to_item.typ()),
2456                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2457                            id: id.to_string(),
2458                            name: full_name,
2459                        }),
2460                    )?;
2461                }
2462
2463                Self::log_update(state, &id);
2464            }
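            // Set a system configuration parameter, validating the new value
            // before persisting it.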
2465            Op::UpdateSystemConfiguration { name, value } => {
2466                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2467                tx.upsert_system_config(&name, parsed_value.clone())?;
2468                // This mirrors some "system vars" into the catalog storage
2469                // "config" collection so that we can toggle the flag with
2470                // Launch Darkly, but still read it during boot, before
2471                // Launch Darkly is available.
2472                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2473                    let with_0dt_deployment_max_wait =
2474                        Duration::parse(VarInput::Flat(&parsed_value))
2475                            .expect("parsing succeeded above");
2476                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2477                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2478                    let with_0dt_deployment_ddl_check_interval =
2479                        Duration::parse(VarInput::Flat(&parsed_value))
2480                            .expect("parsing succeeded above");
2481                    tx.set_0dt_deployment_ddl_check_interval(
2482                        with_0dt_deployment_ddl_check_interval,
2483                    )?;
2484                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2485                    let panic_after_timeout =
2486                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2487                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2488                }
2489
2490                CatalogState::add_to_audit_log(
2491                    &state.system_configuration,
2492                    oracle_write_ts,
2493                    session,
2494                    tx,
2495                    audit_events,
2496                    EventType::Alter,
2497                    ObjectType::System,
2498                    EventDetails::SetV1(mz_audit_log::SetV1 {
2499                        name,
2500                        value: Some(value.borrow().to_vec().join(", ")),
2501                    }),
2502                )?;
2503            }
2504            Op::ResetSystemConfiguration { name } => {
2505                tx.remove_system_config(&name);
2506                // This mirrors some "system vars" into the catalog storage
2507                // "config" collection so that we can toggle the flag with
2508                // Launch Darkly, but still read it during boot, before
2509                // Launch Darkly is available.
2510                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2511                    tx.reset_0dt_deployment_max_wait()?;
2512                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2513                    tx.reset_0dt_deployment_ddl_check_interval()?;
2514                }
2515
2516                CatalogState::add_to_audit_log(
2517                    &state.system_configuration,
2518                    oracle_write_ts,
2519                    session,
2520                    tx,
2521                    audit_events,
2522                    EventType::Alter,
2523                    ObjectType::System,
2524                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2525                )?;
2526            }
2527            Op::ResetAllSystemConfiguration => {
2528                tx.clear_system_configs();
2529                tx.reset_0dt_deployment_max_wait()?;
2530                tx.reset_0dt_deployment_ddl_check_interval()?;
2531
2532                CatalogState::add_to_audit_log(
2533                    &state.system_configuration,
2534                    oracle_write_ts,
2535                    session,
2536                    tx,
2537                    audit_events,
2538                    EventType::Alter,
2539                    ObjectType::System,
2540                    EventDetails::ResetAllV1,
2541                )?;
2542            }
2543            Op::WeirdStorageUsageUpdates {
2544                object_id,
2545                size_bytes,
2546                collection_timestamp,
2547            } => {
2548                let id = tx.allocate_storage_usage_ids()?;
2549                let metric =
2550                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2551                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2552                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2553                weird_builtin_table_update = Some(builtin_table_update);
2554            }
2555        };
2556        Ok((weird_builtin_table_update, temporary_item_updates))
2557    }
2558
2559    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2560        let entry = state.get_entry(id);
2561        info!(
2562            "update {} {} ({})",
2563            entry.item_type(),
2564            state.resolve_full_name(entry.name(), entry.conn_id()),
2565            id
2566        );
2567    }
2568
2569    /// Update privileges to reflect the new owner. Based on PostgreSQL's
2570    /// implementation:
2571    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
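    ///
    /// A minimal sketch of the intended behavior (marked `ignore` because this is
    /// a private helper; the names mirror the unit test at the bottom of this file
    /// rather than any public API):
    ///
    /// ```ignore
    /// // Transferring ownership from role 1 to role 2 rewrites grantors and
    /// // grantees, then unions duplicate (grantee, grantor) entries.
    /// let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
    ///     MzAclItem {
    ///         grantee: RoleId::User(3),
    ///         grantor: RoleId::User(1),
    ///         acl_mode: AclMode::UPDATE,
    ///     },
    ///     MzAclItem {
    ///         grantee: RoleId::User(3),
    ///         grantor: RoleId::User(2),
    ///         acl_mode: AclMode::SELECT,
    ///     },
    /// ]);
    /// Catalog::update_privilege_owners(&mut privileges, RoleId::User(1), RoleId::User(2));
    /// // Both grants are now from role 2, consolidated into a single item whose
    /// // acl_mode is SELECT | UPDATE.
    /// assert_eq!(1, privileges.all_values().count());
    /// ```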
2572    fn update_privilege_owners(
2573        privileges: &mut PrivilegeMap,
2574        old_owner: RoleId,
2575        new_owner: RoleId,
2576    ) {
2577        // TODO(jkosh44) Would be nice not to clone every privilege.
2578        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2579
2580        let mut new_present = false;
2581        for privilege in flat_privileges.iter_mut() {
2582            // Old owner's granted privileges are updated to be granted by the new
2583            // owner.
2584            if privilege.grantor == old_owner {
2585                privilege.grantor = new_owner;
2586            } else if privilege.grantor == new_owner {
2587                new_present = true;
2588            }
2589            // Old owner's privileges are given to the new owner.
2590            if privilege.grantee == old_owner {
2591                privilege.grantee = new_owner;
2592            } else if privilege.grantee == new_owner {
2593                new_present = true;
2594            }
2595        }
2596
2597        // If the old privilege list contained references to the new owner, we may
2598        // have created duplicate entries. Here we try to consolidate them. This
2599        // is inspired by PostgreSQL's algorithm but is not identical to it.
2600        if new_present {
2601            // Group privileges by (grantee, grantor).
2602            let privilege_map: BTreeMap<_, Vec<_>> =
2603                flat_privileges
2604                    .into_iter()
2605                    .fold(BTreeMap::new(), |mut accum, privilege| {
2606                        accum
2607                            .entry((privilege.grantee, privilege.grantor))
2608                            .or_default()
2609                            .push(privilege);
2610                        accum
2611                    });
2612
2613            // Consolidate and update all privileges.
2614            flat_privileges = privilege_map
2615                .into_iter()
2616                .map(|((grantee, grantor), values)|
2617                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2618                    values.into_iter().fold(
2619                        MzAclItem::empty(grantee, grantor),
2620                        |mut accum, mz_aclitem| {
2621                            accum.acl_mode =
2622                                accum.acl_mode.union(mz_aclitem.acl_mode);
2623                            accum
2624                        },
2625                    ))
2626                .collect();
2627        }
2628
2629        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2630    }
2631}
2632
2633/// Prepare the given transaction for replacing a catalog item with a new version.
2634///
2635/// The new version gets a new `CatalogItemId`, which requires rewriting the `create_sql` of all
2636/// dependent objects to refer to that new ID (at a previous version).
2637///
2638/// Note that this is where we break the assumption that the `CatalogItemId` is a stable identifier
2639/// for catalog items. We currently think that there are no use cases that require this assumption,
2640/// but there is no way to know for sure.
2641fn tx_replace_item(
2642    tx: &mut Transaction<'_>,
2643    state: &CatalogState,
2644    id: CatalogItemId,
2645    new_entry: CatalogEntry,
2646) -> Result<(), AdapterError> {
2647    let new_id = new_entry.id;
2648
2649    // Rewrite dependent objects to point to the new ID.
2650    for use_id in new_entry.referenced_by() {
2651        // The dependent might be dropped in the same tx, so check.
2652        if tx.get_item(use_id).is_none() {
2653            continue;
2654        }
2655
2656        let mut dependent = state.get_entry(use_id).clone();
2657        dependent.item = dependent.item.replace_item_refs(id, new_id);
2658        tx.update_item(*use_id, dependent.into())?;
2659    }
2660
2661    // Move comments to the new ID.
2662    let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2663    let new_comment_id = new_entry.comment_object_id();
2664    if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2665        tx.drop_comments(&[old_comment_id].into())?;
2666        for (sub, comment) in comments {
2667            tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2668        }
2669    }
2670
2671    let mz_catalog::durable::Item {
2672        id: _,
2673        oid,
2674        global_id,
2675        schema_id,
2676        name,
2677        create_sql,
2678        owner_id,
2679        privileges,
2680        extra_versions,
2681    } = new_entry.into();
2682
2683    tx.remove_item(id)?;
2684    tx.insert_item(
2685        new_id,
2686        oid,
2687        global_id,
2688        schema_id,
2689        &name,
2690        create_sql,
2691        owner_id,
2692        privileges,
2693        extra_versions,
2694    )?;
2695
2696    Ok(())
2697}
2698
2699/// Generate audit events for a replacement apply operation.
2700fn apply_replacement_audit_events(
2701    state: &CatalogState,
2702    target: &CatalogEntry,
2703    replacement: &CatalogEntry,
2704) -> Vec<(EventType, EventDetails)> {
2705    let mut events = Vec::new();
2706
2707    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2708    let target_id_name = IdFullNameV1 {
2709        id: target.id().to_string(),
2710        name: Catalog::full_name_detail(&target_name),
2711    };
2712    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2713    let replacement_id_name = IdFullNameV1 {
2714        id: replacement.id().to_string(),
2715        name: Catalog::full_name_detail(&replacement_name),
2716    };
2717
2718    if Catalog::should_audit_log_item(&replacement.item) {
2719        events.push((
2720            EventType::Drop,
2721            EventDetails::IdFullNameV1(replacement_id_name.clone()),
2722        ));
2723    }
2724
2725    if Catalog::should_audit_log_item(&target.item) {
2726        events.push((
2727            EventType::Alter,
2728            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2729                target: target_id_name.clone(),
2730                replacement: replacement_id_name,
2731            }),
2732        ));
2733
2734        if let Some(old_cluster_id) = target.cluster_id()
2735            && let Some(new_cluster_id) = replacement.cluster_id()
2736            && old_cluster_id != new_cluster_id
2737        {
2738            // When the replacement is applied, the target takes on the ID of the replacement, so
2739            // we should use that ID for subsequent events.
2740            events.push((
2741                EventType::Alter,
2742                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2743                    id: replacement.id().to_string(),
2744                    name: target_id_name.name,
2745                    old_cluster_id: old_cluster_id.to_string(),
2746                    new_cluster_id: new_cluster_id.to_string(),
2747                }),
2748            ));
2749        }
2750    }
2751
2752    events
2753}
2754
2755/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2756///
2757/// Note: Previously we used to emit a single `Op::DropObject` for every object
2758/// we needed to drop. But each removal from a durable Catalog Transaction is
2759/// O(n), where `n` is the number of objects that exist in the Catalog, so
2760/// dropping `m` objects one at a time resulted in an unacceptable `O(m * n)`
2761/// cost for a `DROP ... CASCADE` statement.
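///
/// A rough usage sketch (`ignore`d; `state`, `session`, and the item IDs are
/// assumed to come from the surrounding transaction machinery):
///
/// ```ignore
/// let infos = vec![
///     DropObjectInfo::Item(item_id_a),
///     DropObjectInfo::Item(item_id_b),
/// ];
/// // One batched removal pass instead of one pass per dropped object.
/// let to_drop = ObjectsToDrop::generate(infos, &state, session)?;
/// ```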
2762#[derive(Debug, Default)]
2763pub(crate) struct ObjectsToDrop {
2764    pub comments: BTreeSet<CommentObjectId>,
2765    pub databases: BTreeSet<DatabaseId>,
2766    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2767    pub clusters: BTreeSet<ClusterId>,
2768    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2769    pub roles: BTreeSet<RoleId>,
2770    pub items: Vec<CatalogItemId>,
2771    pub network_policies: BTreeSet<NetworkPolicyId>,
2772}
2773
2774impl ObjectsToDrop {
2775    pub fn generate(
2776        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2777        state: &CatalogState,
2778        session: Option<&ConnMeta>,
2779    ) -> Result<Self, AdapterError> {
2780        let mut delta = ObjectsToDrop::default();
2781
2782        for drop_object_info in drop_object_infos {
2783            delta.add_item(drop_object_info, state, session)?;
2784        }
2785
2786        Ok(delta)
2787    }
2788
2789    fn add_item(
2790        &mut self,
2791        drop_object_info: DropObjectInfo,
2792        state: &CatalogState,
2793        session: Option<&ConnMeta>,
2794    ) -> Result<(), AdapterError> {
2795        self.comments
2796            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2797
2798        match drop_object_info {
2799            DropObjectInfo::Database(database_id) => {
2800                let database = &state.database_by_id[&database_id];
2801                if database_id.is_system() {
2802                    return Err(AdapterError::Catalog(Error::new(
2803                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2804                    )));
2805                }
2806
2807                self.databases.insert(database_id);
2808            }
2809            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2810                let schema = state.get_schema(
2811                    &database_spec,
2812                    &schema_spec,
2813                    session
2814                        .map(|session| session.conn_id())
2815                        .unwrap_or(&SYSTEM_CONN_ID),
2816                );
2817                let schema_id: SchemaId = schema_spec.into();
2818                if schema_id.is_system() {
2819                    let name = schema.name();
2820                    let full_name = state.resolve_full_schema_name(name);
2821                    return Err(AdapterError::Catalog(Error::new(
2822                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2823                    )));
2824                }
2825
2826                self.schemas.insert(schema_spec, database_spec);
2827            }
2828            DropObjectInfo::Role(role_id) => {
2829                let name = state.get_role(&role_id).name().to_string();
2830                if role_id.is_system() || role_id.is_predefined() {
2831                    return Err(AdapterError::Catalog(Error::new(
2832                        ErrorKind::ReservedRoleName(name.clone()),
2833                    )));
2834                }
2835                state.ensure_not_reserved_role(&role_id)?;
2836
2837                self.roles.insert(role_id);
2838            }
2839            DropObjectInfo::Cluster(cluster_id) => {
2840                let cluster = state.get_cluster(cluster_id);
2841                let name = &cluster.name;
2842                if cluster_id.is_system() {
2843                    return Err(AdapterError::Catalog(Error::new(
2844                        ErrorKind::ReadOnlyCluster(name.clone()),
2845                    )));
2846                }
2847
2848                self.clusters.insert(cluster_id);
2849            }
2850            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2851                let cluster = state.get_cluster(cluster_id);
2852                let replica = cluster.replica(replica_id).expect("Must exist");
2853
2854                self.replicas
2855                    .insert(replica.replica_id, (cluster.id, reason));
2856            }
2857            DropObjectInfo::Item(item_id) => {
2858                let entry = state.get_entry(&item_id);
2859                if item_id.is_system() {
2860                    let name = entry.name();
2861                    let full_name =
2862                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
2863                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2864                        full_name.to_string(),
2865                    ))));
2866                }
2867
2868                self.items.push(item_id);
2869            }
2870            DropObjectInfo::NetworkPolicy(network_policy_id) => {
2871                let policy = state.get_network_policy(&network_policy_id);
2872                let name = &policy.name;
2873                if network_policy_id.is_system() {
2874                    return Err(AdapterError::Catalog(Error::new(
2875                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2876                    )));
2877                }
2878
2879                self.network_policies.insert(network_policy_id);
2880            }
2881        }
2882
2883        Ok(())
2884    }
2885}
2886
2887#[cfg(test)]
2888mod tests {
2889    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
2890    use mz_repr::role_id::RoleId;
2891
2892    use crate::catalog::Catalog;
2893
2894    #[mz_ore::test]
2895    fn test_update_privilege_owners() {
2896        let old_owner = RoleId::User(1);
2897        let new_owner = RoleId::User(2);
2898        let other_role = RoleId::User(3);
2899
2900        // Old owner exists as grantor.
2901        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2902            MzAclItem {
2903                grantee: other_role,
2904                grantor: old_owner,
2905                acl_mode: AclMode::UPDATE,
2906            },
2907            MzAclItem {
2908                grantee: other_role,
2909                grantor: new_owner,
2910                acl_mode: AclMode::SELECT,
2911            },
2912        ]);
2913        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2914        assert_eq!(1, privileges.all_values().count());
2915        assert_eq!(
2916            vec![MzAclItem {
2917                grantee: other_role,
2918                grantor: new_owner,
2919                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2920            }],
2921            privileges.all_values_owned().collect::<Vec<_>>()
2922        );
2923
2924        // Old owner exists as grantee.
2925        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2926            MzAclItem {
2927                grantee: old_owner,
2928                grantor: other_role,
2929                acl_mode: AclMode::UPDATE,
2930            },
2931            MzAclItem {
2932                grantee: new_owner,
2933                grantor: other_role,
2934                acl_mode: AclMode::SELECT,
2935            },
2936        ]);
2937        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2938        assert_eq!(1, privileges.all_values().count());
2939        assert_eq!(
2940            vec![MzAclItem {
2941                grantee: new_owner,
2942                grantor: other_role,
2943                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2944            }],
2945            privileges.all_values_owned().collect::<Vec<_>>()
2946        );
2947
2948        // Old owner exists as grantee and grantor.
2949        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2950            MzAclItem {
2951                grantee: old_owner,
2952                grantor: old_owner,
2953                acl_mode: AclMode::UPDATE,
2954            },
2955            MzAclItem {
2956                grantee: new_owner,
2957                grantor: new_owner,
2958                acl_mode: AclMode::SELECT,
2959            },
2960        ]);
2961        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2962        assert_eq!(1, privileges.all_values().count());
2963        assert_eq!(
2964            vec![MzAclItem {
2965                grantee: new_owner,
2966                grantor: new_owner,
2967                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2968            }],
2969            privileges.all_values_owned().collect::<Vec<_>>()
2970        );
2971    }
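
    // An additional illustrative case (extrapolated from the semantics exercised
    // above): privileges that involve neither the old nor the new owner are left
    // untouched by the ownership transfer.
    #[mz_ore::test]
    fn test_update_privilege_owners_unrelated_roles() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![MzAclItem {
            grantee: other_role,
            grantor: other_role,
            acl_mode: AclMode::SELECT,
        }]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: other_role,
                grantor: other_role,
                acl_mode: AclMode::SELECT,
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );
    }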
2972}