
mz_adapter/catalog/transact.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to executing catalog transactions.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
    WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use mz_audit_log::{
    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::BuiltinLog;
use mz_catalog::durable::{NetworkPolicy, Transaction};
use mz_catalog::expr_cache::LocalExpressions;
use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff,
    StateUpdate, StateUpdateKind, TemporaryItem,
};
use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::HashSet;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_persist_types::ShardId;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
use mz_sql::ast::RawDataType;
use mz_sql::catalog::{
    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
    RoleAttributesRaw, RoleMembership, RoleVars,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{NetworkPolicyRule, PlanError};
use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
use mz_sql::session::vars::OwnedVarInput;
use mz_sql::session::vars::{Value as VarValue, VarInput};
use mz_sql::{DEFAULT_SCHEMA, rbac};
use mz_sql_parser::ast::{QualifiedReplica, Value};
use mz_storage_client::storage_collections::StorageCollections;
use tracing::{info, trace};

use crate::AdapterError;
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{
    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
    is_reserved_role_name, object_type_to_audit_object_type,
    system_object_type_to_audit_object_type,
};
use crate::coord::ConnMeta;
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::util::ResultExt;

#[derive(Debug, Clone)]
pub enum Op {
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    /// Performs updates to the storage usage table, which probably should be a builtin source.
    ///
    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
    /// might not work. If a process crashes after collecting storage usage events
    /// but before updating the builtin table, then another listening catalog
    /// will never know to update the builtin table.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Performs a dry run of the commit, but errors with
    /// [`AdapterError::TransactionDryRun`].
    ///
    /// When using this value, it should be included only as the last element of
    /// the transaction and should not be the only value in the transaction.
    TransactionDryRun,
}
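
// Illustrative sketch (not exercised here; `catalog`, `object_id`, `conn_meta`, and
// `oracle_write_ts` are assumed caller-side bindings): a catalog transaction is just a
// list of `Op`s handed to `Catalog::transact`, e.g. to drop a single object:
//
//     let ops = vec![Op::DropObjects(vec![
//         DropObjectInfo::manual_drop_from_object_id(object_id),
//     ])];
//     let result = catalog
//         .transact(None, oracle_write_ts, Some(&conn_meta), ops)
//         .await?;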

/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
/// the `Op::DropObjects`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}

impl DropObjectInfo {
    /// Creates a `DropObjectInfo` from an `ObjectId`.
    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
        match id {
            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
                cluster_id,
                replica_id,
                ReplicaCreateDropReason::Manual,
            )),
            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
        }
    }

    /// Creates an `ObjectId` from a `DropObjectInfo`.
    /// Loses the `ReplicaCreateDropReason` if there is one!
    fn to_object_id(&self) -> ObjectId {
        match &self {
            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
            }
            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
            DropObjectInfo::NetworkPolicy(network_policy_id) => {
                ObjectId::NetworkPolicy(network_policy_id.clone())
            }
        }
    }
}

/// The reason for creating or dropping a replica.
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The user initiated the replica create or drop, e.g., by
    /// - creating/dropping a cluster,
    /// - ALTERing various options on a managed cluster,
    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
    Manual,
    /// The automated cluster scheduling initiated the replica create or drop, e.g., because a
    /// materialized view needs a refresh on a SCHEDULE ON REFRESH cluster.
    ClusterScheduling(Vec<SchedulingDecision>),
}

impl ReplicaCreateDropReason {
    pub fn into_audit_log(
        self,
    ) -> (
        CreateOrDropClusterReplicaReasonV1,
        Option<SchedulingDecisionsWithReasonsV2>,
    ) {
        let (reason, scheduling_policies) = match self {
            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
                CreateOrDropClusterReplicaReasonV1::Schedule,
                Some(scheduling_decisions),
            ),
        };
        (
            reason,
            scheduling_policies
                .as_ref()
                .map(SchedulingDecision::reasons_to_audit_log_reasons),
        )
    }
}
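
// For example, `ReplicaCreateDropReason::Manual.into_audit_log()` yields
// `(CreateOrDropClusterReplicaReasonV1::Manual, None)`, while the
// `ClusterScheduling` variant maps to `Schedule` and additionally carries the
// scheduling decisions converted to their audit-log representation.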

pub struct TransactionResult {
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates from which we will derive catalog implications.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    pub audit_events: Vec<VersionedEvent>,
}

impl Catalog {
    fn should_audit_log_item(item: &CatalogItem) -> bool {
        !item.is_temporary()
    }

    /// Gets the [`CatalogItemId`]s of temporary items to be created and checks for name
    /// collisions within a connection id.
    fn temporary_ids(
        &self,
        ops: &[Op],
        temporary_drops: BTreeSet<(&ConnectionId, String)>,
    ) -> Result<BTreeSet<CatalogItemId>, Error> {
        let mut creating = BTreeSet::new();
        let mut temporary_ids = BTreeSet::new();
        for op in ops.iter() {
            if let Op::CreateItem {
                id,
                name,
                item,
                owner_id: _,
            } = op
            {
                if let Some(conn_id) = item.conn_id() {
                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
                        || creating.contains(&(conn_id, &name.item))
                    {
                        return Err(
                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
                        );
                    } else {
                        creating.insert((conn_id, &name.item));
                        temporary_ids.insert(id.clone());
                    }
                }
            }
        }
        Ok(temporary_ids)
    }

    #[instrument(name = "catalog::transact")]
    pub async fn transact(
        &mut self,
        // n.b. this is an option to prevent us from needing to build out a
        // dummy impl of `StorageController` for tests.
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
    ) -> Result<TransactionResult, AdapterError> {
        trace!("transact: {:?}", ops);
        fail::fail_point!("catalog_transact", |arg| {
            Err(AdapterError::Unstructured(anyhow::anyhow!(
                "failpoint: {arg:?}"
            )))
        });

        let drop_ids: BTreeSet<CatalogItemId> = ops
            .iter()
            .filter_map(|op| match op {
                Op::DropObjects(drop_object_infos) => {
                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
                    let item_ids = ids.filter_map(|id| match id {
                        ObjectId::Item(id) => Some(id),
                        _ => None,
                    });
                    Some(item_ids)
                }
                _ => None,
            })
            .flatten()
            .collect();
        let temporary_drops = drop_ids
            .iter()
            .filter_map(|id| {
                let entry = self.get_entry(id);
                match entry.item().conn_id() {
                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
                    None => None,
                }
            })
            .collect();
        let dropped_global_ids = drop_ids
            .iter()
            .flat_map(|item_id| self.get_global_ids(item_id))
            .collect();

        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = storage
            .transaction()
            .await
            .unwrap_or_terminate("starting catalog transaction");

        let new_state = Self::transact_inner(
            storage_collections,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            &self.state,
        )
        .await?;

        // The user closure was successful, apply the updates. Terminate the
        // process if this fails, because we have to restart envd due to
        // indeterminate catalog state, which we only reconcile during catalog
        // init.
        tx.commit(oracle_write_ts)
            .await
            .unwrap_or_terminate("catalog storage transaction commit must succeed");

        // Dropping here keeps the mutable borrow on self, preventing us from
        // accidentally mutating anything until after f is executed.
        drop(storage);
        if let Some(new_state) = new_state {
            self.transient_revision += 1;
            self.state = new_state;
        }

        // Drop in-memory planning metadata.
        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
        if self.state.system_config().enable_mz_notices() {
            // Generate retractions for the Builtin tables.
            self.state().pack_optimizer_notices(
                &mut builtin_table_updates,
                dropped_notices.iter(),
                Diff::MINUS_ONE,
            );
        }

        Ok(TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        })
    }

    /// Extracts optimized expressions from `Op::CreateItem` operations for views
    /// and materialized views. These can be used to populate a `LocalExpressionCache`
    /// to avoid re-optimization during `apply_updates`.
    fn extract_expressions_from_ops(
        ops: &[Op],
        optimizer_features: &OptimizerFeatures,
    ) -> BTreeMap<GlobalId, LocalExpressions> {
        let mut exprs = BTreeMap::new();

        for op in ops {
            if let Op::CreateItem { item, .. } = op {
                match item {
                    CatalogItem::View(view) => {
                        exprs.insert(
                            view.global_id,
                            LocalExpressions {
                                local_mir: (*view.optimized_expr).clone(),
                                optimizer_features: optimizer_features.clone(),
                            },
                        );
                    }
                    CatalogItem::MaterializedView(mv) => {
                        exprs.insert(
                            mv.global_id_writes(),
                            LocalExpressions {
                                local_mir: (*mv.optimized_expr).clone(),
                                optimizer_features: optimizer_features.clone(),
                            },
                        );
                    }
                    _ => {}
                }
            }
        }

        exprs
    }

    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
    ///
    /// # Panics
    /// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
    ///   final element.
    /// - If the only element of `ops` is [`Op::TransactionDryRun`].
    #[instrument(name = "catalog::transact_inner")]
    async fn transact_inner(
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        mut ops: Vec<Op>,
        temporary_ids: BTreeSet<CatalogItemId>,
        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
        audit_events: &mut Vec<VersionedEvent>,
        tx: &mut Transaction<'_>,
        state: &CatalogState,
    ) -> Result<Option<CatalogState>, AdapterError> {
        // We come up with new catalog state, builtin state updates, and parsed
        // catalog updates (for deriving catalog implications) in two phases:
        //
        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
        //    one-by-one. This will give us the full list of updates to apply to
        //    the catalog, which will allow us to apply it in one batch, which
        //    in turn will allow the apply machinery to consolidate the updates.
        // 2. We do one final apply call with all updates, which gives us the
        //    final builtin table updates and parsed catalog updates.
        //
        // The reason is that the loop that is working off ops first does a
        // transact_op to derive the state updates for that op, and then calls
        // apply_updates on the catalog state. And successive ops might expect
        // the catalog state to reflect the modified state _after_ applying
        // previous ops.
        //
        // We want to, however, have one final apply_state that takes all the
        // accumulated updates to derive the required controller updates and the
        // builtin table updates.
        //
        // We won't win any DDL throughput benchmarks, but so far that's not
        // what we're optimizing for, and there would probably be other
        // bottlenecks before this one.
        //
        // We could work around this by refactoring how the interplay of
        // transact_op and apply_updates works, but that's a larger undertaking.
        let mut preliminary_state = Cow::Borrowed(state);

        // The final state that we will return, if modified.
        let mut state = Cow::Borrowed(state);

        let dry_run_ops = match ops.last() {
            Some(Op::TransactionDryRun) => {
                // Remove dry run marker.
                ops.pop();
                assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
                ops.clone()
            }
            Some(_) => vec![],
            None => return Ok(None),
        };

        // Extract optimized expressions from CreateItem ops to avoid re-optimization
        // during apply_updates. We extract before the loop since `ops` is moved there.
        let optimizer_features = OptimizerFeatures::from(state.system_config());
        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);

        let mut storage_collections_to_create = BTreeSet::new();
        let mut storage_collections_to_drop = BTreeSet::new();
        let mut storage_collections_to_register = BTreeMap::new();

        let mut updates = Vec::new();

        for op in ops {
            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
                oracle_write_ts,
                session,
                op,
                &temporary_ids,
                audit_events,
                tx,
                &*preliminary_state,
                &mut storage_collections_to_create,
                &mut storage_collections_to_drop,
                &mut storage_collections_to_register,
            )
            .await?;

            // Certain builtin tables are not derived from the durable catalog state, so they need
            // to be updated ad-hoc based on the current transaction. This is weird and will not
            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
            // this instance crashes after committing the durable catalog but before applying the
            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
            // world. Currently, this works fine and there are no correctness issues because
            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
            // state of all builtin tables to the correct values.
            if let Some(builtin_table_update) = weird_builtin_table_update {
                builtin_table_updates.push(builtin_table_update);
            }

            // Temporary items are not stored in the durable catalog, so they need to be handled
            // separately for updating state and builtin tables.
            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
            // in a multi-subscriber catalog world.
            let upper = tx.upper();
            let temporary_item_updates =
                temporary_item_updates
                    .into_iter()
                    .map(|(item, diff)| StateUpdate {
                        kind: StateUpdateKind::TemporaryItem(item),
                        ts: upper,
                        diff,
                    });

            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
            op_updates.extend(temporary_item_updates);
            if !op_updates.is_empty() {
                // Clone the cache so each apply_updates call has access to cached expressions.
                // The cache uses `remove` semantics, so we need a fresh clone for each call.
                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
                    .to_mut()
                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
                    .await;
            }
            updates.append(&mut op_updates);
        }

        if !updates.is_empty() {
            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
            let (op_builtin_table_updates, op_catalog_updates) = state
                .to_mut()
                .apply_updates(updates.clone(), &mut local_expr_cache)
                .await;
            let op_builtin_table_updates = state
                .to_mut()
                .resolve_builtin_table_updates(op_builtin_table_updates);
            builtin_table_updates.extend(op_builtin_table_updates);
            parsed_catalog_updates.extend(op_catalog_updates);
        }

        if dry_run_ops.is_empty() {
            // `storage_collections` should only be `None` for tests.
            if let Some(c) = storage_collections {
                c.prepare_state(
                    tx,
                    storage_collections_to_create,
                    storage_collections_to_drop,
                    storage_collections_to_register,
                )
                .await?;
            }

            let updates = tx.get_and_commit_op_updates();
            if !updates.is_empty() {
                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
                let (op_builtin_table_updates, op_catalog_updates) = state
                    .to_mut()
                    .apply_updates(updates.clone(), &mut local_expr_cache)
                    .await;
                let op_builtin_table_updates = state
                    .to_mut()
                    .resolve_builtin_table_updates(op_builtin_table_updates);
                builtin_table_updates.extend(op_builtin_table_updates);
                parsed_catalog_updates.extend(op_catalog_updates);
            }

            match state {
                Cow::Owned(state) => Ok(Some(state)),
                Cow::Borrowed(_) => Ok(None),
            }
        } else {
            Err(AdapterError::TransactionDryRun {
                new_ops: dry_run_ops,
                new_state: state.into_owned(),
            })
        }
    }

    /// Performs the transaction operation described by `op`. This function prepares the changes in
    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
    /// changes.
    ///
    /// Optionally returns a builtin table update for any builtin table updates that cannot be
    /// derived from the durable catalog state, along with temporary item diffs. These are all very
    /// weird scenarios that ideally won't exist in the future.
    #[instrument]
    async fn transact_op(
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        op: Op,
        temporary_ids: &BTreeSet<CatalogItemId>,
        audit_events: &mut Vec<VersionedEvent>,
        tx: &mut Transaction<'_>,
        state: &CatalogState,
        storage_collections_to_create: &mut BTreeSet<GlobalId>,
        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
        let mut weird_builtin_table_update = None;
        let mut temporary_item_updates = Vec::new();

        match op {
            Op::TransactionDryRun => {
                unreachable!("TransactionDryRun can only be used as the final element of ops")
735            }
736            Op::AlterRetainHistory { id, value, window } => {
737                let entry = state.get_entry(&id);
738                if id.is_system() {
739                    let name = entry.name();
740                    let full_name =
741                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
742                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
743                        full_name.to_string(),
744                    ))));
745                }
746
747                let mut new_entry = entry.clone();
748                let previous = new_entry
749                    .item
750                    .update_retain_history(value.clone(), window)
751                    .map_err(|_| {
752                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
753                            "planner should have rejected invalid alter retain history item type"
754                                .to_string(),
755                        )))
756                    })?;
757
758                if Self::should_audit_log_item(new_entry.item()) {
759                    let details =
760                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
761                            id: id.to_string(),
762                            old_history: previous.map(|previous| previous.to_string()),
763                            new_history: value.map(|v| v.to_string()),
764                        });
765                    CatalogState::add_to_audit_log(
766                        &state.system_configuration,
767                        oracle_write_ts,
768                        session,
769                        tx,
770                        audit_events,
771                        EventType::Alter,
772                        catalog_type_to_audit_object_type(new_entry.item().typ()),
773                        details,
774                    )?;
775                }
776
777                tx.update_item(id, new_entry.into())?;
778
779                Self::log_update(state, &id);
780            }
781            Op::AlterRole {
782                id,
783                name,
784                attributes,
785                nopassword,
786                vars,
787            } => {
788                state.ensure_not_reserved_role(&id)?;
789
790                let mut existing_role = state.get_role(&id).clone();
791                let password = attributes.password.clone();
792                let scram_iterations = attributes
793                    .scram_iterations
794                    .unwrap_or_else(|| state.system_config().scram_iterations());
795                existing_role.attributes = attributes.into();
796                existing_role.vars = vars;
797                let password_action = if nopassword {
798                    PasswordAction::Clear
799                } else if let Some(password) = password {
800                    PasswordAction::Set(PasswordConfig {
801                        password,
802                        scram_iterations,
803                    })
804                } else {
805                    PasswordAction::NoChange
806                };
807                tx.update_role(id, existing_role.into(), password_action)?;
808
809                CatalogState::add_to_audit_log(
810                    &state.system_configuration,
811                    oracle_write_ts,
812                    session,
813                    tx,
814                    audit_events,
815                    EventType::Alter,
816                    ObjectType::Role,
817                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
818                        id: id.to_string(),
819                        name: name.clone(),
820                    }),
821                )?;
822
823                info!("update role {name} ({id})");
824            }
825            Op::AlterNetworkPolicy {
826                id,
827                rules,
828                name,
829                owner_id: _owner_id,
830            } => {
831                let existing_policy = state.get_network_policy(&id).clone();
832                let mut policy: NetworkPolicy = existing_policy.into();
833                policy.rules = rules;
834                if is_reserved_name(&name) {
835                    return Err(AdapterError::Catalog(Error::new(
836                        ErrorKind::ReservedNetworkPolicyName(name),
837                    )));
838                }
839                tx.update_network_policy(id, policy.clone())?;
840
841                CatalogState::add_to_audit_log(
842                    &state.system_configuration,
843                    oracle_write_ts,
844                    session,
845                    tx,
846                    audit_events,
847                    EventType::Alter,
848                    ObjectType::NetworkPolicy,
849                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
850                        id: id.to_string(),
851                        name: name.clone(),
852                    }),
853                )?;
854
855                info!("update network policy {name} ({id})");
856            }
857            Op::AlterAddColumn {
858                id,
859                new_global_id,
860                name,
861                typ,
862                sql,
863            } => {
864                let mut new_entry = state.get_entry(&id).clone();
865                let version = new_entry.item.add_column(name, typ, sql)?;
866                // All versions of a table share the same shard, so it shouldn't matter what
867                // GlobalId we use here.
868                let shard_id = state
869                    .storage_metadata()
870                    .get_collection_shard(new_entry.latest_global_id())?;
871
872                // TODO(alter_table): Support adding columns to sources.
873                let CatalogItem::Table(table) = &mut new_entry.item else {
874                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
875                };
876                table.collections.insert(version, new_global_id);
877
878                tx.update_item(id, new_entry.into())?;
879                storage_collections_to_register.insert(new_global_id, shard_id);
880            }
881            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
882                let mut new_entry = state.get_entry(&id).clone();
883                let replacement = state.get_entry(&replacement_id);
884
885                let new_audit_events =
886                    apply_replacement_audit_events(state, &new_entry, replacement);
887
888                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
889                    return Err(AdapterError::internal(
890                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
891                        "id must refer to a materialized view",
892                    ));
893                };
894                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
895                    return Err(AdapterError::internal(
896                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
897                        "replacement_id must refer to a materialized view",
898                    ));
899                };
900
901                mv.apply_replacement(replacement_mv.clone());
902
903                tx.remove_item(replacement_id)?;
904
905                new_entry.id = replacement_id;
906                tx_replace_item(tx, state, id, new_entry)?;
907
908                let comment_id = CommentObjectId::MaterializedView(replacement_id);
909                tx.drop_comments(&[comment_id].into())?;
910
911                for (event_type, details) in new_audit_events {
912                    CatalogState::add_to_audit_log(
913                        &state.system_configuration,
914                        oracle_write_ts,
915                        session,
916                        tx,
917                        audit_events,
918                        event_type,
919                        ObjectType::MaterializedView,
920                        details,
921                    )?;
922                }
923            }
924            Op::CreateDatabase { name, owner_id } => {
925                let database_owner_privileges = vec![rbac::owner_privilege(
926                    mz_sql::catalog::ObjectType::Database,
927                    owner_id,
928                )];
929                let database_default_privileges = state
930                    .default_privileges
931                    .get_applicable_privileges(
932                        owner_id,
933                        None,
934                        None,
935                        mz_sql::catalog::ObjectType::Database,
936                    )
937                    .map(|item| item.mz_acl_item(owner_id));
938                let database_privileges: Vec<_> = merge_mz_acl_items(
939                    database_owner_privileges
940                        .into_iter()
941                        .chain(database_default_privileges),
942                )
943                .collect();
944
945                let schema_owner_privileges = vec![rbac::owner_privilege(
946                    mz_sql::catalog::ObjectType::Schema,
947                    owner_id,
948                )];
949                let schema_default_privileges = state
950                    .default_privileges
951                    .get_applicable_privileges(
952                        owner_id,
953                        None,
954                        None,
955                        mz_sql::catalog::ObjectType::Schema,
956                    )
957                    .map(|item| item.mz_acl_item(owner_id))
958                    // Special default privilege on public schemas.
959                    .chain(std::iter::once(MzAclItem {
960                        grantee: RoleId::Public,
961                        grantor: owner_id,
962                        acl_mode: AclMode::USAGE,
963                    }));
964                let schema_privileges: Vec<_> = merge_mz_acl_items(
965                    schema_owner_privileges
966                        .into_iter()
967                        .chain(schema_default_privileges),
968                )
969                .collect();
970
971                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
972                let (database_id, _) = tx.insert_user_database(
973                    &name,
974                    owner_id,
975                    database_privileges.clone(),
976                    &temporary_oids,
977                )?;
978                let (schema_id, _) = tx.insert_user_schema(
979                    database_id,
980                    DEFAULT_SCHEMA,
981                    owner_id,
982                    schema_privileges.clone(),
983                    &temporary_oids,
984                )?;
985                CatalogState::add_to_audit_log(
986                    &state.system_configuration,
987                    oracle_write_ts,
988                    session,
989                    tx,
990                    audit_events,
991                    EventType::Create,
992                    ObjectType::Database,
993                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
994                        id: database_id.to_string(),
995                        name: name.clone(),
996                    }),
997                )?;
998                info!("create database {}", name);
999
1000                CatalogState::add_to_audit_log(
1001                    &state.system_configuration,
1002                    oracle_write_ts,
1003                    session,
1004                    tx,
1005                    audit_events,
1006                    EventType::Create,
1007                    ObjectType::Schema,
1008                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1009                        id: schema_id.to_string(),
1010                        name: DEFAULT_SCHEMA.to_string(),
1011                        database_name: Some(name),
1012                    }),
1013                )?;
1014            }
1015            Op::CreateSchema {
1016                database_id,
1017                schema_name,
1018                owner_id,
1019            } => {
1020                if is_reserved_name(&schema_name) {
1021                    return Err(AdapterError::Catalog(Error::new(
1022                        ErrorKind::ReservedSchemaName(schema_name),
1023                    )));
1024                }
1025                let database_id = match database_id {
1026                    ResolvedDatabaseSpecifier::Id(id) => id,
1027                    ResolvedDatabaseSpecifier::Ambient => {
1028                        return Err(AdapterError::Catalog(Error::new(
1029                            ErrorKind::ReadOnlySystemSchema(schema_name),
1030                        )));
1031                    }
1032                };
1033                let owner_privileges = vec![rbac::owner_privilege(
1034                    mz_sql::catalog::ObjectType::Schema,
1035                    owner_id,
1036                )];
1037                let default_privileges = state
1038                    .default_privileges
1039                    .get_applicable_privileges(
1040                        owner_id,
1041                        Some(database_id),
1042                        None,
1043                        mz_sql::catalog::ObjectType::Schema,
1044                    )
1045                    .map(|item| item.mz_acl_item(owner_id));
1046                let privileges: Vec<_> =
1047                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1048                        .collect();
1049                let (schema_id, _) = tx.insert_user_schema(
1050                    database_id,
1051                    &schema_name,
1052                    owner_id,
1053                    privileges.clone(),
1054                    &state.get_temporary_oids().collect(),
1055                )?;
1056                CatalogState::add_to_audit_log(
1057                    &state.system_configuration,
1058                    oracle_write_ts,
1059                    session,
1060                    tx,
1061                    audit_events,
1062                    EventType::Create,
1063                    ObjectType::Schema,
1064                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1065                        id: schema_id.to_string(),
1066                        name: schema_name.clone(),
1067                        database_name: Some(state.database_by_id[&database_id].name.clone()),
1068                    }),
1069                )?;
1070            }
1071            Op::CreateRole { name, attributes } => {
1072                if is_reserved_role_name(&name) {
1073                    return Err(AdapterError::Catalog(Error::new(
1074                        ErrorKind::ReservedRoleName(name),
1075                    )));
1076                }
1077                let membership = RoleMembership::new();
1078                let vars = RoleVars::default();
1079                let (id, _) = tx.insert_user_role(
1080                    name.clone(),
1081                    attributes.clone(),
1082                    membership.clone(),
1083                    vars.clone(),
1084                    &state.get_temporary_oids().collect(),
1085                )?;
1086                CatalogState::add_to_audit_log(
1087                    &state.system_configuration,
1088                    oracle_write_ts,
1089                    session,
1090                    tx,
1091                    audit_events,
1092                    EventType::Create,
1093                    ObjectType::Role,
1094                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1095                        id: id.to_string(),
1096                        name: name.clone(),
1097                    }),
1098                )?;
1099                info!("create role {}", name);
1100            }
1101            Op::CreateCluster {
1102                id,
1103                name,
1104                introspection_sources,
1105                owner_id,
1106                config,
1107            } => {
1108                if is_reserved_name(&name) {
1109                    return Err(AdapterError::Catalog(Error::new(
1110                        ErrorKind::ReservedClusterName(name),
1111                    )));
1112                }
1113                let owner_privileges = vec![rbac::owner_privilege(
1114                    mz_sql::catalog::ObjectType::Cluster,
1115                    owner_id,
1116                )];
1117                let default_privileges = state
1118                    .default_privileges
1119                    .get_applicable_privileges(
1120                        owner_id,
1121                        None,
1122                        None,
1123                        mz_sql::catalog::ObjectType::Cluster,
1124                    )
1125                    .map(|item| item.mz_acl_item(owner_id));
1126                let privileges: Vec<_> =
1127                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1128                        .collect();
1129                let introspection_source_ids: Vec<_> = introspection_sources
1130                    .iter()
1131                    .map(|introspection_source| {
1132                        Transaction::allocate_introspection_source_index_id(
1133                            &id,
1134                            introspection_source.variant,
1135                        )
1136                    })
1137                    .collect();
1138
1139                let introspection_sources = introspection_sources
1140                    .into_iter()
1141                    .zip_eq(introspection_source_ids)
1142                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1143                    .collect();
1144
1145                tx.insert_user_cluster(
1146                    id,
1147                    &name,
1148                    introspection_sources,
1149                    owner_id,
1150                    privileges.clone(),
1151                    config.clone().into(),
1152                    &state.get_temporary_oids().collect(),
1153                )?;
1154                CatalogState::add_to_audit_log(
1155                    &state.system_configuration,
1156                    oracle_write_ts,
1157                    session,
1158                    tx,
1159                    audit_events,
1160                    EventType::Create,
1161                    ObjectType::Cluster,
1162                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1163                        id: id.to_string(),
1164                        name: name.clone(),
1165                    }),
1166                )?;
1167                info!("create cluster {}", name);
1168            }
1169            Op::CreateClusterReplica {
1170                cluster_id,
1171                name,
1172                config,
1173                owner_id,
1174                reason,
1175            } => {
1176                if is_reserved_name(&name) {
1177                    return Err(AdapterError::Catalog(Error::new(
1178                        ErrorKind::ReservedReplicaName(name),
1179                    )));
1180                }
1181                let cluster = state.get_cluster(cluster_id);
1182                let id =
1183                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1184                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1185                    size,
1186                    billed_as,
1187                    internal,
1188                    ..
1189                }) = &config.location
1190                {
1191                    let (reason, scheduling_policies) = reason.into_audit_log();
1192                    let details = EventDetails::CreateClusterReplicaV4(
1193                        mz_audit_log::CreateClusterReplicaV4 {
1194                            cluster_id: cluster_id.to_string(),
1195                            cluster_name: cluster.name.clone(),
1196                            replica_id: Some(id.to_string()),
1197                            replica_name: name.clone(),
1198                            logical_size: size.clone(),
1199                            billed_as: billed_as.clone(),
1200                            internal: *internal,
1201                            reason,
1202                            scheduling_policies,
1203                        },
1204                    );
1205                    CatalogState::add_to_audit_log(
1206                        &state.system_configuration,
1207                        oracle_write_ts,
1208                        session,
1209                        tx,
1210                        audit_events,
1211                        EventType::Create,
1212                        ObjectType::ClusterReplica,
1213                        details,
1214                    )?;
1215                }
1216            }
1217            Op::CreateItem {
1218                id,
1219                name,
1220                item,
1221                owner_id,
1222            } => {
1223                state.check_unstable_dependencies(&item)?;
1224
1225                match &item {
1226                    CatalogItem::Table(table) => {
1227                        let gids: Vec<_> = table.global_ids().collect();
1228                        assert_eq!(gids.len(), 1);
1229                        storage_collections_to_create.extend(gids);
1230                    }
1231                    CatalogItem::Source(source) => {
1232                        storage_collections_to_create.insert(source.global_id());
1233                    }
1234                    CatalogItem::MaterializedView(mv) => {
1235                        let mv_gid = mv.global_id_writes();
1236                        if let Some(target_id) = mv.replacement_target {
1237                            let target_gid = state.get_entry(&target_id).latest_global_id();
1238                            let shard_id =
1239                                state.storage_metadata().get_collection_shard(target_gid)?;
1240                            storage_collections_to_register.insert(mv_gid, shard_id);
1241                        } else {
1242                            storage_collections_to_create.insert(mv_gid);
1243                        }
1244                    }
1245                    CatalogItem::ContinualTask(ct) => {
1246                        storage_collections_to_create.insert(ct.global_id());
1247                    }
1248                    CatalogItem::Sink(sink) => {
1249                        storage_collections_to_create.insert(sink.global_id());
1250                    }
1251                    CatalogItem::Log(_)
1252                    | CatalogItem::View(_)
1253                    | CatalogItem::Index(_)
1254                    | CatalogItem::Type(_)
1255                    | CatalogItem::Func(_)
1256                    | CatalogItem::Secret(_)
1257                    | CatalogItem::Connection(_) => (),
1258                }
1259
1260                let system_user = session.map_or(false, |s| s.user().is_system_user());
1261                if !system_user {
1262                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1263                        let cluster_name = state.clusters_by_id[&id].name.clone();
1264                        return Err(AdapterError::Catalog(Error::new(
1265                            ErrorKind::ReadOnlyCluster(cluster_name),
1266                        )));
1267                    }
1268                }
1269
1270                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1271                let default_privileges = state
1272                    .default_privileges
1273                    .get_applicable_privileges(
1274                        owner_id,
1275                        name.qualifiers.database_spec.id(),
1276                        Some(name.qualifiers.schema_spec.into()),
1277                        item.typ().into(),
1278                    )
1279                    .map(|item| item.mz_acl_item(owner_id));
1280                // mz_support can read all progress sources.
1281                let progress_source_privilege = if item.is_progress_source() {
1282                    Some(MzAclItem {
1283                        grantee: MZ_SUPPORT_ROLE_ID,
1284                        grantor: owner_id,
1285                        acl_mode: AclMode::SELECT,
1286                    })
1287                } else {
1288                    None
1289                };
1290                let privileges: Vec<_> = merge_mz_acl_items(
1291                    owner_privileges
1292                        .into_iter()
1293                        .chain(default_privileges)
1294                        .chain(progress_source_privilege),
1295                )
1296                .collect();
1297
1298                let temporary_oids = state.get_temporary_oids().collect();
1299
1300                if item.is_temporary() {
1301                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1302                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1303                    {
1304                        return Err(AdapterError::Catalog(Error::new(
1305                            ErrorKind::InvalidTemporarySchema,
1306                        )));
1307                    }
1308                    let oid = tx.allocate_oid(&temporary_oids)?;
1309
1310                    let schema_id = name.qualifiers.schema_spec.clone().into();
1311                    let item_type = item.typ();
1312                    let (create_sql, global_id, versions) = item.to_serialized();
1313
1314                    let item = TemporaryItem {
1315                        id,
1316                        oid,
1317                        global_id,
1318                        schema_id,
1319                        name: name.item.clone(),
1320                        create_sql,
1321                        conn_id: item.conn_id().cloned(),
1322                        owner_id,
1323                        privileges: privileges.clone(),
1324                        extra_versions: versions,
1325                    };
1326                    temporary_item_updates.push((item, StateDiff::Addition));
1327
1328                    info!(
1329                        "create temporary {} {} ({})",
1330                        item_type,
1331                        state.resolve_full_name(&name, None),
1332                        id
1333                    );
1334                } else {
1335                    if let Some(temp_id) =
1336                        item.uses()
1337                            .iter()
1338                            .find(|id| match state.try_get_entry(*id) {
1339                                Some(entry) => entry.item().is_temporary(),
1340                                None => temporary_ids.contains(id),
1341                            })
1342                    {
1343                        let temp_item = state.get_entry(temp_id);
1344                        return Err(AdapterError::Catalog(Error::new(
1345                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1346                        )));
1347                    }
1348                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1349                        && !system_user
1350                    {
1351                        let schema_name = state
1352                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1353                            .schema;
1354                        return Err(AdapterError::Catalog(Error::new(
1355                            ErrorKind::ReadOnlySystemSchema(schema_name),
1356                        )));
1357                    }
1358                    let schema_id = name.qualifiers.schema_spec.clone().into();
1359                    let item_type = item.typ();
1360                    let (create_sql, global_id, versions) = item.to_serialized();
1361                    tx.insert_user_item(
1362                        id,
1363                        global_id,
1364                        schema_id,
1365                        &name.item,
1366                        create_sql,
1367                        owner_id,
1368                        privileges.clone(),
1369                        &temporary_oids,
1370                        versions,
1371                    )?;
1372                    info!(
1373                        "create {} {} ({})",
1374                        item_type,
1375                        state.resolve_full_name(&name, None),
1376                        id
1377                    );
1378                }
1379
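                    // Record the creation in the audit log. Sources, sinks,
                    // indexes, and materialized views include extra details
                    // (such as the cluster they run on); everything else logs
                    // just its id and full name.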
1380                if Self::should_audit_log_item(&item) {
1381                    let name = Self::full_name_detail(
1382                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1383                    );
1384                    let details = match &item {
1385                        CatalogItem::Source(s) => {
1386                            let cluster_id = match s.data_source {
1387                                // Ingestion exports don't have their own cluster, but
1388                                // run on their ingestion's cluster.
1389                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1390                                    match state.get_entry(&ingestion_id).cluster_id() {
1391                                        Some(cluster_id) => Some(cluster_id.to_string()),
1392                                        None => None,
1393                                    }
1394                                }
1395                                _ => match item.cluster_id() {
1396                                    Some(cluster_id) => Some(cluster_id.to_string()),
1397                                    None => None,
1398                                },
1399                            };
1400
1401                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1402                                id: id.to_string(),
1403                                cluster_id,
1404                                name,
1405                                external_type: s.source_type().to_string(),
1406                            })
1407                        }
1408                        CatalogItem::Sink(s) => {
1409                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1410                                id: id.to_string(),
1411                                cluster_id: Some(s.cluster_id.to_string()),
1412                                name,
1413                                external_type: s.sink_type().to_string(),
1414                            })
1415                        }
1416                        CatalogItem::Index(i) => {
1417                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1418                                id: id.to_string(),
1419                                name,
1420                                cluster_id: i.cluster_id.to_string(),
1421                            })
1422                        }
1423                        CatalogItem::MaterializedView(mv) => {
1424                            EventDetails::CreateMaterializedViewV1(
1425                                mz_audit_log::CreateMaterializedViewV1 {
1426                                    id: id.to_string(),
1427                                    name,
1428                                    cluster_id: mv.cluster_id.to_string(),
1429                                    replacement_target_id: mv
1430                                        .replacement_target
1431                                        .map(|id| id.to_string()),
1432                                },
1433                            )
1434                        }
1435                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1436                            id: id.to_string(),
1437                            name,
1438                        }),
1439                    };
1440                    CatalogState::add_to_audit_log(
1441                        &state.system_configuration,
1442                        oracle_write_ts,
1443                        session,
1444                        tx,
1445                        audit_events,
1446                        EventType::Create,
1447                        catalog_type_to_audit_object_type(item.typ()),
1448                        details,
1449                    )?;
1450                }
1451            }
1452            Op::CreateNetworkPolicy {
1453                rules,
1454                name,
1455                owner_id,
1456            } => {
1457                if state.network_policies_by_name.contains_key(&name) {
1458                    return Err(AdapterError::PlanError(PlanError::Catalog(
1459                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1460                    )));
1461                }
1462                if is_reserved_name(&name) {
1463                    return Err(AdapterError::Catalog(Error::new(
1464                        ErrorKind::ReservedNetworkPolicyName(name),
1465                    )));
1466                }
1467
1468                let owner_privileges = vec![rbac::owner_privilege(
1469                    mz_sql::catalog::ObjectType::NetworkPolicy,
1470                    owner_id,
1471                )];
1472                let default_privileges = state
1473                    .default_privileges
1474                    .get_applicable_privileges(
1475                        owner_id,
1476                        None,
1477                        None,
1478                        mz_sql::catalog::ObjectType::NetworkPolicy,
1479                    )
1480                    .map(|item| item.mz_acl_item(owner_id));
1481                let privileges: Vec<_> =
1482                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1483                        .collect();
1484
1485                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1486                let id = tx.insert_user_network_policy(
1487                    name.clone(),
1488                    rules,
1489                    privileges,
1490                    owner_id,
1491                    &temporary_oids,
1492                )?;
1493
1494                CatalogState::add_to_audit_log(
1495                    &state.system_configuration,
1496                    oracle_write_ts,
1497                    session,
1498                    tx,
1499                    audit_events,
1500                    EventType::Create,
1501                    ObjectType::NetworkPolicy,
1502                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1503                        id: id.to_string(),
1504                        name: name.clone(),
1505                    }),
1506                )?;
1507
1508                info!("created network policy {name} ({id})");
1509            }
1510            Op::Comment {
1511                object_id,
1512                sub_component,
1513                comment,
1514            } => {
1515                tx.update_comment(object_id, sub_component, comment)?;
1516                let entry = state.get_comment_id_entry(&object_id);
1517                let should_log = entry
1518                    .map(|entry| Self::should_audit_log_item(entry.item()))
1519                    // Objects that aren't catalog entries can't be temporary, so they should be logged.
1520                    .unwrap_or(true);
1521                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1522                // comments won't be logged for now.
1523                if let (Some(conn_id), true) =
1524                    (session.map(|session| session.conn_id()), should_log)
1525                {
1526                    CatalogState::add_to_audit_log(
1527                        &state.system_configuration,
1528                        oracle_write_ts,
1529                        session,
1530                        tx,
1531                        audit_events,
1532                        EventType::Comment,
1533                        comment_id_to_audit_object_type(object_id),
1534                        EventDetails::IdNameV1(IdNameV1 {
1535                            // CommentObjectIds don't have a great string representation, but the Debug output will do for now.
1536                            id: format!("{object_id:?}"),
1537                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1538                        }),
1539                    )?;
1540                }
1541            }
1542            Op::UpdateSourceReferences {
1543                source_id,
1544                references,
1545            } => {
1546                tx.update_source_references(
1547                    source_id,
1548                    references
1549                        .references
1550                        .into_iter()
1551                        .map(|reference| reference.into())
1552                        .collect(),
1553                    references.updated_at,
1554                )?;
1555            }
1556            Op::DropObjects(drop_object_infos) => {
1557                // Generate the full set of objects that need to be dropped.
1558                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1559
1560                // Drop any associated comments.
1561                tx.drop_comments(&delta.comments)?;
1562
1563                // Drop any items.
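                    // Durable items are removed from the catalog transaction,
                    // while temporary items exist only in memory and are
                    // retracted via temporary_item_updates.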
1564                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1565                    delta
1566                        .items
1567                        .iter()
1568                        .map(|id| id)
1569                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1570                tx.remove_items(&durable_items_to_drop)?;
1571                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1572                    let entry = state.get_entry(&id);
1573                    (entry.clone().into(), StateDiff::Retraction)
1574                }));
1575
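                    // For each dropped item: release its storage collections
                    // (unless a replacement target still owns them), clean up
                    // any stored source references, and emit a drop audit event.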
1576                for item_id in delta.items {
1577                    let entry = state.get_entry(&item_id);
1578
1579                    // Drop associated storage collections, unless the dropped item is a
1580                    // replacement, in which case the replacement target owns the storage
1581                    // collection.
1582                    if entry.item().is_storage_collection() && entry.replacement_target().is_none()
1583                    {
1584                        storage_collections_to_drop.extend(entry.global_ids());
1585                    }
1586
1587                    if state.source_references.contains_key(&item_id) {
1588                        tx.remove_source_references(item_id)?;
1589                    }
1590
1591                    if Self::should_audit_log_item(entry.item()) {
1592                        CatalogState::add_to_audit_log(
1593                            &state.system_configuration,
1594                            oracle_write_ts,
1595                            session,
1596                            tx,
1597                            audit_events,
1598                            EventType::Drop,
1599                            catalog_type_to_audit_object_type(entry.item().typ()),
1600                            EventDetails::IdFullNameV1(IdFullNameV1 {
1601                                id: item_id.to_string(),
1602                                name: Self::full_name_detail(&state.resolve_full_name(
1603                                    entry.name(),
1604                                    session.map(|session| session.conn_id()),
1605                                )),
1606                            }),
1607                        )?;
1608                    }
1609                    info!(
1610                        "drop {} {} ({})",
1611                        entry.item_type(),
1612                        state.resolve_full_name(entry.name(), entry.conn_id()),
1613                        item_id
1614                    );
1615                }
1616
1617                // Drop any schemas.
1618                let schemas = delta
1619                    .schemas
1620                    .iter()
1621                    .map(|(schema_spec, database_spec)| {
1622                        (SchemaId::from(schema_spec), *database_spec)
1623                    })
1624                    .collect();
1625                tx.remove_schemas(&schemas)?;
1626
1627                for (schema_spec, database_spec) in delta.schemas {
1628                    let schema = state.get_schema(
1629                        &database_spec,
1630                        &schema_spec,
1631                        session
1632                            .map(|session| session.conn_id())
1633                            .unwrap_or(&SYSTEM_CONN_ID),
1634                    );
1635
1636                    let schema_id = SchemaId::from(schema_spec);
1637                    let database_id = match database_spec {
1638                        ResolvedDatabaseSpecifier::Ambient => None,
1639                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1640                    };
1641
1642                    CatalogState::add_to_audit_log(
1643                        &state.system_configuration,
1644                        oracle_write_ts,
1645                        session,
1646                        tx,
1647                        audit_events,
1648                        EventType::Drop,
1649                        ObjectType::Schema,
1650                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1651                            id: schema_id.to_string(),
1652                            name: schema.name.schema.to_string(),
1653                            database_name: database_id
1654                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1655                        }),
1656                    )?;
1657                }
1658
1659                // Drop any databases.
1660                tx.remove_databases(&delta.databases)?;
1661
1662                for database_id in delta.databases {
1663                    let database = state.get_database(&database_id).clone();
1664
1665                    CatalogState::add_to_audit_log(
1666                        &state.system_configuration,
1667                        oracle_write_ts,
1668                        session,
1669                        tx,
1670                        audit_events,
1671                        EventType::Drop,
1672                        ObjectType::Database,
1673                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1674                            id: database_id.to_string(),
1675                            name: database.name.clone(),
1676                        }),
1677                    )?;
1678                }
1679
1680                // Drop any roles.
1681                tx.remove_user_roles(&delta.roles)?;
1682
1683                for role_id in delta.roles {
1684                    let role = state
1685                        .roles_by_id
1686                        .get(&role_id)
1687                        .expect("catalog out of sync");
1688
1689                    CatalogState::add_to_audit_log(
1690                        &state.system_configuration,
1691                        oracle_write_ts,
1692                        session,
1693                        tx,
1694                        audit_events,
1695                        EventType::Drop,
1696                        ObjectType::Role,
1697                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1698                            id: role.id.to_string(),
1699                            name: role.name.clone(),
1700                        }),
1701                    )?;
1702                    info!("drop role {}", role.name());
1703                }
1704
1705                // Drop any network policies.
1706                tx.remove_network_policies(&delta.network_policies)?;
1707
1708                for network_policy_id in delta.network_policies {
1709                    let policy = state
1710                        .network_policies_by_id
1711                        .get(&network_policy_id)
1712                        .expect("catalog out of sync");
1713
1714                    CatalogState::add_to_audit_log(
1715                        &state.system_configuration,
1716                        oracle_write_ts,
1717                        session,
1718                        tx,
1719                        audit_events,
1720                        EventType::Drop,
1721                        ObjectType::NetworkPolicy,
1722                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1723                            id: policy.id.to_string(),
1724                            name: policy.name.clone(),
1725                        }),
1726                    )?;
1727                    info!("drop network policy {}", policy.name.clone());
1728                }
1729
1730                // Drop any replicas.
1731                let replicas = delta.replicas.keys().copied().collect();
1732                tx.remove_cluster_replicas(&replicas)?;
1733
1734                for (replica_id, (cluster_id, reason)) in delta.replicas {
1735                    let cluster = state.get_cluster(cluster_id);
1736                    let replica = cluster.replica(replica_id).expect("Must exist");
1737
1738                    let (reason, scheduling_policies) = reason.into_audit_log();
1739                    let details =
1740                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1741                            cluster_id: cluster_id.to_string(),
1742                            cluster_name: cluster.name.clone(),
1743                            replica_id: Some(replica_id.to_string()),
1744                            replica_name: replica.name.clone(),
1745                            reason,
1746                            scheduling_policies,
1747                        });
1748                    CatalogState::add_to_audit_log(
1749                        &state.system_configuration,
1750                        oracle_write_ts,
1751                        session,
1752                        tx,
1753                        audit_events,
1754                        EventType::Drop,
1755                        ObjectType::ClusterReplica,
1756                        details,
1757                    )?;
1758                }
1759
1760                // Drop any clusters.
1761                tx.remove_clusters(&delta.clusters)?;
1762
1763                for cluster_id in delta.clusters {
1764                    let cluster = state.get_cluster(cluster_id);
1765
1766                    CatalogState::add_to_audit_log(
1767                        &state.system_configuration,
1768                        oracle_write_ts,
1769                        session,
1770                        tx,
1771                        audit_events,
1772                        EventType::Drop,
1773                        ObjectType::Cluster,
1774                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1775                            id: cluster.id.to_string(),
1776                            name: cluster.name.clone(),
1777                        }),
1778                    )?;
1779                }
1780            }
1781            Op::GrantRole {
1782                role_id,
1783                member_id,
1784                grantor_id,
1785            } => {
1786                state.ensure_not_reserved_role(&member_id)?;
1787                state.ensure_grantable_role(&role_id)?;
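                    // Reject the grant (e.g. `GRANT some_role TO some_user`) if
                    // it would create a cycle in the role membership graph.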
1788                if state.collect_role_membership(&role_id).contains(&member_id) {
1789                    let group_role = state.get_role(&role_id);
1790                    let member_role = state.get_role(&member_id);
1791                    return Err(AdapterError::Catalog(Error::new(
1792                        ErrorKind::CircularRoleMembership {
1793                            role_name: group_role.name().to_string(),
1794                            member_name: member_role.name().to_string(),
1795                        },
1796                    )));
1797                }
1798                let mut member_role = state.get_role(&member_id).clone();
1799                member_role.membership.map.insert(role_id, grantor_id);
1800                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1801
1802                CatalogState::add_to_audit_log(
1803                    &state.system_configuration,
1804                    oracle_write_ts,
1805                    session,
1806                    tx,
1807                    audit_events,
1808                    EventType::Grant,
1809                    ObjectType::Role,
1810                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1811                        role_id: role_id.to_string(),
1812                        member_id: member_id.to_string(),
1813                        grantor_id: grantor_id.to_string(),
1814                        executed_by: session
1815                            .map(|session| session.authenticated_role_id())
1816                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1817                            .to_string(),
1818                    }),
1819                )?;
1820            }
1821            Op::RevokeRole {
1822                role_id,
1823                member_id,
1824                grantor_id,
1825            } => {
1826                state.ensure_not_reserved_role(&member_id)?;
1827                state.ensure_grantable_role(&role_id)?;
1828                let mut member_role = state.get_role(&member_id).clone();
1829                member_role.membership.map.remove(&role_id);
1830                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1831
1832                CatalogState::add_to_audit_log(
1833                    &state.system_configuration,
1834                    oracle_write_ts,
1835                    session,
1836                    tx,
1837                    audit_events,
1838                    EventType::Revoke,
1839                    ObjectType::Role,
1840                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1841                        role_id: role_id.to_string(),
1842                        member_id: member_id.to_string(),
1843                        grantor_id: grantor_id.to_string(),
1844                        executed_by: session
1845                            .map(|session| session.authenticated_role_id())
1846                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1847                            .to_string(),
1848                    }),
1849                )?;
1850            }
1851            Op::UpdatePrivilege {
1852                target_id,
1853                privilege,
1854                variant,
1855            } => {
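                    // e.g. `GRANT SELECT ON t TO r` or the matching `REVOKE`.
                    // `update_privilege_fn` applies the grant or revoke to
                    // whichever privilege set is loaded below: a cluster,
                    // database, network policy, schema, item, or the system
                    // privileges.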
1856                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1857                    UpdatePrivilegeVariant::Grant => {
1858                        privileges.grant(privilege);
1859                    }
1860                    UpdatePrivilegeVariant::Revoke => {
1861                        privileges.revoke(&privilege);
1862                    }
1863                };
1864                match &target_id {
1865                    SystemObjectId::Object(object_id) => match object_id {
1866                        ObjectId::Cluster(id) => {
1867                            let mut cluster = state.get_cluster(*id).clone();
1868                            update_privilege_fn(&mut cluster.privileges);
1869                            tx.update_cluster(*id, cluster.into())?;
1870                        }
1871                        ObjectId::Database(id) => {
1872                            let mut database = state.get_database(id).clone();
1873                            update_privilege_fn(&mut database.privileges);
1874                            tx.update_database(*id, database.into())?;
1875                        }
1876                        ObjectId::NetworkPolicy(id) => {
1877                            let mut policy = state.get_network_policy(id).clone();
1878                            update_privilege_fn(&mut policy.privileges);
1879                            tx.update_network_policy(*id, policy.into())?;
1880                        }
1881                        ObjectId::Schema((database_spec, schema_spec)) => {
1882                            let schema_id = schema_spec.clone().into();
1883                            let mut schema = state
1884                                .get_schema(
1885                                    database_spec,
1886                                    schema_spec,
1887                                    session
1888                                        .map(|session| session.conn_id())
1889                                        .unwrap_or(&SYSTEM_CONN_ID),
1890                                )
1891                                .clone();
1892                            update_privilege_fn(&mut schema.privileges);
1893                            tx.update_schema(schema_id, schema.into())?;
1894                        }
1895                        ObjectId::Item(id) => {
1896                            let entry = state.get_entry(id);
1897                            let mut new_entry = entry.clone();
1898                            update_privilege_fn(&mut new_entry.privileges);
1899                            if !new_entry.item().is_temporary() {
1900                                tx.update_item(*id, new_entry.into())?;
1901                            } else {
1902                                temporary_item_updates
1903                                    .push((entry.clone().into(), StateDiff::Retraction));
1904                                temporary_item_updates
1905                                    .push((new_entry.into(), StateDiff::Addition));
1906                            }
1907                        }
1908                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1909                    },
1910                    SystemObjectId::System => {
1911                        let mut system_privileges = state.system_privileges.clone();
1912                        update_privilege_fn(&mut system_privileges);
1913                        let new_privilege =
1914                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1915                        tx.set_system_privilege(
1916                            privilege.grantee,
1917                            privilege.grantor,
1918                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
1919                        )?;
1920                    }
1921                }
1922                let object_type = state.get_system_object_type(&target_id);
1923                let object_id_str = match &target_id {
1924                    SystemObjectId::System => "SYSTEM".to_string(),
1925                    SystemObjectId::Object(id) => id.to_string(),
1926                };
1927                CatalogState::add_to_audit_log(
1928                    &state.system_configuration,
1929                    oracle_write_ts,
1930                    session,
1931                    tx,
1932                    audit_events,
1933                    variant.into(),
1934                    system_object_type_to_audit_object_type(&object_type),
1935                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1936                        object_id: object_id_str,
1937                        grantee_id: privilege.grantee.to_string(),
1938                        grantor_id: privilege.grantor.to_string(),
1939                        privileges: privilege.acl_mode.to_string(),
1940                    }),
1941                )?;
1942            }
1943            Op::UpdateDefaultPrivilege {
1944                privilege_object,
1945                privilege_acl_item,
1946                variant,
1947            } => {
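                    // e.g. `ALTER DEFAULT PRIVILEGES FOR ROLE r IN SCHEMA s
                    // GRANT SELECT ON TABLES TO u`. Apply the change to a copy
                    // of the in-memory default privileges, then persist the
                    // grantee's resulting ACL mode for this target.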
1948                let mut default_privileges = state.default_privileges.clone();
1949                match variant {
1950                    UpdatePrivilegeVariant::Grant => default_privileges
1951                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
1952                    UpdatePrivilegeVariant::Revoke => {
1953                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
1954                    }
1955                }
1956                let new_acl_mode = default_privileges
1957                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1958                tx.set_default_privilege(
1959                    privilege_object.role_id,
1960                    privilege_object.database_id,
1961                    privilege_object.schema_id,
1962                    privilege_object.object_type,
1963                    privilege_acl_item.grantee,
1964                    new_acl_mode.cloned(),
1965                )?;
1966                CatalogState::add_to_audit_log(
1967                    &state.system_configuration,
1968                    oracle_write_ts,
1969                    session,
1970                    tx,
1971                    audit_events,
1972                    variant.into(),
1973                    object_type_to_audit_object_type(privilege_object.object_type),
1974                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1975                        role_id: privilege_object.role_id.to_string(),
1976                        database_id: privilege_object.database_id.map(|id| id.to_string()),
1977                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1978                        grantee_id: privilege_acl_item.grantee.to_string(),
1979                        privileges: privilege_acl_item.acl_mode.to_string(),
1980                    }),
1981                )?;
1982            }
1983            Op::RenameCluster {
1984                id,
1985                name,
1986                to_name,
1987                check_reserved_names,
1988            } => {
1989                if id.is_system() {
1990                    return Err(AdapterError::Catalog(Error::new(
1991                        ErrorKind::ReadOnlyCluster(name.clone()),
1992                    )));
1993                }
1994                if check_reserved_names && is_reserved_name(&to_name) {
1995                    return Err(AdapterError::Catalog(Error::new(
1996                        ErrorKind::ReservedClusterName(to_name),
1997                    )));
1998                }
1999                tx.rename_cluster(id, &name, &to_name)?;
2000                CatalogState::add_to_audit_log(
2001                    &state.system_configuration,
2002                    oracle_write_ts,
2003                    session,
2004                    tx,
2005                    audit_events,
2006                    EventType::Alter,
2007                    ObjectType::Cluster,
2008                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2009                        id: id.to_string(),
2010                        old_name: name.clone(),
2011                        new_name: to_name.clone(),
2012                    }),
2013                )?;
2014                info!("rename cluster {name} to {to_name}");
2015            }
2016            Op::RenameClusterReplica {
2017                cluster_id,
2018                replica_id,
2019                name,
2020                to_name,
2021            } => {
2022                if is_reserved_name(&to_name) {
2023                    return Err(AdapterError::Catalog(Error::new(
2024                        ErrorKind::ReservedReplicaName(to_name),
2025                    )));
2026                }
2027                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2028                CatalogState::add_to_audit_log(
2029                    &state.system_configuration,
2030                    oracle_write_ts,
2031                    session,
2032                    tx,
2033                    audit_events,
2034                    EventType::Alter,
2035                    ObjectType::ClusterReplica,
2036                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2037                        cluster_id: cluster_id.to_string(),
2038                        replica_id: replica_id.to_string(),
2039                        old_name: name.replica.as_str().to_string(),
2040                        new_name: to_name.clone(),
2041                    }),
2042                )?;
2043                info!("rename cluster replica {name} to {to_name}");
2044            }
2045            Op::RenameItem {
2046                id,
2047                to_name,
2048                current_full_name,
2049            } => {
2050                let mut updates = Vec::new();
2051
2052                let entry = state.get_entry(&id);
2053                if let CatalogItem::Type(_) = entry.item() {
2054                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2055                        current_full_name.to_string(),
2056                    ))));
2057                }
2058
2059                if entry.id().is_system() {
2060                    let name = state
2061                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2062                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2063                        name.to_string(),
2064                    ))));
2065                }
2066
2067                let mut to_full_name = current_full_name.clone();
2068                to_full_name.item.clone_from(&to_name);
2069
2070                let mut to_qualified_name = entry.name().clone();
2071                to_qualified_name.item.clone_from(&to_name);
2072
2073                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2074                    id: id.to_string(),
2075                    old_name: Self::full_name_detail(&current_full_name),
2076                    new_name: Self::full_name_detail(&to_full_name),
2077                });
2078                if Self::should_audit_log_item(entry.item()) {
2079                    CatalogState::add_to_audit_log(
2080                        &state.system_configuration,
2081                        oracle_write_ts,
2082                        session,
2083                        tx,
2084                        audit_events,
2085                        EventType::Alter,
2086                        catalog_type_to_audit_object_type(entry.item().typ()),
2087                        details,
2088                    )?;
2089                }
2090
2091                // Rename the item itself.
2092                let mut new_entry = entry.clone();
2093                new_entry.name.item.clone_from(&to_name);
2094                new_entry.item = entry
2095                    .item()
2096                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2097                    .map_err(|e| {
2098                        Error::new(ErrorKind::from(AmbiguousRename {
2099                            depender: state
2100                                .resolve_full_name(entry.name(), entry.conn_id())
2101                                .to_string(),
2102                            dependee: state
2103                                .resolve_full_name(entry.name(), entry.conn_id())
2104                                .to_string(),
2105                            message: e,
2106                        }))
2107                    })?;
2108
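                    // Rewrite references to the renamed item in every dependent
                    // item; an ambiguous reference surfaces as an
                    // AmbiguousRename error.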
2109                for id in entry.referenced_by() {
2110                    let dependent_item = state.get_entry(id);
2111                    let mut to_entry = dependent_item.clone();
2112                    to_entry.item = dependent_item
2113                        .item()
2114                        .rename_item_refs(
2115                            current_full_name.clone(),
2116                            to_full_name.item.clone(),
2117                            false,
2118                        )
2119                        .map_err(|e| {
2120                            Error::new(ErrorKind::from(AmbiguousRename {
2121                                depender: state
2122                                    .resolve_full_name(
2123                                        dependent_item.name(),
2124                                        dependent_item.conn_id(),
2125                                    )
2126                                    .to_string(),
2127                                dependee: state
2128                                    .resolve_full_name(entry.name(), entry.conn_id())
2129                                    .to_string(),
2130                                message: e,
2131                            }))
2132                        })?;
2133
2134                    if !to_entry.item().is_temporary() {
2135                        tx.update_item(*id, to_entry.into())?;
2136                    } else {
2137                        temporary_item_updates
2138                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2139                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2140                    }
2141                    updates.push(*id);
2142                }
2143                if !new_entry.item().is_temporary() {
2144                    tx.update_item(id, new_entry.into())?;
2145                } else {
2146                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2147                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2148                }
2149
2150                updates.push(id);
2151                for id in updates {
2152                    Self::log_update(state, &id);
2153                }
2154            }
2155            Op::RenameSchema {
2156                database_spec,
2157                schema_spec,
2158                new_name,
2159                check_reserved_names,
2160            } => {
2161                if check_reserved_names && is_reserved_name(&new_name) {
2162                    return Err(AdapterError::Catalog(Error::new(
2163                        ErrorKind::ReservedSchemaName(new_name),
2164                    )));
2165                }
2166
2167                let conn_id = session
2168                    .map(|session| session.conn_id())
2169                    .unwrap_or(&SYSTEM_CONN_ID);
2170
2171                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2172                let cur_name = schema.name().schema.clone();
2173
2174                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2175                    return Err(AdapterError::Catalog(Error::new(
2176                        ErrorKind::AmbientSchemaRename(cur_name),
2177                    )));
2178                };
2179                let database = state.get_database(&database_id);
2180                let database_name = &database.name;
2181
2182                let mut updates = Vec::new();
2183                let mut items_to_update = BTreeMap::new();
2184
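                    // `update_item` rewrites one item's references to the
                    // renamed schema, queueing either a durable catalog update
                    // or, for temporary items, a retraction/addition pair.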
2185                let mut update_item = |id| {
2186                    if items_to_update.contains_key(id) {
2187                        return Ok(());
2188                    }
2189
2190                    let entry = state.get_entry(id);
2191
2192                    // Update our item.
2193                    let mut new_entry = entry.clone();
2194                    new_entry.item = entry
2195                        .item
2196                        .rename_schema_refs(database_name, &cur_name, &new_name)
2197                        .map_err(|(s, _i)| {
2198                            Error::new(ErrorKind::from(AmbiguousRename {
2199                                depender: state
2200                                    .resolve_full_name(entry.name(), entry.conn_id())
2201                                    .to_string(),
2202                                dependee: format!("{database_name}.{cur_name}"),
2203                                message: format!("ambiguous reference to schema named {s}"),
2204                            }))
2205                        })?;
2206
2207                    // Queue updates for Catalog storage and Builtin Tables.
2208                    if !new_entry.item().is_temporary() {
2209                        items_to_update.insert(*id, new_entry.into());
2210                    } else {
2211                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2212                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2213                    }
2214                    updates.push(id);
2215
2216                    Ok::<_, AdapterError>(())
2217                };
2218
2219                // Update all of the items in the schema.
2220                for (_name, item_id) in &schema.items {
2221                    // Update the item itself.
2222                    update_item(item_id)?;
2223
2224                    // Update everything that depends on this item.
2225                    for id in state.get_entry(item_id).referenced_by() {
2226                        update_item(id)?;
2227                    }
2228                }
2229                // Note: When updating the transaction it's very important that we update the
2230                // items as a whole group, otherwise we exhibit quadratic behavior.
2231                tx.update_items(items_to_update)?;
2232
2233                // Renaming temporary schemas is not supported.
2234                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2235                    let schema_name = schema.name().schema.clone();
2236                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2237                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2238                    )));
2239                };
2240
2241                // Add an entry to the audit log.
2242                let database_name = database_spec
2243                    .id()
2244                    .map(|id| state.get_database(&id).name.clone());
2245                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2246                    id: schema_id.to_string(),
2247                    old_name: schema.name().schema.clone(),
2248                    new_name: new_name.clone(),
2249                    database_name,
2250                });
2251                CatalogState::add_to_audit_log(
2252                    &state.system_configuration,
2253                    oracle_write_ts,
2254                    session,
2255                    tx,
2256                    audit_events,
2257                    EventType::Alter,
2258                    mz_audit_log::ObjectType::Schema,
2259                    details,
2260                )?;
2261
2262                // Update the schema itself.
2263                let mut new_schema = schema.clone();
2264                new_schema.name.schema.clone_from(&new_name);
2265                tx.update_schema(schema_id, new_schema.into())?;
2266
2267                for id in updates {
2268                    Self::log_update(state, id);
2269                }
2270            }
2271            Op::UpdateOwner { id, new_owner } => {
2272                let conn_id = session
2273                    .map(|session| session.conn_id())
2274                    .unwrap_or(&SYSTEM_CONN_ID);
2275                let old_owner = state
2276                    .get_owner_id(&id, conn_id)
2277                    .expect("cannot update the owner of an object without an owner");
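                    // e.g. `ALTER TABLE t OWNER TO r`. System objects cannot
                    // change owners; for user objects, existing privileges are
                    // rewritten via `update_privilege_owners` (replicas carry
                    // no privileges) and the new `owner_id` is persisted.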
2278                match &id {
2279                    ObjectId::Cluster(id) => {
2280                        let mut cluster = state.get_cluster(*id).clone();
2281                        if id.is_system() {
2282                            return Err(AdapterError::Catalog(Error::new(
2283                                ErrorKind::ReadOnlyCluster(cluster.name),
2284                            )));
2285                        }
2286                        Self::update_privilege_owners(
2287                            &mut cluster.privileges,
2288                            cluster.owner_id,
2289                            new_owner,
2290                        );
2291                        cluster.owner_id = new_owner;
2292                        tx.update_cluster(*id, cluster.into())?;
2293                    }
2294                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2295                        let cluster = state.get_cluster(*cluster_id);
2296                        let mut replica = cluster
2297                            .replica(*replica_id)
2298                            .expect("catalog out of sync")
2299                            .clone();
2300                        if replica_id.is_system() {
2301                            return Err(AdapterError::Catalog(Error::new(
2302                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2303                            )));
2304                        }
2305                        replica.owner_id = new_owner;
2306                        tx.update_cluster_replica(*replica_id, replica.into())?;
2307                    }
2308                    ObjectId::Database(id) => {
2309                        let mut database = state.get_database(id).clone();
2310                        if id.is_system() {
2311                            return Err(AdapterError::Catalog(Error::new(
2312                                ErrorKind::ReadOnlyDatabase(database.name),
2313                            )));
2314                        }
2315                        Self::update_privilege_owners(
2316                            &mut database.privileges,
2317                            database.owner_id,
2318                            new_owner,
2319                        );
2320                        database.owner_id = new_owner;
2321                        tx.update_database(*id, database.clone().into())?;
2322                    }
2323                    ObjectId::Schema((database_spec, schema_spec)) => {
2324                        let schema_id: SchemaId = schema_spec.clone().into();
2325                        let mut schema = state
2326                            .get_schema(database_spec, schema_spec, conn_id)
2327                            .clone();
2328                        if schema_id.is_system() {
2329                            let name = schema.name();
2330                            let full_name = state.resolve_full_schema_name(name);
2331                            return Err(AdapterError::Catalog(Error::new(
2332                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2333                            )));
2334                        }
2335                        Self::update_privilege_owners(
2336                            &mut schema.privileges,
2337                            schema.owner_id,
2338                            new_owner,
2339                        );
2340                        schema.owner_id = new_owner;
2341                        tx.update_schema(schema_id, schema.into())?;
2342                    }
2343                    ObjectId::Item(id) => {
2344                        let entry = state.get_entry(id);
2345                        let mut new_entry = entry.clone();
2346                        if id.is_system() {
2347                            let full_name = state.resolve_full_name(
2348                                new_entry.name(),
2349                                session.map(|session| session.conn_id()),
2350                            );
2351                            return Err(AdapterError::Catalog(Error::new(
2352                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2353                            )));
2354                        }
2355                        Self::update_privilege_owners(
2356                            &mut new_entry.privileges,
2357                            new_entry.owner_id,
2358                            new_owner,
2359                        );
2360                        new_entry.owner_id = new_owner;
2361                        if !new_entry.item().is_temporary() {
2362                            tx.update_item(*id, new_entry.into())?;
2363                        } else {
2364                            temporary_item_updates
2365                                .push((entry.clone().into(), StateDiff::Retraction));
2366                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2367                        }
2368                    }
2369                    ObjectId::NetworkPolicy(id) => {
2370                        let mut policy = state.get_network_policy(id).clone();
2371                        if id.is_system() {
2372                            return Err(AdapterError::Catalog(Error::new(
2373                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2374                            )));
2375                        }
2376                        Self::update_privilege_owners(
2377                            &mut policy.privileges,
2378                            policy.owner_id,
2379                            new_owner,
2380                        );
2381                        policy.owner_id = new_owner;
2382                        tx.update_network_policy(*id, policy.into())?;
2383                    }
2384                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2385                }
2386                let object_type = state.get_object_type(&id);
2387                CatalogState::add_to_audit_log(
2388                    &state.system_configuration,
2389                    oracle_write_ts,
2390                    session,
2391                    tx,
2392                    audit_events,
2393                    EventType::Alter,
2394                    object_type_to_audit_object_type(object_type),
2395                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2396                        object_id: id.to_string(),
2397                        old_owner_id: old_owner.to_string(),
2398                        new_owner_id: new_owner.to_string(),
2399                    }),
2400                )?;
2401            }
2402            Op::UpdateClusterConfig { id, name, config } => {
2403                let mut cluster = state.get_cluster(id).clone();
2404                cluster.config = config;
2405                tx.update_cluster(id, cluster.into())?;
2406                info!("update cluster {}", name);
2407
2408                CatalogState::add_to_audit_log(
2409                    &state.system_configuration,
2410                    oracle_write_ts,
2411                    session,
2412                    tx,
2413                    audit_events,
2414                    EventType::Alter,
2415                    ObjectType::Cluster,
2416                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2417                        id: id.to_string(),
2418                        name,
2419                    }),
2420                )?;
2421            }
2422            Op::UpdateClusterReplicaConfig {
2423                replica_id,
2424                cluster_id,
2425                config,
2426            } => {
2427                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2428                info!("update replica {}", replica.name);
2429                tx.update_cluster_replica(
2430                    replica_id,
2431                    mz_catalog::durable::ClusterReplica {
2432                        cluster_id,
2433                        replica_id,
2434                        name: replica.name.clone(),
2435                        config: config.clone().into(),
2436                        owner_id: replica.owner_id,
2437                    },
2438                )?;
2439            }
2440            Op::UpdateItem { id, name, to_item } => {
2441                let mut entry = state.get_entry(&id).clone();
2442                entry.name = name.clone();
2443                entry.item = to_item.clone();
2444                tx.update_item(id, entry.into())?;
2445
2446                if Self::should_audit_log_item(&to_item) {
2447                    let mut full_name = Self::full_name_detail(
2448                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2449                    );
2450                    full_name.item = name.item;
2451
2452                    CatalogState::add_to_audit_log(
2453                        &state.system_configuration,
2454                        oracle_write_ts,
2455                        session,
2456                        tx,
2457                        audit_events,
2458                        EventType::Alter,
2459                        catalog_type_to_audit_object_type(to_item.typ()),
2460                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2461                            id: id.to_string(),
2462                            name: full_name,
2463                        }),
2464                    )?;
2465                }
2466
2467                Self::log_update(state, &id);
2468            }
2469            Op::UpdateSystemConfiguration { name, value } => {
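                    // e.g. `ALTER SYSTEM SET name = value`. Parse and validate
                    // the value against the variable's definition before
                    // persisting it.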
2470                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2471                tx.upsert_system_config(&name, parsed_value.clone())?;
2472                // This mirrors some "system vars" into the catalog storage
2473                // "config" collection so that we can toggle the flag with
2474                // LaunchDarkly, but use it in boot before LaunchDarkly is
2475                // available.
2476                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2477                    let with_0dt_deployment_max_wait =
2478                        Duration::parse(VarInput::Flat(&parsed_value))
2479                            .expect("parsing succeeded above");
2480                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2481                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2482                    let with_0dt_deployment_ddl_check_interval =
2483                        Duration::parse(VarInput::Flat(&parsed_value))
2484                            .expect("parsing succeeded above");
2485                    tx.set_0dt_deployment_ddl_check_interval(
2486                        with_0dt_deployment_ddl_check_interval,
2487                    )?;
2488                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2489                    let panic_after_timeout =
2490                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2491                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2492                }
2493
2494                CatalogState::add_to_audit_log(
2495                    &state.system_configuration,
2496                    oracle_write_ts,
2497                    session,
2498                    tx,
2499                    audit_events,
2500                    EventType::Alter,
2501                    ObjectType::System,
2502                    EventDetails::SetV1(mz_audit_log::SetV1 {
2503                        name,
2504                        value: Some(value.borrow().to_vec().join(", ")),
2505                    }),
2506                )?;
2507            }
2508            Op::ResetSystemConfiguration { name } => {
2509                tx.remove_system_config(&name);
2510                // This mirrors some "system vars" into the catalog storage
2511                // "config" collection so that we can toggle the flag with
2512                // LaunchDarkly, but use it in boot before LaunchDarkly is
2513                // available.
2514                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2515                    tx.reset_0dt_deployment_max_wait()?;
2516                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2517                    tx.reset_0dt_deployment_ddl_check_interval()?;
2518                }
2519
2520                CatalogState::add_to_audit_log(
2521                    &state.system_configuration,
2522                    oracle_write_ts,
2523                    session,
2524                    tx,
2525                    audit_events,
2526                    EventType::Alter,
2527                    ObjectType::System,
2528                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2529                )?;
2530            }
2531            Op::ResetAllSystemConfiguration => {
2532                tx.clear_system_configs();
2533                tx.reset_0dt_deployment_max_wait()?;
2534                tx.reset_0dt_deployment_ddl_check_interval()?;
2535
2536                CatalogState::add_to_audit_log(
2537                    &state.system_configuration,
2538                    oracle_write_ts,
2539                    session,
2540                    tx,
2541                    audit_events,
2542                    EventType::Alter,
2543                    ObjectType::System,
2544                    EventDetails::ResetAllV1,
2545                )?;
2546            }
2547            Op::WeirdStorageUsageUpdates {
2548                object_id,
2549                size_bytes,
2550                collection_timestamp,
2551            } => {
2552                let id = tx.allocate_storage_usage_ids()?;
2553                let metric =
2554                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2555                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2556                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2557                weird_builtin_table_update = Some(builtin_table_update);
2558            }
2559        };
2560        Ok((weird_builtin_table_update, temporary_item_updates))
2561    }
2562
2563    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2564        let entry = state.get_entry(id);
2565        info!(
2566            "update {} {} ({})",
2567            entry.item_type(),
2568            state.resolve_full_name(entry.name(), entry.conn_id()),
2569            id
2570        );
2571    }
2572
2573    /// Update privileges to reflect the new owner. Based on PostgreSQL's
2574    /// implementation:
2575    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
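    ///
    /// Illustrative doc example (marked `ignore` since this is a private helper; the
    /// role ids are hypothetical and mirror the unit tests at the bottom of this file):
    ///
    /// ```ignore
    /// let old_owner = RoleId::User(1);
    /// let new_owner = RoleId::User(2);
    /// let mut privileges = PrivilegeMap::from_mz_acl_items(vec![MzAclItem {
    ///     grantee: RoleId::User(3),
    ///     grantor: old_owner,
    ///     acl_mode: AclMode::SELECT,
    /// }]);
    /// Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
    /// // The surviving privilege is now granted by `new_owner`.
    /// ```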
2576    fn update_privilege_owners(
2577        privileges: &mut PrivilegeMap,
2578        old_owner: RoleId,
2579        new_owner: RoleId,
2580    ) {
2581        // TODO(jkosh44) Would be nice not to clone every privilege.
2582        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2583
2584        let mut new_present = false;
2585        for privilege in flat_privileges.iter_mut() {
2586            // Privileges granted by the old owner are updated to be granted by
2587            // the new owner.
2588            if privilege.grantor == old_owner {
2589                privilege.grantor = new_owner;
2590            } else if privilege.grantor == new_owner {
2591                new_present = true;
2592            }
2593            // Privileges granted to the old owner are given to the new owner.
2594            if privilege.grantee == old_owner {
2595                privilege.grantee = new_owner;
2596            } else if privilege.grantee == new_owner {
2597                new_present = true;
2598            }
2599        }
2600
2601        // If the old privilege list contained references to the new owner, we may
2602        // have created duplicate entries. Here we try to consolidate them. This
2603        // is inspired by PostgreSQL's algorithm but is not identical to it.
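        // Illustrative sketch (hypothetical role ids): after the rewrite above, two items
        //   MzAclItem { grantee: r3, grantor: r2, acl_mode: SELECT }
        //   MzAclItem { grantee: r3, grantor: r2, acl_mode: UPDATE }
        // share a (grantee, grantor) key and are unioned below into a single item
        // with acl_mode SELECT | UPDATE.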
2604        if new_present {
2605            // Group privileges by (grantee, grantor).
2606            let privilege_map: BTreeMap<_, Vec<_>> =
2607                flat_privileges
2608                    .into_iter()
2609                    .fold(BTreeMap::new(), |mut accum, privilege| {
2610                        accum
2611                            .entry((privilege.grantee, privilege.grantor))
2612                            .or_default()
2613                            .push(privilege);
2614                        accum
2615                    });
2616
2617            // Consolidate and update all privileges.
2618            flat_privileges = privilege_map
2619                .into_iter()
2620                .map(|((grantee, grantor), values)|
2621                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2622                    values.into_iter().fold(
2623                        MzAclItem::empty(grantee, grantor),
2624                        |mut accum, mz_aclitem| {
2625                            accum.acl_mode =
2626                                accum.acl_mode.union(mz_aclitem.acl_mode);
2627                            accum
2628                        },
2629                    ))
2630                .collect();
2631        }
2632
2633        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2634    }
2635}
2636
2637/// Prepare the given transaction for replacing a catalog item with a new version.
2638///
2639/// The new version gets a new `CatalogItemId`, which requires rewriting the `create_sql` of all
2640/// dependent objects to refer to that new ID (at a previous version).
2641///
2642/// Note that this is where we break the assumption that a `CatalogItemId` is a stable identifier
2643/// for catalog items. We currently believe that no use cases rely on this assumption, but we have
2644/// no way to know for sure.
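///
/// Illustrative flow (the item ids `u1` and `u2` are hypothetical):
///
/// ```text
/// 1. Dependents that reference the old id `u1` get their stored definitions rewritten
///    to reference the new id `u2` (via `replace_item_refs`).
/// 2. Comments attached to `u1` are moved to `u2`.
/// 3. The durable item is removed under `u1` and re-inserted under `u2`.
/// ```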
2645fn tx_replace_item(
2646    tx: &mut Transaction<'_>,
2647    state: &CatalogState,
2648    id: CatalogItemId,
2649    new_entry: CatalogEntry,
2650) -> Result<(), AdapterError> {
2651    let new_id = new_entry.id;
2652
2653    // Rewrite dependent objects to point to the new ID.
2654    for use_id in new_entry.referenced_by() {
2655        // The dependent might be dropped in the same tx, so check.
2656        if tx.get_item(use_id).is_none() {
2657            continue;
2658        }
2659
2660        let mut dependent = state.get_entry(use_id).clone();
2661        dependent.item = dependent.item.replace_item_refs(id, new_id);
2662        tx.update_item(*use_id, dependent.into())?;
2663    }
2664
2665    // Move comments to the new ID.
2666    let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2667    let new_comment_id = new_entry.comment_object_id();
2668    if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2669        tx.drop_comments(&[old_comment_id].into())?;
2670        for (sub, comment) in comments {
2671            tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2672        }
2673    }
2674
2675    let mz_catalog::durable::Item {
2676        id: _,
2677        oid,
2678        global_id,
2679        schema_id,
2680        name,
2681        create_sql,
2682        owner_id,
2683        privileges,
2684        extra_versions,
2685    } = new_entry.into();
2686
2687    tx.remove_item(id)?;
2688    tx.insert_item(
2689        new_id,
2690        oid,
2691        global_id,
2692        schema_id,
2693        &name,
2694        create_sql,
2695        owner_id,
2696        privileges,
2697        extra_versions,
2698    )?;
2699
2700    Ok(())
2701}
2702
2703/// Generate audit events for a replacement apply operation.
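///
/// Concretely (mirroring the logic below): a `Drop` event is recorded for the replacement
/// object, an `Alter` event records that the replacement was applied to the target, and, if
/// the two objects live on different clusters, an additional `Alter` event records the
/// cluster change. Events are only emitted for items that are normally audit-logged.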
2704fn apply_replacement_audit_events(
2705    state: &CatalogState,
2706    target: &CatalogEntry,
2707    replacement: &CatalogEntry,
2708) -> Vec<(EventType, EventDetails)> {
2709    let mut events = Vec::new();
2710
2711    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2712    let target_id_name = IdFullNameV1 {
2713        id: target.id().to_string(),
2714        name: Catalog::full_name_detail(&target_name),
2715    };
2716    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2717    let replacement_id_name = IdFullNameV1 {
2718        id: replacement.id().to_string(),
2719        name: Catalog::full_name_detail(&replacement_name),
2720    };
2721
2722    if Catalog::should_audit_log_item(&replacement.item) {
2723        events.push((
2724            EventType::Drop,
2725            EventDetails::IdFullNameV1(replacement_id_name.clone()),
2726        ));
2727    }
2728
2729    if Catalog::should_audit_log_item(&target.item) {
2730        events.push((
2731            EventType::Alter,
2732            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2733                target: target_id_name.clone(),
2734                replacement: replacement_id_name,
2735            }),
2736        ));
2737
2738        if let Some(old_cluster_id) = target.cluster_id()
2739            && let Some(new_cluster_id) = replacement.cluster_id()
2740            && old_cluster_id != new_cluster_id
2741        {
2742            // When the replacement is applied, the target takes on the ID of the replacement, so
2743            // we should use that ID for subsequent events.
2744            events.push((
2745                EventType::Alter,
2746                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2747                    id: replacement.id().to_string(),
2748                    name: target_id_name.name,
2749                    old_cluster_id: old_cluster_id.to_string(),
2750                    new_cluster_id: new_cluster_id.to_string(),
2751                }),
2752            ));
2753        }
2754    }
2755
2756    events
2757}
2758
2759/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2760///
2761/// Note: Previously we emitted one `Op::DropObject` per object we needed to
2762/// drop. But removing a batch of objects from a durable Catalog Transaction is
2763/// O(n), where `n` is the number of objects that exist in the Catalog, so
2764/// emitting one op per dropped object resulted in unacceptable `O(m * n)`
2765/// performance for a `DROP ... CASCADE` statement dropping `m` objects.
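///
/// Illustrative usage (a minimal sketch; `drop_object_infos`, `state`, and `session` are
/// assumed to already be in scope):
///
/// ```ignore
/// // Collect everything a `DROP ... CASCADE` needs to remove in one pass, then hand the
/// // whole batch to the durable transaction.
/// let to_drop = ObjectsToDrop::generate(drop_object_infos, state, session)?;
/// ```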
2766#[derive(Debug, Default)]
2767pub(crate) struct ObjectsToDrop {
2768    pub comments: BTreeSet<CommentObjectId>,
2769    pub databases: BTreeSet<DatabaseId>,
2770    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2771    pub clusters: BTreeSet<ClusterId>,
2772    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2773    pub roles: BTreeSet<RoleId>,
2774    pub items: Vec<CatalogItemId>,
2775    pub network_policies: BTreeSet<NetworkPolicyId>,
2776}
2777
2778impl ObjectsToDrop {
2779    pub fn generate(
2780        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2781        state: &CatalogState,
2782        session: Option<&ConnMeta>,
2783    ) -> Result<Self, AdapterError> {
2784        let mut delta = ObjectsToDrop::default();
2785
2786        for drop_object_info in drop_object_infos {
2787            delta.add_item(drop_object_info, state, session)?;
2788        }
2789
2790        Ok(delta)
2791    }
2792
2793    fn add_item(
2794        &mut self,
2795        drop_object_info: DropObjectInfo,
2796        state: &CatalogState,
2797        session: Option<&ConnMeta>,
2798    ) -> Result<(), AdapterError> {
2799        self.comments
2800            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2801
2802        match drop_object_info {
2803            DropObjectInfo::Database(database_id) => {
2804                let database = &state.database_by_id[&database_id];
2805                if database_id.is_system() {
2806                    return Err(AdapterError::Catalog(Error::new(
2807                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2808                    )));
2809                }
2810
2811                self.databases.insert(database_id);
2812            }
2813            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2814                let schema = state.get_schema(
2815                    &database_spec,
2816                    &schema_spec,
2817                    session
2818                        .map(|session| session.conn_id())
2819                        .unwrap_or(&SYSTEM_CONN_ID),
2820                );
2821                let schema_id: SchemaId = schema_spec.into();
2822                if schema_id.is_system() {
2823                    let name = schema.name();
2824                    let full_name = state.resolve_full_schema_name(name);
2825                    return Err(AdapterError::Catalog(Error::new(
2826                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2827                    )));
2828                }
2829
2830                self.schemas.insert(schema_spec, database_spec);
2831            }
2832            DropObjectInfo::Role(role_id) => {
2833                let name = state.get_role(&role_id).name().to_string();
2834                if role_id.is_system() || role_id.is_predefined() {
2835                    return Err(AdapterError::Catalog(Error::new(
2836                        ErrorKind::ReservedRoleName(name.clone()),
2837                    )));
2838                }
2839                state.ensure_not_reserved_role(&role_id)?;
2840
2841                self.roles.insert(role_id);
2842            }
2843            DropObjectInfo::Cluster(cluster_id) => {
2844                let cluster = state.get_cluster(cluster_id);
2845                let name = &cluster.name;
2846                if cluster_id.is_system() {
2847                    return Err(AdapterError::Catalog(Error::new(
2848                        ErrorKind::ReadOnlyCluster(name.clone()),
2849                    )));
2850                }
2851
2852                self.clusters.insert(cluster_id);
2853            }
2854            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2855                let cluster = state.get_cluster(cluster_id);
2856                let replica = cluster.replica(replica_id).expect("Must exist");
2857
2858                self.replicas
2859                    .insert(replica.replica_id, (cluster.id, reason));
2860            }
2861            DropObjectInfo::Item(item_id) => {
2862                let entry = state.get_entry(&item_id);
2863                if item_id.is_system() {
2864                    let name = entry.name();
2865                    let full_name =
2866                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
2867                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2868                        full_name.to_string(),
2869                    ))));
2870                }
2871
2872                self.items.push(item_id);
2873            }
2874            DropObjectInfo::NetworkPolicy(network_policy_id) => {
2875                let policy = state.get_network_policy(&network_policy_id);
2876                let name = &policy.name;
2877                if network_policy_id.is_system() {
2878                    return Err(AdapterError::Catalog(Error::new(
2879                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2880                    )));
2881                }
2882
2883                self.network_policies.insert(network_policy_id);
2884            }
2885        }
2886
2887        Ok(())
2888    }
2889}
2890
2891#[cfg(test)]
2892mod tests {
2893    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
2894    use mz_repr::role_id::RoleId;
2895
2896    use crate::catalog::Catalog;
2897
2898    #[mz_ore::test]
2899    fn test_update_privilege_owners() {
2900        let old_owner = RoleId::User(1);
2901        let new_owner = RoleId::User(2);
2902        let other_role = RoleId::User(3);
2903
2904        // old owner exists as grantor.
2905        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2906            MzAclItem {
2907                grantee: other_role,
2908                grantor: old_owner,
2909                acl_mode: AclMode::UPDATE,
2910            },
2911            MzAclItem {
2912                grantee: other_role,
2913                grantor: new_owner,
2914                acl_mode: AclMode::SELECT,
2915            },
2916        ]);
2917        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2918        assert_eq!(1, privileges.all_values().count());
2919        assert_eq!(
2920            vec![MzAclItem {
2921                grantee: other_role,
2922                grantor: new_owner,
2923                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2924            }],
2925            privileges.all_values_owned().collect::<Vec<_>>()
2926        );
2927
2928        // old owner exists as grantee.
2929        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2930            MzAclItem {
2931                grantee: old_owner,
2932                grantor: other_role,
2933                acl_mode: AclMode::UPDATE,
2934            },
2935            MzAclItem {
2936                grantee: new_owner,
2937                grantor: other_role,
2938                acl_mode: AclMode::SELECT,
2939            },
2940        ]);
2941        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2942        assert_eq!(1, privileges.all_values().count());
2943        assert_eq!(
2944            vec![MzAclItem {
2945                grantee: new_owner,
2946                grantor: other_role,
2947                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2948            }],
2949            privileges.all_values_owned().collect::<Vec<_>>()
2950        );
2951
2952        // old owner exists as grantee and grantor.
2953        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2954            MzAclItem {
2955                grantee: old_owner,
2956                grantor: old_owner,
2957                acl_mode: AclMode::UPDATE,
2958            },
2959            MzAclItem {
2960                grantee: new_owner,
2961                grantor: new_owner,
2962                acl_mode: AclMode::SELECT,
2963            },
2964        ]);
2965        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2966        assert_eq!(1, privileges.all_values().count());
2967        assert_eq!(
2968            vec![MzAclItem {
2969                grantee: new_owner,
2970                grantor: new_owner,
2971                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2972            }],
2973            privileges.all_values_owned().collect::<Vec<_>>()
2974        );
2975    }
2976}