Skip to main content

mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35    StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::optimize::OptimizerFeatures;
45use mz_repr::role_id::RoleId;
46use mz_repr::{CatalogItemId, ColumnName, GlobalId, SqlColumnType, strconv};
47use mz_sql::ast::RawDataType;
48use mz_sql::catalog::{
49    AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
50    CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
51    DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
52    RoleVars,
53};
54use mz_sql::names::{
55    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use serde::{Deserialize, Serialize};
66use tracing::{info, trace};
67
68use crate::AdapterError;
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{
71    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
72    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
73    is_reserved_role_name, object_type_to_audit_object_type,
74    system_object_type_to_audit_object_type,
75};
76use crate::coord::ConnMeta;
77use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
78use crate::coord::cluster_scheduling::SchedulingDecision;
79use crate::util::ResultExt;
80
81/// A manually injected audit event.
82///
83/// Matches [`mz_audit_log::EventV1`], but without the `id` and `occurred_at` fields -- both are
84/// filled in automatically.
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct InjectedAuditEvent {
87    pub event_type: EventType,
88    pub object_type: ObjectType,
89    pub details: EventDetails,
90    pub user: Option<String>,
91}
92
93#[derive(Debug, Clone)]
94pub enum Op {
95    AlterRetainHistory {
96        id: CatalogItemId,
97        value: Option<Value>,
98        window: CompactionWindow,
99    },
100    AlterSourceTimestampInterval {
101        id: CatalogItemId,
102        value: Option<Value>,
103        interval: Duration,
104    },
105    AlterRole {
106        id: RoleId,
107        name: String,
108        attributes: RoleAttributesRaw,
109        nopassword: bool,
110        vars: RoleVars,
111    },
112    AlterNetworkPolicy {
113        id: NetworkPolicyId,
114        rules: Vec<NetworkPolicyRule>,
115        name: String,
116        owner_id: RoleId,
117    },
118    AlterAddColumn {
119        id: CatalogItemId,
120        new_global_id: GlobalId,
121        name: ColumnName,
122        typ: SqlColumnType,
123        sql: RawDataType,
124    },
125    AlterMaterializedViewApplyReplacement {
126        id: CatalogItemId,
127        replacement_id: CatalogItemId,
128    },
129    CreateDatabase {
130        name: String,
131        owner_id: RoleId,
132    },
133    CreateSchema {
134        database_id: ResolvedDatabaseSpecifier,
135        schema_name: String,
136        owner_id: RoleId,
137    },
138    CreateRole {
139        name: String,
140        attributes: RoleAttributesRaw,
141    },
142    CreateCluster {
143        id: ClusterId,
144        name: String,
145        introspection_sources: Vec<&'static BuiltinLog>,
146        owner_id: RoleId,
147        config: ClusterConfig,
148    },
149    CreateClusterReplica {
150        cluster_id: ClusterId,
151        name: String,
152        config: ReplicaConfig,
153        owner_id: RoleId,
154        reason: ReplicaCreateDropReason,
155    },
156    CreateItem {
157        id: CatalogItemId,
158        name: QualifiedItemName,
159        item: CatalogItem,
160        owner_id: RoleId,
161    },
162    CreateNetworkPolicy {
163        rules: Vec<NetworkPolicyRule>,
164        name: String,
165        owner_id: RoleId,
166    },
167    Comment {
168        object_id: CommentObjectId,
169        sub_component: Option<usize>,
170        comment: Option<String>,
171    },
172    DropObjects(Vec<DropObjectInfo>),
173    GrantRole {
174        role_id: RoleId,
175        member_id: RoleId,
176        grantor_id: RoleId,
177    },
178    RenameCluster {
179        id: ClusterId,
180        name: String,
181        to_name: String,
182        check_reserved_names: bool,
183    },
184    RenameClusterReplica {
185        cluster_id: ClusterId,
186        replica_id: ReplicaId,
187        name: QualifiedReplica,
188        to_name: String,
189    },
190    RenameItem {
191        id: CatalogItemId,
192        current_full_name: FullItemName,
193        to_name: String,
194    },
195    RenameSchema {
196        database_spec: ResolvedDatabaseSpecifier,
197        schema_spec: SchemaSpecifier,
198        new_name: String,
199        check_reserved_names: bool,
200    },
201    UpdateOwner {
202        id: ObjectId,
203        new_owner: RoleId,
204    },
205    UpdatePrivilege {
206        target_id: SystemObjectId,
207        privilege: MzAclItem,
208        variant: UpdatePrivilegeVariant,
209    },
210    UpdateDefaultPrivilege {
211        privilege_object: DefaultPrivilegeObject,
212        privilege_acl_item: DefaultPrivilegeAclItem,
213        variant: UpdatePrivilegeVariant,
214    },
215    RevokeRole {
216        role_id: RoleId,
217        member_id: RoleId,
218        grantor_id: RoleId,
219    },
220    UpdateClusterConfig {
221        id: ClusterId,
222        name: String,
223        config: ClusterConfig,
224    },
225    UpdateClusterReplicaConfig {
226        cluster_id: ClusterId,
227        replica_id: ReplicaId,
228        config: ReplicaConfig,
229    },
230    UpdateItem {
231        id: CatalogItemId,
232        name: QualifiedItemName,
233        to_item: CatalogItem,
234    },
235    UpdateSourceReferences {
236        source_id: CatalogItemId,
237        references: SourceReferences,
238    },
239    UpdateSystemConfiguration {
240        name: String,
241        value: OwnedVarInput,
242    },
243    ResetSystemConfiguration {
244        name: String,
245    },
246    ResetAllSystemConfiguration,
247    /// Injects audit events into the catalog.
248    ///
249    /// This is a nonstandard path used for manually appending audit events at the current time.
250    /// Mainly useful for correcting the audit log in the face of bugs that made us forget to emit
251    /// audit events.
252    InjectAuditEvents {
253        events: Vec<InjectedAuditEvent>,
254    },
255}
256
257/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
258/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
259/// the `Op::DropObjects`.
260#[derive(Debug, Clone)]
261pub enum DropObjectInfo {
262    Cluster(ClusterId),
263    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
264    Database(DatabaseId),
265    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
266    Role(RoleId),
267    Item(CatalogItemId),
268    NetworkPolicy(NetworkPolicyId),
269}
270
271impl DropObjectInfo {
272    /// Creates a `DropObjectInfo` from an `ObjectId`.
273    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
274    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
275        match id {
276            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
277            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
278                cluster_id,
279                replica_id,
280                ReplicaCreateDropReason::Manual,
281            )),
282            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
283            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
284            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
285            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
286            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
287        }
288    }
289
290    /// Creates an `ObjectId` from a `DropObjectInfo`.
291    /// Loses the `ReplicaCreateDropReason` if there is one!
292    fn to_object_id(&self) -> ObjectId {
293        match &self {
294            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
295            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
296                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
297            }
298            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
299            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
300            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
301            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
302            DropObjectInfo::NetworkPolicy(network_policy_id) => {
303                ObjectId::NetworkPolicy(network_policy_id.clone())
304            }
305        }
306    }
307}
308
309/// The reason for creating or dropping a replica.
310#[derive(Debug, Clone)]
311pub enum ReplicaCreateDropReason {
312    /// The user initiated the replica create or drop, e.g., by
313    /// - creating/dropping a cluster,
314    /// - ALTERing various options on a managed cluster,
315    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
316    Manual,
317    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
318    /// materialized view is needing a refresh on a SCHEDULE ON REFRESH cluster.
319    ClusterScheduling(Vec<SchedulingDecision>),
320}
321
322impl ReplicaCreateDropReason {
323    pub fn into_audit_log(
324        self,
325    ) -> (
326        CreateOrDropClusterReplicaReasonV1,
327        Option<SchedulingDecisionsWithReasonsV2>,
328    ) {
329        let (reason, scheduling_policies) = match self {
330            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
331            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
332                CreateOrDropClusterReplicaReasonV1::Schedule,
333                Some(scheduling_decisions),
334            ),
335        };
336        (
337            reason,
338            scheduling_policies
339                .as_ref()
340                .map(SchedulingDecision::reasons_to_audit_log_reasons),
341        )
342    }
343}
344
345pub struct TransactionResult {
346    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
347    /// Parsed catalog updates from which we will derive catalog implications.
348    pub catalog_updates: Vec<ParsedStateUpdate>,
349    pub audit_events: Vec<VersionedEvent>,
350}
351
352#[derive(Debug, Clone, Copy)]
353enum TransactInnerMode {
354    /// Prepare storage state and return a commit-ready transaction state.
355    ///
356    /// This mode is used by durable catalog transactions.
357    Commit,
358    /// Execute a dry run against a durable transaction that will not be
359    /// committed.
360    ///
361    /// This mode still validates and applies updates to an in-memory
362    /// `CatalogState`, but it must not call `prepare_state` because dry runs
363    /// must not trigger controller side effects.
364    DryRun,
365}
366
367impl Catalog {
368    fn should_audit_log_item(item: &CatalogItem) -> bool {
369        !item.is_temporary()
370    }
371
372    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
373    /// within a connection id.
374    fn temporary_ids(
375        &self,
376        ops: &[Op],
377        temporary_drops: BTreeSet<(&ConnectionId, String)>,
378    ) -> Result<BTreeSet<CatalogItemId>, Error> {
379        let mut creating = BTreeSet::new();
380        let mut temporary_ids = BTreeSet::new();
381        for op in ops.iter() {
382            if let Op::CreateItem {
383                id,
384                name,
385                item,
386                owner_id: _,
387            } = op
388            {
389                if let Some(conn_id) = item.conn_id() {
390                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
391                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
392                        || creating.contains(&(conn_id, &name.item))
393                    {
394                        return Err(
395                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
396                        );
397                    } else {
398                        creating.insert((conn_id, &name.item));
399                        temporary_ids.insert(id.clone());
400                    }
401                }
402            }
403        }
404        Ok(temporary_ids)
405    }
406
407    #[instrument(name = "catalog::transact")]
408    pub async fn transact(
409        &mut self,
410        // n.b. this is an option to prevent us from needing to build out a
411        // dummy impl of `StorageController` for tests.
412        storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
413        oracle_write_ts: mz_repr::Timestamp,
414        session: Option<&ConnMeta>,
415        ops: Vec<Op>,
416    ) -> Result<TransactionResult, AdapterError> {
417        trace!("transact: {:?}", ops);
418        fail::fail_point!("catalog_transact", |arg| {
419            Err(AdapterError::Unstructured(anyhow::anyhow!(
420                "failpoint: {arg:?}"
421            )))
422        });
423
424        let drop_ids: BTreeSet<CatalogItemId> = ops
425            .iter()
426            .filter_map(|op| match op {
427                Op::DropObjects(drop_object_infos) => {
428                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
429                    let item_ids = ids.filter_map(|id| match id {
430                        ObjectId::Item(id) => Some(id),
431                        _ => None,
432                    });
433                    Some(item_ids)
434                }
435                _ => None,
436            })
437            .flatten()
438            .collect();
439        let temporary_drops = drop_ids
440            .iter()
441            .filter_map(|id| {
442                let entry = self.get_entry(id);
443                match entry.item().conn_id() {
444                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
445                    None => None,
446                }
447            })
448            .collect();
449
450        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
451        let mut builtin_table_updates = vec![];
452        let mut catalog_updates = vec![];
453        let mut audit_events = vec![];
454        let mut storage = self.storage().await;
455        let mut tx = storage
456            .transaction()
457            .await
458            .unwrap_or_terminate("starting catalog transaction");
459
460        let new_state = Self::transact_inner(
461            TransactInnerMode::Commit,
462            storage_collections,
463            oracle_write_ts,
464            session,
465            ops,
466            temporary_ids,
467            &mut builtin_table_updates,
468            &mut catalog_updates,
469            &mut audit_events,
470            &mut tx,
471            &self.state,
472        )
473        .await?;
474
475        // The user closure was successful, apply the updates. Terminate the
476        // process if this fails, because we have to restart envd due to
477        // indeterminate catalog state, which we only reconcile during catalog
478        // init.
479        tx.commit(oracle_write_ts)
480            .await
481            .unwrap_or_terminate("catalog storage transaction commit must succeed");
482
483        // Dropping here keeps the mutable borrow on self, preventing us accidentally
484        // mutating anything until after f is executed.
485        drop(storage);
486        if let Some(new_state) = new_state {
487            self.transient_revision += 1;
488            self.state = new_state;
489        }
490
491        Ok(TransactionResult {
492            builtin_table_updates,
493            catalog_updates,
494            audit_events,
495        })
496    }
497
498    /// Performs an incremental dry-run catalog transaction: processes only the
499    /// NEW ops against an accumulated `CatalogState` from previous dry runs.
500    /// This avoids the O(N^2) replay cost of replaying all accumulated ops.
501    ///
502    /// The durable transaction is intentionally never committed and no storage
503    /// controller prepare-state side effects are run.
504    ///
505    /// If `prev_snapshot` is `Some`, the transaction is initialized from that
506    /// snapshot (which represents the tx state after the previous dry run),
507    /// ensuring it starts in sync with `base_state`. If `None` (first
508    /// statement), a fresh transaction is loaded from durable storage.
509    ///
510    /// Returns the new accumulated state and a snapshot of the transaction's
511    /// state for use in subsequent incremental dry runs.
512    pub async fn transact_incremental_dry_run(
513        &self,
514        base_state: &CatalogState,
515        ops: Vec<Op>,
516        session: Option<&ConnMeta>,
517        prev_snapshot: Option<Snapshot>,
518        oracle_write_ts: mz_repr::Timestamp,
519    ) -> Result<(CatalogState, Snapshot), AdapterError> {
520        // For DDL transactions, items are not temporary (CREATE TABLE FROM SOURCE, etc.)
521        // but we still need to check for collisions.
522        let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
523
524        let mut builtin_table_updates = vec![];
525        let mut catalog_updates = vec![];
526        let mut audit_events = vec![];
527        let mut storage = self.storage().await;
528        let mut tx = if let Some(snapshot) = prev_snapshot {
529            // Restore transaction from saved snapshot so it starts in sync
530            // with the accumulated CatalogState from previous dry runs.
531            storage
532                .transaction_from_snapshot(snapshot)
533                .unwrap_or_terminate("starting catalog transaction from snapshot")
534        } else {
535            // First statement: fresh transaction from durable storage, which
536            // is in sync with the real catalog state.
537            storage
538                .transaction()
539                .await
540                .unwrap_or_terminate("starting catalog transaction")
541        };
542
543        // Process only the new ops against the accumulated state in dry-run mode.
544        let new_state = Self::transact_inner(
545            TransactInnerMode::DryRun,
546            None,
547            oracle_write_ts,
548            session,
549            ops,
550            temporary_ids,
551            &mut builtin_table_updates,
552            &mut catalog_updates,
553            &mut audit_events,
554            &mut tx,
555            base_state,
556        )
557        .await?;
558
559        // Save the transaction's current state as a snapshot for the next
560        // incremental dry run.
561        let new_snapshot = tx.current_snapshot();
562
563        // Transaction is NOT committed — drop it.
564        drop(storage);
565
566        // transact_inner returns Some(state) when ops produced changes.
567        let state = new_state.unwrap_or_else(|| base_state.clone());
568        Ok((state, new_snapshot))
569    }
570
571    /// Extracts optimized expressions from `Op::CreateItem` operations for views
572    /// and materialized views. These can be used to populate a `LocalExpressionCache`
573    /// to avoid re-optimization during `apply_updates`.
574    fn extract_expressions_from_ops(
575        ops: &[Op],
576        optimizer_features: &OptimizerFeatures,
577    ) -> BTreeMap<GlobalId, LocalExpressions> {
578        let mut exprs = BTreeMap::new();
579
580        for op in ops {
581            if let Op::CreateItem { item, .. } = op {
582                match item {
583                    CatalogItem::View(view) => {
584                        exprs.insert(
585                            view.global_id,
586                            LocalExpressions {
587                                local_mir: (*view.locally_optimized_expr).clone(),
588                                optimizer_features: optimizer_features.clone(),
589                            },
590                        );
591                    }
592                    CatalogItem::MaterializedView(mv) => {
593                        exprs.insert(
594                            mv.global_id_writes(),
595                            LocalExpressions {
596                                local_mir: (*mv.locally_optimized_expr).clone(),
597                                optimizer_features: optimizer_features.clone(),
598                            },
599                        );
600                    }
601                    CatalogItem::Table(_)
602                    | CatalogItem::Source(_)
603                    | CatalogItem::Log(_)
604                    | CatalogItem::Sink(_)
605                    | CatalogItem::Index(_)
606                    | CatalogItem::Type(_)
607                    | CatalogItem::Func(_)
608                    | CatalogItem::Secret(_)
609                    | CatalogItem::Connection(_) => {}
610                }
611            }
612        }
613
614        exprs
615    }
616
617    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
618    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
619    ///
620    /// `mode` controls whether storage prepare-state side effects are allowed.
621    /// In `DryRun` mode, this method may update the in-memory state returned to
622    /// the caller, but it must not trigger controller side effects.
623    ///
624    #[instrument(name = "catalog::transact_inner")]
625    async fn transact_inner(
626        mode: TransactInnerMode,
627        storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
628        oracle_write_ts: mz_repr::Timestamp,
629        session: Option<&ConnMeta>,
630        ops: Vec<Op>,
631        temporary_ids: BTreeSet<CatalogItemId>,
632        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
633        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
634        audit_events: &mut Vec<VersionedEvent>,
635        tx: &mut Transaction<'_>,
636        state: &CatalogState,
637    ) -> Result<Option<CatalogState>, AdapterError> {
638        // We come up with new catalog state, builtin state updates, and parsed
639        // catalog updates (for deriving catalog implications) in two phases:
640        //
641        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
642        //    one-by-one. This will give us the full list of updates to apply to
643        //    the catalog, which will allow us to apply it in one batch, which
644        //    in turn will allow the apply machinery to consolidate the updates.
645        // 2. We do one final apply call with all updates, which gives us the
646        //    final builtin table updates and parsed catalog updates.
647        //
648        // The reason is that the loop that is working off ops first does a
649        // transact_op to derive the state updates for that op, and then calls
650        // apply_updates on the catalog state. And successive ops might expect
651        // the catalog state to reflect the modified state _after_ applying
652        // previous ops.
653        //
654        // We want to, however, have one final apply_state that takes all the
655        // accumulated updates to derive the required controller updates and the
656        // builtin table updates.
657        //
658        // We won't win any DDL throughput benchmarks, but so far that's not
659        // what we're optimizing for and there would probably be other
660        // bottlenecks before we hit this one as a bottleneck.
661        //
662        // We could work around this by refactoring how the interplay of
663        // transact_op and apply_updates works, but that's a larger undertaking.
664        let mut preliminary_state = Cow::Borrowed(state);
665
666        // The final state that we will return, if modified.
667        let mut state = Cow::Borrowed(state);
668
669        if ops.is_empty() {
670            return Ok(None);
671        }
672
673        // Extract optimized expressions from CreateItem ops to avoid re-optimization
674        // during apply_updates. We extract before the loop since `ops` is moved there.
675        let optimizer_features = OptimizerFeatures::from(state.system_config());
676        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
677
678        let mut storage_collections_to_create = BTreeSet::new();
679        let mut storage_collections_to_drop = BTreeSet::new();
680        let mut storage_collections_to_register = BTreeMap::new();
681
682        let mut updates = Vec::new();
683
684        for op in ops {
685            let temporary_item_updates = Self::transact_op(
686                oracle_write_ts,
687                session,
688                op,
689                &temporary_ids,
690                audit_events,
691                tx,
692                &*preliminary_state,
693                &mut storage_collections_to_create,
694                &mut storage_collections_to_drop,
695                &mut storage_collections_to_register,
696            )
697            .await?;
698
699            // Temporary items are not stored in the durable catalog, so they need to be handled
700            // separately for updating state and builtin tables.
701            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
702            // in a multi-subscriber catalog world.
703            let upper = tx.upper();
704            let temporary_item_updates =
705                temporary_item_updates
706                    .into_iter()
707                    .map(|(item, diff)| StateUpdate {
708                        kind: StateUpdateKind::TemporaryItem(item),
709                        ts: upper,
710                        diff,
711                    });
712
713            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
714            op_updates.extend(temporary_item_updates);
715            if !op_updates.is_empty() {
716                // Clone the cache so each apply_updates call has access to cached expressions.
717                // The cache uses `remove` semantics, so we need a fresh clone for each call.
718                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
719                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
720                    .to_mut()
721                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
722                    .await;
723            }
724            updates.append(&mut op_updates);
725        }
726
727        if !updates.is_empty() {
728            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
729            let (op_builtin_table_updates, op_catalog_updates) = state
730                .to_mut()
731                .apply_updates(updates.clone(), &mut local_expr_cache)
732                .await;
733            let op_builtin_table_updates = state
734                .to_mut()
735                .resolve_builtin_table_updates(op_builtin_table_updates);
736            builtin_table_updates.extend(op_builtin_table_updates);
737            parsed_catalog_updates.extend(op_catalog_updates);
738        }
739
740        match mode {
741            TransactInnerMode::Commit => {
742                // `storage_collections` can be `None` in tests.
743                if let Some(c) = storage_collections {
744                    c.prepare_state(
745                        tx,
746                        storage_collections_to_create,
747                        storage_collections_to_drop,
748                        storage_collections_to_register,
749                    )
750                    .await?;
751                }
752            }
753            TransactInnerMode::DryRun => {
754                debug_assert!(
755                    storage_collections.is_none(),
756                    "dry-run mode must not prepare storage state"
757                );
758            }
759        }
760
761        let updates = tx.get_and_commit_op_updates();
762        if !updates.is_empty() {
763            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
764            let (op_builtin_table_updates, op_catalog_updates) = state
765                .to_mut()
766                .apply_updates(updates.clone(), &mut local_expr_cache)
767                .await;
768            let op_builtin_table_updates = state
769                .to_mut()
770                .resolve_builtin_table_updates(op_builtin_table_updates);
771            builtin_table_updates.extend(op_builtin_table_updates);
772            parsed_catalog_updates.extend(op_catalog_updates);
773        }
774
775        match state {
776            Cow::Owned(state) => Ok(Some(state)),
777            Cow::Borrowed(_) => Ok(None),
778        }
779    }
780
781    /// Performs the transaction operation described by `op`. This function prepares the changes in
782    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
783    /// changes.
784    ///
785    /// Optionally returns a builtin table update for any builtin table updates than cannot be
786    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
787    /// scenarios and ideally in the future don't exist.
788    #[instrument]
789    async fn transact_op(
790        oracle_write_ts: mz_repr::Timestamp,
791        session: Option<&ConnMeta>,
792        op: Op,
793        temporary_ids: &BTreeSet<CatalogItemId>,
794        audit_events: &mut Vec<VersionedEvent>,
795        tx: &mut Transaction<'_>,
796        state: &CatalogState,
797        storage_collections_to_create: &mut BTreeSet<GlobalId>,
798        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
799        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
800    ) -> Result<Vec<(TemporaryItem, StateDiff)>, AdapterError> {
801        let mut temporary_item_updates = Vec::new();
802
803        match op {
804            Op::AlterRetainHistory { id, value, window } => {
805                let entry = state.get_entry(&id);
806                if id.is_system() {
807                    let name = entry.name();
808                    let full_name =
809                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
810                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
811                        full_name.to_string(),
812                    ))));
813                }
814
815                let mut new_entry = entry.clone();
816                let previous = new_entry
817                    .item
818                    .update_retain_history(value.clone(), window)
819                    .map_err(|_| {
820                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
821                            "planner should have rejected invalid alter retain history item type"
822                                .to_string(),
823                        )))
824                    })?;
825
826                if Self::should_audit_log_item(new_entry.item()) {
827                    let details =
828                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
829                            id: id.to_string(),
830                            old_history: previous.map(|previous| previous.to_string()),
831                            new_history: value.map(|v| v.to_string()),
832                        });
833                    CatalogState::add_to_audit_log(
834                        &state.system_configuration,
835                        oracle_write_ts,
836                        session,
837                        tx,
838                        audit_events,
839                        EventType::Alter,
840                        catalog_type_to_audit_object_type(new_entry.item().typ()),
841                        details,
842                    )?;
843                }
844
845                tx.update_item(id, new_entry.into())?;
846
847                Self::log_update(state, &id);
848            }
849            Op::AlterSourceTimestampInterval {
850                id,
851                value,
852                interval,
853            } => {
854                let entry = state.get_entry(&id);
855                if id.is_system() {
856                    let name = entry.name();
857                    let full_name =
858                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
859                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
860                        full_name.to_string(),
861                    ))));
862                }
863
864                let mut new_entry = entry.clone();
865                let previous = new_entry
866                    .item
867                    .update_timestamp_interval(value.clone(), interval)
868                    .map_err(|_| {
869                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
870                            "planner should have rejected invalid alter timestamp interval item type"
871                                .to_string(),
872                        )))
873                    })?;
874
875                if Self::should_audit_log_item(new_entry.item()) {
876                    let details = EventDetails::AlterSourceTimestampIntervalV1(
877                        mz_audit_log::AlterSourceTimestampIntervalV1 {
878                            id: id.to_string(),
879                            old_interval: previous.map(|previous| previous.to_string()),
880                            new_interval: value.map(|v| v.to_string()),
881                        },
882                    );
883                    CatalogState::add_to_audit_log(
884                        &state.system_configuration,
885                        oracle_write_ts,
886                        session,
887                        tx,
888                        audit_events,
889                        EventType::Alter,
890                        catalog_type_to_audit_object_type(new_entry.item().typ()),
891                        details,
892                    )?;
893                }
894
895                tx.update_item(id, new_entry.into())?;
896
897                Self::log_update(state, &id);
898            }
899            Op::AlterRole {
900                id,
901                name,
902                attributes,
903                nopassword,
904                vars,
905            } => {
906                state.ensure_not_reserved_role(&id)?;
907
908                let mut existing_role = state.get_role(&id).clone();
909                let password = attributes.password.clone();
910                let scram_iterations = attributes
911                    .scram_iterations
912                    .unwrap_or_else(|| state.system_config().scram_iterations());
913                existing_role.attributes = attributes.into();
914                existing_role.vars = vars;
915                let password_action = if nopassword {
916                    PasswordAction::Clear
917                } else if let Some(password) = password {
918                    PasswordAction::Set(PasswordConfig {
919                        password,
920                        scram_iterations,
921                    })
922                } else {
923                    PasswordAction::NoChange
924                };
925                tx.update_role(id, existing_role.into(), password_action)?;
926
927                CatalogState::add_to_audit_log(
928                    &state.system_configuration,
929                    oracle_write_ts,
930                    session,
931                    tx,
932                    audit_events,
933                    EventType::Alter,
934                    ObjectType::Role,
935                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
936                        id: id.to_string(),
937                        name: name.clone(),
938                    }),
939                )?;
940
941                info!("update role {name} ({id})");
942            }
943            Op::AlterNetworkPolicy {
944                id,
945                rules,
946                name,
947                owner_id: _owner_id,
948            } => {
949                let existing_policy = state.get_network_policy(&id).clone();
950                let mut policy: NetworkPolicy = existing_policy.into();
951                policy.rules = rules;
952                if is_reserved_name(&name) {
953                    return Err(AdapterError::Catalog(Error::new(
954                        ErrorKind::ReservedNetworkPolicyName(name),
955                    )));
956                }
957                tx.update_network_policy(id, policy.clone())?;
958
959                CatalogState::add_to_audit_log(
960                    &state.system_configuration,
961                    oracle_write_ts,
962                    session,
963                    tx,
964                    audit_events,
965                    EventType::Alter,
966                    ObjectType::NetworkPolicy,
967                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
968                        id: id.to_string(),
969                        name: name.clone(),
970                    }),
971                )?;
972
973                info!("update network policy {name} ({id})");
974            }
975            Op::AlterAddColumn {
976                id,
977                new_global_id,
978                name,
979                typ,
980                sql,
981            } => {
982                let column_name = name.to_string();
983                let column_type = typ.to_string();
984                let nullable = typ.nullable;
985                let mut new_entry = state.get_entry(&id).clone();
986                let version = new_entry.item.add_column(name, typ, sql)?;
987                // All versions of a table share the same shard, so it shouldn't matter what
988                // GlobalId we use here.
989                let shard_id = state
990                    .storage_metadata()
991                    .get_collection_shard(new_entry.latest_global_id())?;
992
993                // TODO(alter_table): Support adding columns to sources.
994                let CatalogItem::Table(table) = &mut new_entry.item else {
995                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
996                };
997                table.collections.insert(version, new_global_id);
998
999                if Self::should_audit_log_item(new_entry.item()) {
1000                    let details = EventDetails::AlterAddColumnV1(mz_audit_log::AlterAddColumnV1 {
1001                        id: id.to_string(),
1002                        column: column_name,
1003                        column_type,
1004                        nullable,
1005                    });
1006                    CatalogState::add_to_audit_log(
1007                        &state.system_configuration,
1008                        oracle_write_ts,
1009                        session,
1010                        tx,
1011                        audit_events,
1012                        EventType::Alter,
1013                        catalog_type_to_audit_object_type(new_entry.item().typ()),
1014                        details,
1015                    )?;
1016                }
1017
1018                tx.update_item(id, new_entry.into())?;
1019                storage_collections_to_register.insert(new_global_id, shard_id);
1020            }
1021            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1022                let mut new_entry = state.get_entry(&id).clone();
1023                let replacement = state.get_entry(&replacement_id);
1024
1025                let new_audit_events =
1026                    apply_replacement_audit_events(state, &new_entry, replacement);
1027
1028                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1029                    return Err(AdapterError::internal(
1030                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1031                        "id must refer to a materialized view",
1032                    ));
1033                };
1034                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1035                    return Err(AdapterError::internal(
1036                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1037                        "replacement_id must refer to a materialized view",
1038                    ));
1039                };
1040
1041                mv.apply_replacement(replacement_mv.clone());
1042
1043                tx.remove_item(replacement_id)?;
1044
1045                new_entry.id = replacement_id;
1046                tx_replace_item(tx, state, id, new_entry)?;
1047
1048                let comment_id = CommentObjectId::MaterializedView(replacement_id);
1049                tx.drop_comments(&[comment_id].into())?;
1050
1051                for (event_type, details) in new_audit_events {
1052                    CatalogState::add_to_audit_log(
1053                        &state.system_configuration,
1054                        oracle_write_ts,
1055                        session,
1056                        tx,
1057                        audit_events,
1058                        event_type,
1059                        ObjectType::MaterializedView,
1060                        details,
1061                    )?;
1062                }
1063            }
1064            Op::CreateDatabase { name, owner_id } => {
1065                let database_owner_privileges = vec![rbac::owner_privilege(
1066                    mz_sql::catalog::ObjectType::Database,
1067                    owner_id,
1068                )];
1069                let database_default_privileges = state
1070                    .default_privileges
1071                    .get_applicable_privileges(
1072                        owner_id,
1073                        None,
1074                        None,
1075                        mz_sql::catalog::ObjectType::Database,
1076                    )
1077                    .map(|item| item.mz_acl_item(owner_id));
1078                let database_privileges: Vec<_> = merge_mz_acl_items(
1079                    database_owner_privileges
1080                        .into_iter()
1081                        .chain(database_default_privileges),
1082                )
1083                .collect();
1084
1085                let schema_owner_privileges = vec![rbac::owner_privilege(
1086                    mz_sql::catalog::ObjectType::Schema,
1087                    owner_id,
1088                )];
1089                let schema_default_privileges = state
1090                    .default_privileges
1091                    .get_applicable_privileges(
1092                        owner_id,
1093                        None,
1094                        None,
1095                        mz_sql::catalog::ObjectType::Schema,
1096                    )
1097                    .map(|item| item.mz_acl_item(owner_id))
1098                    // Special default privilege on public schemas.
1099                    .chain(std::iter::once(MzAclItem {
1100                        grantee: RoleId::Public,
1101                        grantor: owner_id,
1102                        acl_mode: AclMode::USAGE,
1103                    }));
1104                let schema_privileges: Vec<_> = merge_mz_acl_items(
1105                    schema_owner_privileges
1106                        .into_iter()
1107                        .chain(schema_default_privileges),
1108                )
1109                .collect();
1110
1111                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1112                let (database_id, _) = tx.insert_user_database(
1113                    &name,
1114                    owner_id,
1115                    database_privileges.clone(),
1116                    &temporary_oids,
1117                )?;
1118                let (schema_id, _) = tx.insert_user_schema(
1119                    database_id,
1120                    DEFAULT_SCHEMA,
1121                    owner_id,
1122                    schema_privileges.clone(),
1123                    &temporary_oids,
1124                )?;
1125                CatalogState::add_to_audit_log(
1126                    &state.system_configuration,
1127                    oracle_write_ts,
1128                    session,
1129                    tx,
1130                    audit_events,
1131                    EventType::Create,
1132                    ObjectType::Database,
1133                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1134                        id: database_id.to_string(),
1135                        name: name.clone(),
1136                    }),
1137                )?;
1138                info!("create database {}", name);
1139
1140                CatalogState::add_to_audit_log(
1141                    &state.system_configuration,
1142                    oracle_write_ts,
1143                    session,
1144                    tx,
1145                    audit_events,
1146                    EventType::Create,
1147                    ObjectType::Schema,
1148                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1149                        id: schema_id.to_string(),
1150                        name: DEFAULT_SCHEMA.to_string(),
1151                        database_name: Some(name),
1152                    }),
1153                )?;
1154            }
1155            Op::CreateSchema {
1156                database_id,
1157                schema_name,
1158                owner_id,
1159            } => {
1160                if is_reserved_name(&schema_name) {
1161                    return Err(AdapterError::Catalog(Error::new(
1162                        ErrorKind::ReservedSchemaName(schema_name),
1163                    )));
1164                }
1165                let database_id = match database_id {
1166                    ResolvedDatabaseSpecifier::Id(id) => id,
1167                    ResolvedDatabaseSpecifier::Ambient => {
1168                        return Err(AdapterError::Catalog(Error::new(
1169                            ErrorKind::ReadOnlySystemSchema(schema_name),
1170                        )));
1171                    }
1172                };
1173                let owner_privileges = vec![rbac::owner_privilege(
1174                    mz_sql::catalog::ObjectType::Schema,
1175                    owner_id,
1176                )];
1177                let default_privileges = state
1178                    .default_privileges
1179                    .get_applicable_privileges(
1180                        owner_id,
1181                        Some(database_id),
1182                        None,
1183                        mz_sql::catalog::ObjectType::Schema,
1184                    )
1185                    .map(|item| item.mz_acl_item(owner_id));
1186                let privileges: Vec<_> =
1187                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1188                        .collect();
1189                let (schema_id, _) = tx.insert_user_schema(
1190                    database_id,
1191                    &schema_name,
1192                    owner_id,
1193                    privileges.clone(),
1194                    &state.get_temporary_oids().collect(),
1195                )?;
1196                CatalogState::add_to_audit_log(
1197                    &state.system_configuration,
1198                    oracle_write_ts,
1199                    session,
1200                    tx,
1201                    audit_events,
1202                    EventType::Create,
1203                    ObjectType::Schema,
1204                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1205                        id: schema_id.to_string(),
1206                        name: schema_name.clone(),
1207                        database_name: Some(state.database_by_id[&database_id].name.clone()),
1208                    }),
1209                )?;
1210            }
1211            Op::CreateRole { name, attributes } => {
1212                if is_reserved_role_name(&name) {
1213                    return Err(AdapterError::Catalog(Error::new(
1214                        ErrorKind::ReservedRoleName(name),
1215                    )));
1216                }
1217                let membership = RoleMembership::new();
1218                let vars = RoleVars::default();
1219                let (id, _) = tx.insert_user_role(
1220                    name.clone(),
1221                    attributes.clone(),
1222                    membership.clone(),
1223                    vars.clone(),
1224                    &state.get_temporary_oids().collect(),
1225                )?;
1226                CatalogState::add_to_audit_log(
1227                    &state.system_configuration,
1228                    oracle_write_ts,
1229                    session,
1230                    tx,
1231                    audit_events,
1232                    EventType::Create,
1233                    ObjectType::Role,
1234                    EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1235                        id: id.to_string(),
1236                        name: name.clone(),
1237                        auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1238                            AutoProvisionSource::Oidc => "oidc".to_string(),
1239                            AutoProvisionSource::Frontegg => "frontegg".to_string(),
1240                            AutoProvisionSource::None => "none".to_string(),
1241                        }),
1242                    }),
1243                )?;
1244                info!("create role {}", name);
1245            }
1246            Op::CreateCluster {
1247                id,
1248                name,
1249                introspection_sources,
1250                owner_id,
1251                config,
1252            } => {
1253                if is_reserved_name(&name) {
1254                    return Err(AdapterError::Catalog(Error::new(
1255                        ErrorKind::ReservedClusterName(name),
1256                    )));
1257                }
1258                let owner_privileges = vec![rbac::owner_privilege(
1259                    mz_sql::catalog::ObjectType::Cluster,
1260                    owner_id,
1261                )];
1262                let default_privileges = state
1263                    .default_privileges
1264                    .get_applicable_privileges(
1265                        owner_id,
1266                        None,
1267                        None,
1268                        mz_sql::catalog::ObjectType::Cluster,
1269                    )
1270                    .map(|item| item.mz_acl_item(owner_id));
1271                let privileges: Vec<_> =
1272                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1273                        .collect();
1274                let introspection_source_ids: Vec<_> = introspection_sources
1275                    .iter()
1276                    .map(|introspection_source| {
1277                        Transaction::allocate_introspection_source_index_id(
1278                            &id,
1279                            introspection_source.variant,
1280                        )
1281                    })
1282                    .collect();
1283
1284                let introspection_sources = introspection_sources
1285                    .into_iter()
1286                    .zip_eq(introspection_source_ids)
1287                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1288                    .collect();
1289
1290                tx.insert_user_cluster(
1291                    id,
1292                    &name,
1293                    introspection_sources,
1294                    owner_id,
1295                    privileges.clone(),
1296                    config.clone().into(),
1297                    &state.get_temporary_oids().collect(),
1298                )?;
1299                CatalogState::add_to_audit_log(
1300                    &state.system_configuration,
1301                    oracle_write_ts,
1302                    session,
1303                    tx,
1304                    audit_events,
1305                    EventType::Create,
1306                    ObjectType::Cluster,
1307                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1308                        id: id.to_string(),
1309                        name: name.clone(),
1310                    }),
1311                )?;
1312                info!("create cluster {}", name);
1313            }
1314            Op::CreateClusterReplica {
1315                cluster_id,
1316                name,
1317                config,
1318                owner_id,
1319                reason,
1320            } => {
1321                if is_reserved_name(&name) {
1322                    return Err(AdapterError::Catalog(Error::new(
1323                        ErrorKind::ReservedReplicaName(name),
1324                    )));
1325                }
1326                let cluster = state.get_cluster(cluster_id);
1327                let id =
1328                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1329                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1330                    size,
1331                    billed_as,
1332                    internal,
1333                    ..
1334                }) = &config.location
1335                {
1336                    let (reason, scheduling_policies) = reason.into_audit_log();
1337                    let details = EventDetails::CreateClusterReplicaV4(
1338                        mz_audit_log::CreateClusterReplicaV4 {
1339                            cluster_id: cluster_id.to_string(),
1340                            cluster_name: cluster.name.clone(),
1341                            replica_id: Some(id.to_string()),
1342                            replica_name: name.clone(),
1343                            logical_size: size.clone(),
1344                            billed_as: billed_as.clone(),
1345                            internal: *internal,
1346                            reason,
1347                            scheduling_policies,
1348                        },
1349                    );
1350                    CatalogState::add_to_audit_log(
1351                        &state.system_configuration,
1352                        oracle_write_ts,
1353                        session,
1354                        tx,
1355                        audit_events,
1356                        EventType::Create,
1357                        ObjectType::ClusterReplica,
1358                        details,
1359                    )?;
1360                }
1361            }
1362            Op::CreateItem {
1363                id,
1364                name,
1365                item,
1366                owner_id,
1367            } => {
1368                state.check_unstable_dependencies(&item)?;
1369
1370                match &item {
1371                    CatalogItem::Table(table) => {
1372                        let gids: Vec<_> = table.global_ids().collect();
1373                        assert_eq!(gids.len(), 1);
1374                        storage_collections_to_create.extend(gids);
1375                    }
1376                    CatalogItem::Source(source) => {
1377                        storage_collections_to_create.insert(source.global_id());
1378                    }
1379                    CatalogItem::MaterializedView(mv) => {
1380                        let mv_gid = mv.global_id_writes();
1381                        if let Some(target_id) = mv.replacement_target {
1382                            let target_gid = state.get_entry(&target_id).latest_global_id();
1383                            let shard_id =
1384                                state.storage_metadata().get_collection_shard(target_gid)?;
1385                            storage_collections_to_register.insert(mv_gid, shard_id);
1386                        } else {
1387                            storage_collections_to_create.insert(mv_gid);
1388                        }
1389                    }
1390                    CatalogItem::Sink(sink) => {
1391                        storage_collections_to_create.insert(sink.global_id());
1392                    }
1393                    CatalogItem::Log(_)
1394                    | CatalogItem::View(_)
1395                    | CatalogItem::Index(_)
1396                    | CatalogItem::Type(_)
1397                    | CatalogItem::Func(_)
1398                    | CatalogItem::Secret(_)
1399                    | CatalogItem::Connection(_) => (),
1400                }
1401
1402                let system_user = session.map_or(false, |s| s.user().is_system_user());
1403                if !system_user {
1404                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1405                        let cluster_name = state.clusters_by_id[&id].name.clone();
1406                        return Err(AdapterError::Catalog(Error::new(
1407                            ErrorKind::ReadOnlyCluster(cluster_name),
1408                        )));
1409                    }
1410                }
1411
1412                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1413                let default_privileges = state
1414                    .default_privileges
1415                    .get_applicable_privileges(
1416                        owner_id,
1417                        name.qualifiers.database_spec.id(),
1418                        Some(name.qualifiers.schema_spec.into()),
1419                        item.typ().into(),
1420                    )
1421                    .map(|item| item.mz_acl_item(owner_id));
1422                // mz_support can read all progress sources.
1423                let progress_source_privilege = if item.is_progress_source() {
1424                    Some(MzAclItem {
1425                        grantee: MZ_SUPPORT_ROLE_ID,
1426                        grantor: owner_id,
1427                        acl_mode: AclMode::SELECT,
1428                    })
1429                } else {
1430                    None
1431                };
1432                let privileges: Vec<_> = merge_mz_acl_items(
1433                    owner_privileges
1434                        .into_iter()
1435                        .chain(default_privileges)
1436                        .chain(progress_source_privilege),
1437                )
1438                .collect();
1439
1440                let temporary_oids = state.get_temporary_oids().collect();
1441
1442                if item.is_temporary() {
1443                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1444                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1445                    {
1446                        return Err(AdapterError::Catalog(Error::new(
1447                            ErrorKind::InvalidTemporarySchema,
1448                        )));
1449                    }
1450                    let oid = tx.allocate_oid(&temporary_oids)?;
1451
1452                    let schema_id = name.qualifiers.schema_spec.clone().into();
1453                    let item_type = item.typ();
1454                    let (create_sql, global_id, versions) = item.to_serialized();
1455
1456                    let item = TemporaryItem {
1457                        id,
1458                        oid,
1459                        global_id,
1460                        schema_id,
1461                        name: name.item.clone(),
1462                        create_sql,
1463                        conn_id: item.conn_id().cloned(),
1464                        owner_id,
1465                        privileges: privileges.clone(),
1466                        extra_versions: versions,
1467                    };
1468                    temporary_item_updates.push((item, StateDiff::Addition));
1469
1470                    info!(
1471                        "create temporary {} {} ({})",
1472                        item_type,
1473                        state.resolve_full_name(&name, None),
1474                        id
1475                    );
1476                } else {
1477                    if let Some(temp_id) =
1478                        item.uses()
1479                            .iter()
1480                            .find(|id| match state.try_get_entry(*id) {
1481                                Some(entry) => entry.item().is_temporary(),
1482                                None => temporary_ids.contains(id),
1483                            })
1484                    {
1485                        let temp_item = state.get_entry(temp_id);
1486                        return Err(AdapterError::Catalog(Error::new(
1487                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1488                        )));
1489                    }
1490                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1491                        && !system_user
1492                    {
1493                        let schema_name = state
1494                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1495                            .schema;
1496                        return Err(AdapterError::Catalog(Error::new(
1497                            ErrorKind::ReadOnlySystemSchema(schema_name),
1498                        )));
1499                    }
1500                    let schema_id = name.qualifiers.schema_spec.clone().into();
1501                    let item_type = item.typ();
1502                    let (create_sql, global_id, versions) = item.to_serialized();
1503                    tx.insert_user_item(
1504                        id,
1505                        global_id,
1506                        schema_id,
1507                        &name.item,
1508                        create_sql,
1509                        owner_id,
1510                        privileges.clone(),
1511                        &temporary_oids,
1512                        versions,
1513                    )?;
1514                    info!(
1515                        "create {} {} ({})",
1516                        item_type,
1517                        state.resolve_full_name(&name, None),
1518                        id
1519                    );
1520                }
1521
1522                if Self::should_audit_log_item(&item) {
1523                    let name = Self::full_name_detail(
1524                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1525                    );
1526                    let details = match &item {
1527                        CatalogItem::Source(s) => {
1528                            let cluster_id = match s.data_source {
1529                                // Ingestion exports don't have their own cluster, but
1530                                // run on their ingestion's cluster.
1531                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1532                                    match state.get_entry(&ingestion_id).cluster_id() {
1533                                        Some(cluster_id) => Some(cluster_id.to_string()),
1534                                        None => None,
1535                                    }
1536                                }
1537                                _ => match item.cluster_id() {
1538                                    Some(cluster_id) => Some(cluster_id.to_string()),
1539                                    None => None,
1540                                },
1541                            };
1542
1543                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1544                                id: id.to_string(),
1545                                cluster_id,
1546                                name,
1547                                external_type: s.source_type().to_string(),
1548                            })
1549                        }
1550                        CatalogItem::Sink(s) => {
1551                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1552                                id: id.to_string(),
1553                                cluster_id: Some(s.cluster_id.to_string()),
1554                                name,
1555                                external_type: s.sink_type().to_string(),
1556                            })
1557                        }
1558                        CatalogItem::Index(i) => {
1559                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1560                                id: id.to_string(),
1561                                name,
1562                                cluster_id: i.cluster_id.to_string(),
1563                            })
1564                        }
1565                        CatalogItem::MaterializedView(mv) => {
1566                            EventDetails::CreateMaterializedViewV1(
1567                                mz_audit_log::CreateMaterializedViewV1 {
1568                                    id: id.to_string(),
1569                                    name,
1570                                    cluster_id: mv.cluster_id.to_string(),
1571                                    replacement_target_id: mv
1572                                        .replacement_target
1573                                        .map(|id| id.to_string()),
1574                                },
1575                            )
1576                        }
1577                        CatalogItem::Table(_)
1578                        | CatalogItem::Log(_)
1579                        | CatalogItem::View(_)
1580                        | CatalogItem::Type(_)
1581                        | CatalogItem::Func(_)
1582                        | CatalogItem::Secret(_)
1583                        | CatalogItem::Connection(_) => EventDetails::IdFullNameV1(IdFullNameV1 {
1584                            id: id.to_string(),
1585                            name,
1586                        }),
1587                    };
1588                    CatalogState::add_to_audit_log(
1589                        &state.system_configuration,
1590                        oracle_write_ts,
1591                        session,
1592                        tx,
1593                        audit_events,
1594                        EventType::Create,
1595                        catalog_type_to_audit_object_type(item.typ()),
1596                        details,
1597                    )?;
1598                }
1599            }
1600            Op::CreateNetworkPolicy {
1601                rules,
1602                name,
1603                owner_id,
1604            } => {
1605                if state.network_policies_by_name.contains_key(&name) {
1606                    return Err(AdapterError::PlanError(PlanError::Catalog(
1607                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1608                    )));
1609                }
1610                if is_reserved_name(&name) {
1611                    return Err(AdapterError::Catalog(Error::new(
1612                        ErrorKind::ReservedNetworkPolicyName(name),
1613                    )));
1614                }
1615
1616                let owner_privileges = vec![rbac::owner_privilege(
1617                    mz_sql::catalog::ObjectType::NetworkPolicy,
1618                    owner_id,
1619                )];
1620                let default_privileges = state
1621                    .default_privileges
1622                    .get_applicable_privileges(
1623                        owner_id,
1624                        None,
1625                        None,
1626                        mz_sql::catalog::ObjectType::NetworkPolicy,
1627                    )
1628                    .map(|item| item.mz_acl_item(owner_id));
1629                let privileges: Vec<_> =
1630                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1631                        .collect();
1632
1633                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1634                let id = tx.insert_user_network_policy(
1635                    name.clone(),
1636                    rules,
1637                    privileges,
1638                    owner_id,
1639                    &temporary_oids,
1640                )?;
1641
1642                CatalogState::add_to_audit_log(
1643                    &state.system_configuration,
1644                    oracle_write_ts,
1645                    session,
1646                    tx,
1647                    audit_events,
1648                    EventType::Create,
1649                    ObjectType::NetworkPolicy,
1650                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1651                        id: id.to_string(),
1652                        name: name.clone(),
1653                    }),
1654                )?;
1655
1656                info!("created network policy {name} ({id})");
1657            }
1658            Op::Comment {
1659                object_id,
1660                sub_component,
1661                comment,
1662            } => {
1663                tx.update_comment(object_id, sub_component, comment)?;
1664                let entry = state.get_comment_id_entry(&object_id);
1665                let should_log = entry
1666                    .map(|entry| Self::should_audit_log_item(entry.item()))
1667                    // Things that aren't catalog entries can't be temp, so should be logged.
1668                    .unwrap_or(true);
1669                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1670                // comments won't be logged for now.
1671                if let (Some(conn_id), true) =
1672                    (session.map(|session| session.conn_id()), should_log)
1673                {
1674                    CatalogState::add_to_audit_log(
1675                        &state.system_configuration,
1676                        oracle_write_ts,
1677                        session,
1678                        tx,
1679                        audit_events,
1680                        EventType::Comment,
1681                        comment_id_to_audit_object_type(object_id),
1682                        EventDetails::IdNameV1(IdNameV1 {
1683                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1684                            id: format!("{object_id:?}"),
1685                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1686                        }),
1687                    )?;
1688                }
1689            }
1690            Op::UpdateSourceReferences {
1691                source_id,
1692                references,
1693            } => {
1694                tx.update_source_references(
1695                    source_id,
1696                    references
1697                        .references
1698                        .into_iter()
1699                        .map(|reference| reference.into())
1700                        .collect(),
1701                    references.updated_at,
1702                )?;
1703            }
1704            Op::DropObjects(drop_object_infos) => {
1705                // Generate all of the objects that need to get dropped.
1706                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1707
1708                // Drop any associated comments.
1709                tx.drop_comments(&delta.comments)?;
1710
1711                // Drop any items.
1712                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1713                    delta
1714                        .items
1715                        .iter()
1716                        .map(|id| id)
1717                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1718                tx.remove_items(&durable_items_to_drop)?;
1719                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1720                    let entry = state.get_entry(&id);
1721                    (entry.clone().into(), StateDiff::Retraction)
1722                }));
1723
1724                for item_id in delta.items {
1725                    let entry = state.get_entry(&item_id);
1726
1727                    if entry.item().is_storage_collection() {
1728                        storage_collections_to_drop.extend(entry.global_ids());
1729                    }
1730
1731                    if state.source_references.contains_key(&item_id) {
1732                        tx.remove_source_references(item_id)?;
1733                    }
1734
1735                    if Self::should_audit_log_item(entry.item()) {
1736                        CatalogState::add_to_audit_log(
1737                            &state.system_configuration,
1738                            oracle_write_ts,
1739                            session,
1740                            tx,
1741                            audit_events,
1742                            EventType::Drop,
1743                            catalog_type_to_audit_object_type(entry.item().typ()),
1744                            EventDetails::IdFullNameV1(IdFullNameV1 {
1745                                id: item_id.to_string(),
1746                                name: Self::full_name_detail(&state.resolve_full_name(
1747                                    entry.name(),
1748                                    session.map(|session| session.conn_id()),
1749                                )),
1750                            }),
1751                        )?;
1752                    }
1753                    info!(
1754                        "drop {} {} ({})",
1755                        entry.item_type(),
1756                        state.resolve_full_name(entry.name(), entry.conn_id()),
1757                        item_id
1758                    );
1759                }
1760
1761                // Drop any schemas.
1762                let schemas = delta
1763                    .schemas
1764                    .iter()
1765                    .map(|(schema_spec, database_spec)| {
1766                        (SchemaId::from(schema_spec), *database_spec)
1767                    })
1768                    .collect();
1769                tx.remove_schemas(&schemas)?;
1770
1771                for (schema_spec, database_spec) in delta.schemas {
1772                    let schema = state.get_schema(
1773                        &database_spec,
1774                        &schema_spec,
1775                        session
1776                            .map(|session| session.conn_id())
1777                            .unwrap_or(&SYSTEM_CONN_ID),
1778                    );
1779
1780                    let schema_id = SchemaId::from(schema_spec);
1781                    let database_id = match database_spec {
1782                        ResolvedDatabaseSpecifier::Ambient => None,
1783                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1784                    };
1785
1786                    CatalogState::add_to_audit_log(
1787                        &state.system_configuration,
1788                        oracle_write_ts,
1789                        session,
1790                        tx,
1791                        audit_events,
1792                        EventType::Drop,
1793                        ObjectType::Schema,
1794                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1795                            id: schema_id.to_string(),
1796                            name: schema.name.schema.to_string(),
1797                            database_name: database_id
1798                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1799                        }),
1800                    )?;
1801                }
1802
1803                // Drop any databases.
1804                tx.remove_databases(&delta.databases)?;
1805
1806                for database_id in delta.databases {
1807                    let database = state.get_database(&database_id).clone();
1808
1809                    CatalogState::add_to_audit_log(
1810                        &state.system_configuration,
1811                        oracle_write_ts,
1812                        session,
1813                        tx,
1814                        audit_events,
1815                        EventType::Drop,
1816                        ObjectType::Database,
1817                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1818                            id: database_id.to_string(),
1819                            name: database.name.clone(),
1820                        }),
1821                    )?;
1822                }
1823
1824                // Drop any roles.
1825                tx.remove_user_roles(&delta.roles)?;
1826
1827                for role_id in delta.roles {
1828                    let role = state
1829                        .roles_by_id
1830                        .get(&role_id)
1831                        .expect("catalog out of sync");
1832
1833                    CatalogState::add_to_audit_log(
1834                        &state.system_configuration,
1835                        oracle_write_ts,
1836                        session,
1837                        tx,
1838                        audit_events,
1839                        EventType::Drop,
1840                        ObjectType::Role,
1841                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1842                            id: role.id.to_string(),
1843                            name: role.name.clone(),
1844                        }),
1845                    )?;
1846                    info!("drop role {}", role.name());
1847                }
1848
1849                // Drop any network policies.
1850                tx.remove_network_policies(&delta.network_policies)?;
1851
1852                for network_policy_id in delta.network_policies {
1853                    let policy = state
1854                        .network_policies_by_id
1855                        .get(&network_policy_id)
1856                        .expect("catalog out of sync");
1857
1858                    CatalogState::add_to_audit_log(
1859                        &state.system_configuration,
1860                        oracle_write_ts,
1861                        session,
1862                        tx,
1863                        audit_events,
1864                        EventType::Drop,
1865                        ObjectType::NetworkPolicy,
1866                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1867                            id: policy.id.to_string(),
1868                            name: policy.name.clone(),
1869                        }),
1870                    )?;
1871                    info!("drop network policy {}", policy.name.clone());
1872                }
1873
1874                // Drop any replicas.
1875                let replicas = delta.replicas.keys().copied().collect();
1876                tx.remove_cluster_replicas(&replicas)?;
1877
1878                for (replica_id, (cluster_id, reason)) in delta.replicas {
1879                    let cluster = state.get_cluster(cluster_id);
1880                    let replica = cluster.replica(replica_id).expect("Must exist");
1881
1882                    let (reason, scheduling_policies) = reason.into_audit_log();
1883                    let details =
1884                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1885                            cluster_id: cluster_id.to_string(),
1886                            cluster_name: cluster.name.clone(),
1887                            replica_id: Some(replica_id.to_string()),
1888                            replica_name: replica.name.clone(),
1889                            reason,
1890                            scheduling_policies,
1891                        });
1892                    CatalogState::add_to_audit_log(
1893                        &state.system_configuration,
1894                        oracle_write_ts,
1895                        session,
1896                        tx,
1897                        audit_events,
1898                        EventType::Drop,
1899                        ObjectType::ClusterReplica,
1900                        details,
1901                    )?;
1902                }
1903
1904                // Drop any clusters.
1905                tx.remove_clusters(&delta.clusters)?;
1906
1907                for cluster_id in delta.clusters {
1908                    let cluster = state.get_cluster(cluster_id);
1909
1910                    CatalogState::add_to_audit_log(
1911                        &state.system_configuration,
1912                        oracle_write_ts,
1913                        session,
1914                        tx,
1915                        audit_events,
1916                        EventType::Drop,
1917                        ObjectType::Cluster,
1918                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1919                            id: cluster.id.to_string(),
1920                            name: cluster.name.clone(),
1921                        }),
1922                    )?;
1923                }
1924            }
1925            Op::GrantRole {
1926                role_id,
1927                member_id,
1928                grantor_id,
1929            } => {
1930                state.ensure_not_reserved_role(&member_id)?;
1931                state.ensure_grantable_role(&role_id)?;
1932                if state.collect_role_membership(&role_id).contains(&member_id) {
1933                    let group_role = state.get_role(&role_id);
1934                    let member_role = state.get_role(&member_id);
1935                    return Err(AdapterError::Catalog(Error::new(
1936                        ErrorKind::CircularRoleMembership {
1937                            role_name: group_role.name().to_string(),
1938                            member_name: member_role.name().to_string(),
1939                        },
1940                    )));
1941                }
1942                let mut member_role = state.get_role(&member_id).clone();
1943                member_role.membership.map.insert(role_id, grantor_id);
1944                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1945
1946                CatalogState::add_to_audit_log(
1947                    &state.system_configuration,
1948                    oracle_write_ts,
1949                    session,
1950                    tx,
1951                    audit_events,
1952                    EventType::Grant,
1953                    ObjectType::Role,
1954                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1955                        role_id: role_id.to_string(),
1956                        member_id: member_id.to_string(),
1957                        grantor_id: grantor_id.to_string(),
1958                        executed_by: session
1959                            .map(|session| session.authenticated_role_id())
1960                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1961                            .to_string(),
1962                    }),
1963                )?;
1964            }
1965            Op::RevokeRole {
1966                role_id,
1967                member_id,
1968                grantor_id,
1969            } => {
1970                state.ensure_not_reserved_role(&member_id)?;
1971                state.ensure_grantable_role(&role_id)?;
1972                let mut member_role = state.get_role(&member_id).clone();
1973                member_role.membership.map.remove(&role_id);
1974                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1975
1976                CatalogState::add_to_audit_log(
1977                    &state.system_configuration,
1978                    oracle_write_ts,
1979                    session,
1980                    tx,
1981                    audit_events,
1982                    EventType::Revoke,
1983                    ObjectType::Role,
1984                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1985                        role_id: role_id.to_string(),
1986                        member_id: member_id.to_string(),
1987                        grantor_id: grantor_id.to_string(),
1988                        executed_by: session
1989                            .map(|session| session.authenticated_role_id())
1990                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1991                            .to_string(),
1992                    }),
1993                )?;
1994            }
1995            Op::UpdatePrivilege {
1996                target_id,
1997                privilege,
1998                variant,
1999            } => {
2000                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
2001                    UpdatePrivilegeVariant::Grant => {
2002                        privileges.grant(privilege);
2003                    }
2004                    UpdatePrivilegeVariant::Revoke => {
2005                        privileges.revoke(&privilege);
2006                    }
2007                };
2008                match &target_id {
2009                    SystemObjectId::Object(object_id) => match object_id {
2010                        ObjectId::Cluster(id) => {
2011                            let mut cluster = state.get_cluster(*id).clone();
2012                            update_privilege_fn(&mut cluster.privileges);
2013                            tx.update_cluster(*id, cluster.into())?;
2014                        }
2015                        ObjectId::Database(id) => {
2016                            let mut database = state.get_database(id).clone();
2017                            update_privilege_fn(&mut database.privileges);
2018                            tx.update_database(*id, database.into())?;
2019                        }
2020                        ObjectId::NetworkPolicy(id) => {
2021                            let mut policy = state.get_network_policy(id).clone();
2022                            update_privilege_fn(&mut policy.privileges);
2023                            tx.update_network_policy(*id, policy.into())?;
2024                        }
2025                        ObjectId::Schema((database_spec, schema_spec)) => {
2026                            let schema_id = schema_spec.clone().into();
2027                            let mut schema = state
2028                                .get_schema(
2029                                    database_spec,
2030                                    schema_spec,
2031                                    session
2032                                        .map(|session| session.conn_id())
2033                                        .unwrap_or(&SYSTEM_CONN_ID),
2034                                )
2035                                .clone();
2036                            update_privilege_fn(&mut schema.privileges);
2037                            tx.update_schema(schema_id, schema.into())?;
2038                        }
2039                        ObjectId::Item(id) => {
2040                            let entry = state.get_entry(id);
2041                            let mut new_entry = entry.clone();
2042                            update_privilege_fn(&mut new_entry.privileges);
2043                            if !new_entry.item().is_temporary() {
2044                                tx.update_item(*id, new_entry.into())?;
2045                            } else {
2046                                temporary_item_updates
2047                                    .push((entry.clone().into(), StateDiff::Retraction));
2048                                temporary_item_updates
2049                                    .push((new_entry.into(), StateDiff::Addition));
2050                            }
2051                        }
2052                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
2053                    },
2054                    SystemObjectId::System => {
2055                        let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
2056                        update_privilege_fn(&mut system_privileges);
2057                        let new_privilege =
2058                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
2059                        tx.set_system_privilege(
2060                            privilege.grantee,
2061                            privilege.grantor,
2062                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
2063                        )?;
2064                    }
2065                }
2066                let object_type = state.get_system_object_type(&target_id);
2067                let object_id_str = match &target_id {
2068                    SystemObjectId::System => "SYSTEM".to_string(),
2069                    SystemObjectId::Object(id) => id.to_string(),
2070                };
2071                CatalogState::add_to_audit_log(
2072                    &state.system_configuration,
2073                    oracle_write_ts,
2074                    session,
2075                    tx,
2076                    audit_events,
2077                    variant.into(),
2078                    system_object_type_to_audit_object_type(&object_type),
2079                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
2080                        object_id: object_id_str,
2081                        grantee_id: privilege.grantee.to_string(),
2082                        grantor_id: privilege.grantor.to_string(),
2083                        privileges: privilege.acl_mode.to_string(),
2084                    }),
2085                )?;
2086            }
2087            Op::UpdateDefaultPrivilege {
2088                privilege_object,
2089                privilege_acl_item,
2090                variant,
2091            } => {
2092                let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
2093                match variant {
2094                    UpdatePrivilegeVariant::Grant => default_privileges
2095                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
2096                    UpdatePrivilegeVariant::Revoke => {
2097                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
2098                    }
2099                }
2100                let new_acl_mode = default_privileges
2101                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
2102                tx.set_default_privilege(
2103                    privilege_object.role_id,
2104                    privilege_object.database_id,
2105                    privilege_object.schema_id,
2106                    privilege_object.object_type,
2107                    privilege_acl_item.grantee,
2108                    new_acl_mode.cloned(),
2109                )?;
2110                CatalogState::add_to_audit_log(
2111                    &state.system_configuration,
2112                    oracle_write_ts,
2113                    session,
2114                    tx,
2115                    audit_events,
2116                    variant.into(),
2117                    object_type_to_audit_object_type(privilege_object.object_type),
2118                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2119                        role_id: privilege_object.role_id.to_string(),
2120                        database_id: privilege_object.database_id.map(|id| id.to_string()),
2121                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2122                        grantee_id: privilege_acl_item.grantee.to_string(),
2123                        privileges: privilege_acl_item.acl_mode.to_string(),
2124                    }),
2125                )?;
2126            }
2127            Op::RenameCluster {
2128                id,
2129                name,
2130                to_name,
2131                check_reserved_names,
2132            } => {
2133                if id.is_system() {
2134                    return Err(AdapterError::Catalog(Error::new(
2135                        ErrorKind::ReadOnlyCluster(name.clone()),
2136                    )));
2137                }
2138                if check_reserved_names && is_reserved_name(&to_name) {
2139                    return Err(AdapterError::Catalog(Error::new(
2140                        ErrorKind::ReservedClusterName(to_name),
2141                    )));
2142                }
2143                tx.rename_cluster(id, &name, &to_name)?;
2144                CatalogState::add_to_audit_log(
2145                    &state.system_configuration,
2146                    oracle_write_ts,
2147                    session,
2148                    tx,
2149                    audit_events,
2150                    EventType::Alter,
2151                    ObjectType::Cluster,
2152                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2153                        id: id.to_string(),
2154                        old_name: name.clone(),
2155                        new_name: to_name.clone(),
2156                    }),
2157                )?;
2158                info!("rename cluster {name} to {to_name}");
2159            }
2160            Op::RenameClusterReplica {
2161                cluster_id,
2162                replica_id,
2163                name,
2164                to_name,
2165            } => {
2166                if is_reserved_name(&to_name) {
2167                    return Err(AdapterError::Catalog(Error::new(
2168                        ErrorKind::ReservedReplicaName(to_name),
2169                    )));
2170                }
2171                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2172                CatalogState::add_to_audit_log(
2173                    &state.system_configuration,
2174                    oracle_write_ts,
2175                    session,
2176                    tx,
2177                    audit_events,
2178                    EventType::Alter,
2179                    ObjectType::ClusterReplica,
2180                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2181                        cluster_id: cluster_id.to_string(),
2182                        replica_id: replica_id.to_string(),
2183                        old_name: name.replica.as_str().to_string(),
2184                        new_name: to_name.clone(),
2185                    }),
2186                )?;
2187                info!("rename cluster replica {name} to {to_name}");
2188            }
2189            Op::RenameItem {
2190                id,
2191                to_name,
2192                current_full_name,
2193            } => {
2194                let mut updates = Vec::new();
2195
2196                let entry = state.get_entry(&id);
2197                if let CatalogItem::Type(_) = entry.item() {
2198                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2199                        current_full_name.to_string(),
2200                    ))));
2201                }
2202
2203                if entry.id().is_system() {
2204                    let name = state
2205                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2206                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2207                        name.to_string(),
2208                    ))));
2209                }
2210
2211                let mut to_full_name = current_full_name.clone();
2212                to_full_name.item.clone_from(&to_name);
2213
2214                let mut to_qualified_name = entry.name().clone();
2215                to_qualified_name.item.clone_from(&to_name);
2216
2217                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2218                    id: id.to_string(),
2219                    old_name: Self::full_name_detail(&current_full_name),
2220                    new_name: Self::full_name_detail(&to_full_name),
2221                });
2222                if Self::should_audit_log_item(entry.item()) {
2223                    CatalogState::add_to_audit_log(
2224                        &state.system_configuration,
2225                        oracle_write_ts,
2226                        session,
2227                        tx,
2228                        audit_events,
2229                        EventType::Alter,
2230                        catalog_type_to_audit_object_type(entry.item().typ()),
2231                        details,
2232                    )?;
2233                }
2234
2235                // Rename item itself.
2236                let mut new_entry = entry.clone();
2237                new_entry.name.item.clone_from(&to_name);
2238                new_entry.item = entry
2239                    .item()
2240                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2241                    .map_err(|e| {
2242                        Error::new(ErrorKind::from(AmbiguousRename {
2243                            depender: state
2244                                .resolve_full_name(entry.name(), entry.conn_id())
2245                                .to_string(),
2246                            dependee: state
2247                                .resolve_full_name(entry.name(), entry.conn_id())
2248                                .to_string(),
2249                            message: e,
2250                        }))
2251                    })?;
2252
2253                for id in entry.referenced_by() {
2254                    let dependent_item = state.get_entry(id);
2255                    let mut to_entry = dependent_item.clone();
2256                    to_entry.item = dependent_item
2257                        .item()
2258                        .rename_item_refs(
2259                            current_full_name.clone(),
2260                            to_full_name.item.clone(),
2261                            false,
2262                        )
2263                        .map_err(|e| {
2264                            Error::new(ErrorKind::from(AmbiguousRename {
2265                                depender: state
2266                                    .resolve_full_name(
2267                                        dependent_item.name(),
2268                                        dependent_item.conn_id(),
2269                                    )
2270                                    .to_string(),
2271                                dependee: state
2272                                    .resolve_full_name(entry.name(), entry.conn_id())
2273                                    .to_string(),
2274                                message: e,
2275                            }))
2276                        })?;
2277
2278                    if !to_entry.item().is_temporary() {
2279                        tx.update_item(*id, to_entry.into())?;
2280                    } else {
2281                        temporary_item_updates
2282                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2283                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2284                    }
2285                    updates.push(*id);
2286                }
2287                if !new_entry.item().is_temporary() {
2288                    tx.update_item(id, new_entry.into())?;
2289                } else {
2290                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2291                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2292                }
2293
2294                updates.push(id);
2295                for id in updates {
2296                    Self::log_update(state, &id);
2297                }
2298            }
2299            Op::RenameSchema {
2300                database_spec,
2301                schema_spec,
2302                new_name,
2303                check_reserved_names,
2304            } => {
2305                if check_reserved_names && is_reserved_name(&new_name) {
2306                    return Err(AdapterError::Catalog(Error::new(
2307                        ErrorKind::ReservedSchemaName(new_name),
2308                    )));
2309                }
2310
2311                let conn_id = session
2312                    .map(|session| session.conn_id())
2313                    .unwrap_or(&SYSTEM_CONN_ID);
2314
2315                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2316                let cur_name = schema.name().schema.clone();
2317
2318                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2319                    return Err(AdapterError::Catalog(Error::new(
2320                        ErrorKind::AmbientSchemaRename(cur_name),
2321                    )));
2322                };
2323                let database = state.get_database(&database_id);
2324                let database_name = &database.name;
2325
2326                let mut updates = Vec::new();
2327                let mut items_to_update = BTreeMap::new();
2328
2329                let mut update_item = |id| {
2330                    if items_to_update.contains_key(id) {
2331                        return Ok(());
2332                    }
2333
2334                    let entry = state.get_entry(id);
2335
2336                    // Update our item.
2337                    let mut new_entry = entry.clone();
2338                    new_entry.item = entry
2339                        .item
2340                        .rename_schema_refs(database_name, &cur_name, &new_name)
2341                        .map_err(|(s, _i)| {
2342                            Error::new(ErrorKind::from(AmbiguousRename {
2343                                depender: state
2344                                    .resolve_full_name(entry.name(), entry.conn_id())
2345                                    .to_string(),
2346                                dependee: format!("{database_name}.{cur_name}"),
2347                                message: format!("ambiguous reference to schema named {s}"),
2348                            }))
2349                        })?;
2350
2351                    // Queue updates for Catalog storage and Builtin Tables.
2352                    if !new_entry.item().is_temporary() {
2353                        items_to_update.insert(*id, new_entry.into());
2354                    } else {
2355                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2356                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2357                    }
2358                    updates.push(id);
2359
2360                    Ok::<_, AdapterError>(())
2361                };
2362
2363                // Update all of the items in the schema.
2364                for (_name, item_id) in &schema.items {
2365                    // Update the item itself.
2366                    update_item(item_id)?;
2367
2368                    // Update everything that depends on this item.
2369                    for id in state.get_entry(item_id).referenced_by() {
2370                        update_item(id)?;
2371                    }
2372                }
2373                // Note: When updating the transaction it's very important that we update the
2374                // items as a whole group, otherwise we exhibit quadratic behavior.
2375                tx.update_items(items_to_update)?;
2376
2377                // Renaming temporary schemas is not supported.
2378                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2379                    let schema_name = schema.name().schema.clone();
2380                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2381                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2382                    )));
2383                };
2384
2385                // Add an entry to the audit log.
2386                let database_name = database_spec
2387                    .id()
2388                    .map(|id| state.get_database(&id).name.clone());
2389                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2390                    id: schema_id.to_string(),
2391                    old_name: schema.name().schema.clone(),
2392                    new_name: new_name.clone(),
2393                    database_name,
2394                });
2395                CatalogState::add_to_audit_log(
2396                    &state.system_configuration,
2397                    oracle_write_ts,
2398                    session,
2399                    tx,
2400                    audit_events,
2401                    EventType::Alter,
2402                    mz_audit_log::ObjectType::Schema,
2403                    details,
2404                )?;
2405
2406                // Update the schema itself.
2407                let mut new_schema = schema.clone();
2408                new_schema.name.schema.clone_from(&new_name);
2409                tx.update_schema(schema_id, new_schema.into())?;
2410
2411                for id in updates {
2412                    Self::log_update(state, id);
2413                }
2414            }
2415            Op::UpdateOwner { id, new_owner } => {
2416                let conn_id = session
2417                    .map(|session| session.conn_id())
2418                    .unwrap_or(&SYSTEM_CONN_ID);
2419                let old_owner = state
2420                    .get_owner_id(&id, conn_id)
2421                    .expect("cannot update the owner of an object without an owner");
2422                match &id {
2423                    ObjectId::Cluster(id) => {
2424                        let mut cluster = state.get_cluster(*id).clone();
2425                        if id.is_system() {
2426                            return Err(AdapterError::Catalog(Error::new(
2427                                ErrorKind::ReadOnlyCluster(cluster.name),
2428                            )));
2429                        }
2430                        Self::update_privilege_owners(
2431                            &mut cluster.privileges,
2432                            cluster.owner_id,
2433                            new_owner,
2434                        );
2435                        cluster.owner_id = new_owner;
2436                        tx.update_cluster(*id, cluster.into())?;
2437                    }
2438                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2439                        let cluster = state.get_cluster(*cluster_id);
2440                        let mut replica = cluster
2441                            .replica(*replica_id)
2442                            .expect("catalog out of sync")
2443                            .clone();
2444                        if replica_id.is_system() {
2445                            return Err(AdapterError::Catalog(Error::new(
2446                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2447                            )));
2448                        }
2449                        replica.owner_id = new_owner;
2450                        tx.update_cluster_replica(*replica_id, replica.into())?;
2451                    }
2452                    ObjectId::Database(id) => {
2453                        let mut database = state.get_database(id).clone();
2454                        if id.is_system() {
2455                            return Err(AdapterError::Catalog(Error::new(
2456                                ErrorKind::ReadOnlyDatabase(database.name),
2457                            )));
2458                        }
2459                        Self::update_privilege_owners(
2460                            &mut database.privileges,
2461                            database.owner_id,
2462                            new_owner,
2463                        );
2464                        database.owner_id = new_owner;
2465                        tx.update_database(*id, database.clone().into())?;
2466                    }
2467                    ObjectId::Schema((database_spec, schema_spec)) => {
2468                        let schema_id: SchemaId = schema_spec.clone().into();
2469                        let mut schema = state
2470                            .get_schema(database_spec, schema_spec, conn_id)
2471                            .clone();
2472                        if schema_id.is_system() {
2473                            let name = schema.name();
2474                            let full_name = state.resolve_full_schema_name(name);
2475                            return Err(AdapterError::Catalog(Error::new(
2476                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2477                            )));
2478                        }
2479                        Self::update_privilege_owners(
2480                            &mut schema.privileges,
2481                            schema.owner_id,
2482                            new_owner,
2483                        );
2484                        schema.owner_id = new_owner;
2485                        tx.update_schema(schema_id, schema.into())?;
2486                    }
2487                    ObjectId::Item(id) => {
2488                        let entry = state.get_entry(id);
2489                        let mut new_entry = entry.clone();
2490                        if id.is_system() {
2491                            let full_name = state.resolve_full_name(
2492                                new_entry.name(),
2493                                session.map(|session| session.conn_id()),
2494                            );
2495                            return Err(AdapterError::Catalog(Error::new(
2496                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2497                            )));
2498                        }
2499                        Self::update_privilege_owners(
2500                            &mut new_entry.privileges,
2501                            new_entry.owner_id,
2502                            new_owner,
2503                        );
2504                        new_entry.owner_id = new_owner;
2505                        if !new_entry.item().is_temporary() {
2506                            tx.update_item(*id, new_entry.into())?;
2507                        } else {
2508                            temporary_item_updates
2509                                .push((entry.clone().into(), StateDiff::Retraction));
2510                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2511                        }
2512                    }
2513                    ObjectId::NetworkPolicy(id) => {
2514                        let mut policy = state.get_network_policy(id).clone();
2515                        if id.is_system() {
2516                            return Err(AdapterError::Catalog(Error::new(
2517                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2518                            )));
2519                        }
2520                        Self::update_privilege_owners(
2521                            &mut policy.privileges,
2522                            policy.owner_id,
2523                            new_owner,
2524                        );
2525                        policy.owner_id = new_owner;
2526                        tx.update_network_policy(*id, policy.into())?;
2527                    }
2528                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2529                }
2530                let object_type = state.get_object_type(&id);
2531                CatalogState::add_to_audit_log(
2532                    &state.system_configuration,
2533                    oracle_write_ts,
2534                    session,
2535                    tx,
2536                    audit_events,
2537                    EventType::Alter,
2538                    object_type_to_audit_object_type(object_type),
2539                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2540                        object_id: id.to_string(),
2541                        old_owner_id: old_owner.to_string(),
2542                        new_owner_id: new_owner.to_string(),
2543                    }),
2544                )?;
2545            }
2546            Op::UpdateClusterConfig { id, name, config } => {
2547                let mut cluster = state.get_cluster(id).clone();
2548                cluster.config = config;
2549                tx.update_cluster(id, cluster.into())?;
2550                info!("update cluster {}", name);
2551
2552                CatalogState::add_to_audit_log(
2553                    &state.system_configuration,
2554                    oracle_write_ts,
2555                    session,
2556                    tx,
2557                    audit_events,
2558                    EventType::Alter,
2559                    ObjectType::Cluster,
2560                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2561                        id: id.to_string(),
2562                        name,
2563                    }),
2564                )?;
2565            }
2566            Op::UpdateClusterReplicaConfig {
2567                replica_id,
2568                cluster_id,
2569                config,
2570            } => {
2571                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2572                info!("update replica {}", replica.name);
2573                tx.update_cluster_replica(
2574                    replica_id,
2575                    mz_catalog::durable::ClusterReplica {
2576                        cluster_id,
2577                        replica_id,
2578                        name: replica.name.clone(),
2579                        config: config.clone().into(),
2580                        owner_id: replica.owner_id,
2581                    },
2582                )?;
2583            }
2584            Op::UpdateItem { id, name, to_item } => {
2585                let mut entry = state.get_entry(&id).clone();
2586                entry.name = name.clone();
2587                entry.item = to_item.clone();
2588                tx.update_item(id, entry.into())?;
2589
2590                if Self::should_audit_log_item(&to_item) {
2591                    let mut full_name = Self::full_name_detail(
2592                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2593                    );
2594                    full_name.item = name.item;
2595
2596                    CatalogState::add_to_audit_log(
2597                        &state.system_configuration,
2598                        oracle_write_ts,
2599                        session,
2600                        tx,
2601                        audit_events,
2602                        EventType::Alter,
2603                        catalog_type_to_audit_object_type(to_item.typ()),
2604                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2605                            id: id.to_string(),
2606                            name: full_name,
2607                        }),
2608                    )?;
2609                }
2610
2611                Self::log_update(state, &id);
2612            }
2613            Op::UpdateSystemConfiguration { name, value } => {
2614                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2615                tx.upsert_system_config(&name, parsed_value.clone())?;
2616                // This mirrors some "system vars" into the catalog storage
2617                // "config" collection so that we can toggle the flag with
2618                // Launch Darkly, but use it in boot before Launch Darkly is
2619                // available.
2620                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2621                    let with_0dt_deployment_max_wait =
2622                        Duration::parse(VarInput::Flat(&parsed_value))
2623                            .expect("parsing succeeded above");
2624                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2625                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2626                    let with_0dt_deployment_ddl_check_interval =
2627                        Duration::parse(VarInput::Flat(&parsed_value))
2628                            .expect("parsing succeeded above");
2629                    tx.set_0dt_deployment_ddl_check_interval(
2630                        with_0dt_deployment_ddl_check_interval,
2631                    )?;
2632                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2633                    let panic_after_timeout =
2634                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2635                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2636                }
2637
2638                CatalogState::add_to_audit_log(
2639                    &state.system_configuration,
2640                    oracle_write_ts,
2641                    session,
2642                    tx,
2643                    audit_events,
2644                    EventType::Alter,
2645                    ObjectType::System,
2646                    EventDetails::SetV1(mz_audit_log::SetV1 {
2647                        name,
2648                        value: Some(value.borrow().to_vec().join(", ")),
2649                    }),
2650                )?;
2651            }
2652            Op::ResetSystemConfiguration { name } => {
2653                tx.remove_system_config(&name);
2654                // This mirrors some "system vars" into the catalog storage
2655                // "config" collection so that we can toggle the flag with
2656                // Launch Darkly, but use it in boot before Launch Darkly is
2657                // available.
2658                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2659                    tx.reset_0dt_deployment_max_wait()?;
2660                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2661                    tx.reset_0dt_deployment_ddl_check_interval()?;
2662                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2663                    tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2664                }
2665
2666                CatalogState::add_to_audit_log(
2667                    &state.system_configuration,
2668                    oracle_write_ts,
2669                    session,
2670                    tx,
2671                    audit_events,
2672                    EventType::Alter,
2673                    ObjectType::System,
2674                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2675                )?;
2676            }
2677            Op::ResetAllSystemConfiguration => {
2678                tx.clear_system_configs();
2679                tx.reset_0dt_deployment_max_wait()?;
2680                tx.reset_0dt_deployment_ddl_check_interval()?;
2681                tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2682
2683                CatalogState::add_to_audit_log(
2684                    &state.system_configuration,
2685                    oracle_write_ts,
2686                    session,
2687                    tx,
2688                    audit_events,
2689                    EventType::Alter,
2690                    ObjectType::System,
2691                    EventDetails::ResetAllV1,
2692                )?;
2693            }
2694            Op::InjectAuditEvents { events } => {
2695                for event in events {
2696                    let id = tx.allocate_audit_log_id()?;
2697                    let ev = VersionedEvent::new(
2698                        id,
2699                        event.event_type,
2700                        event.object_type,
2701                        event.details,
2702                        event.user,
2703                        oracle_write_ts.into(),
2704                    );
2705                    audit_events.push(ev.clone());
2706                    tx.insert_audit_log_event(ev);
2707                }
2708            }
2709        };
2710        Ok(temporary_item_updates)
2711    }
2712
2713    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2714        let entry = state.get_entry(id);
2715        info!(
2716            "update {} {} ({})",
2717            entry.item_type(),
2718            state.resolve_full_name(entry.name(), entry.conn_id()),
2719            id
2720        );
2721    }
2722
2723    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2724    /// implementation:
2725    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2726    fn update_privilege_owners(
2727        privileges: &mut PrivilegeMap,
2728        old_owner: RoleId,
2729        new_owner: RoleId,
2730    ) {
2731        // TODO(jkosh44) Would be nice not to clone every privilege.
2732        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2733
2734        let mut new_present = false;
2735        for privilege in flat_privileges.iter_mut() {
2736            // Old owner's granted privilege are updated to be granted by the new
2737            // owner.
2738            if privilege.grantor == old_owner {
2739                privilege.grantor = new_owner;
2740            } else if privilege.grantor == new_owner {
2741                new_present = true;
2742            }
2743            // Old owner's privileges is given to the new owner.
2744            if privilege.grantee == old_owner {
2745                privilege.grantee = new_owner;
2746            } else if privilege.grantee == new_owner {
2747                new_present = true;
2748            }
2749        }
2750
2751        // If the old privilege list contained references to the new owner, we may
2752        // have created duplicate entries. Here we try and consolidate them. This
2753        // is inspired by PostgreSQL's algorithm but not identical.
2754        if new_present {
2755            // Group privileges by (grantee, grantor).
2756            let privilege_map: BTreeMap<_, Vec<_>> =
2757                flat_privileges
2758                    .into_iter()
2759                    .fold(BTreeMap::new(), |mut accum, privilege| {
2760                        accum
2761                            .entry((privilege.grantee, privilege.grantor))
2762                            .or_default()
2763                            .push(privilege);
2764                        accum
2765                    });
2766
2767            // Consolidate and update all privileges.
2768            flat_privileges = privilege_map
2769                .into_iter()
2770                .map(|((grantee, grantor), values)|
2771                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2772                    values.into_iter().fold(
2773                        MzAclItem::empty(grantee, grantor),
2774                        |mut accum, mz_aclitem| {
2775                            accum.acl_mode =
2776                                accum.acl_mode.union(mz_aclitem.acl_mode);
2777                            accum
2778                        },
2779                    ))
2780                .collect();
2781        }
2782
2783        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2784    }
2785}
2786
2787/// Prepare the given transaction for replacing a catalog item with a new version.
2788///
2789/// The new version gets a new `CatalogItemId`, which requires rewriting the `create_sql` of all
2790/// dependent objects to refer to that new ID (at a previous version).
2791///
2792/// Note that here is where we break the assumption that the `CatalogItemId` is a stable identifier
2793/// for catalog items. We currently think that there are no use cases that require this assumption,
2794/// but no way to know for sure.
2795fn tx_replace_item(
2796    tx: &mut Transaction<'_>,
2797    state: &CatalogState,
2798    id: CatalogItemId,
2799    new_entry: CatalogEntry,
2800) -> Result<(), AdapterError> {
2801    let new_id = new_entry.id;
2802
2803    // Rewrite dependent objects to point to the new ID.
2804    for use_id in new_entry.referenced_by() {
2805        // The dependent might be dropped in the same tx, so check.
2806        if tx.get_item(use_id).is_none() {
2807            continue;
2808        }
2809
2810        let mut dependent = state.get_entry(use_id).clone();
2811        dependent.item = dependent.item.replace_item_refs(id, new_id);
2812        tx.update_item(*use_id, dependent.into())?;
2813    }
2814
2815    // Move comments to the new ID.
2816    let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2817    let new_comment_id = new_entry.comment_object_id();
2818    if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2819        tx.drop_comments(&[old_comment_id].into())?;
2820        for (sub, comment) in comments {
2821            tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2822        }
2823    }
2824
2825    let mz_catalog::durable::Item {
2826        id: _,
2827        oid,
2828        global_id,
2829        schema_id,
2830        name,
2831        create_sql,
2832        owner_id,
2833        privileges,
2834        extra_versions,
2835    } = new_entry.into();
2836
2837    tx.remove_item(id)?;
2838    tx.insert_item(
2839        new_id,
2840        oid,
2841        global_id,
2842        schema_id,
2843        &name,
2844        create_sql,
2845        owner_id,
2846        privileges,
2847        extra_versions,
2848    )?;
2849
2850    Ok(())
2851}
2852
2853/// Generate audit events for a replacement apply operation.
2854fn apply_replacement_audit_events(
2855    state: &CatalogState,
2856    target: &CatalogEntry,
2857    replacement: &CatalogEntry,
2858) -> Vec<(EventType, EventDetails)> {
2859    let mut events = Vec::new();
2860
2861    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2862    let target_id_name = IdFullNameV1 {
2863        id: target.id().to_string(),
2864        name: Catalog::full_name_detail(&target_name),
2865    };
2866    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2867    let replacement_id_name = IdFullNameV1 {
2868        id: replacement.id().to_string(),
2869        name: Catalog::full_name_detail(&replacement_name),
2870    };
2871
2872    if Catalog::should_audit_log_item(&replacement.item) {
2873        events.push((
2874            EventType::Drop,
2875            EventDetails::IdFullNameV1(replacement_id_name.clone()),
2876        ));
2877    }
2878
2879    if Catalog::should_audit_log_item(&target.item) {
2880        events.push((
2881            EventType::Alter,
2882            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2883                target: target_id_name.clone(),
2884                replacement: replacement_id_name,
2885            }),
2886        ));
2887
2888        if let Some(old_cluster_id) = target.cluster_id()
2889            && let Some(new_cluster_id) = replacement.cluster_id()
2890            && old_cluster_id != new_cluster_id
2891        {
2892            // When the replacement is applied, the target takes on the ID of the replacement, so
2893            // we should use that ID for subsequent events.
2894            events.push((
2895                EventType::Alter,
2896                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2897                    id: replacement.id().to_string(),
2898                    name: target_id_name.name,
2899                    old_cluster_id: old_cluster_id.to_string(),
2900                    new_cluster_id: new_cluster_id.to_string(),
2901                }),
2902            ));
2903        }
2904    }
2905
2906    events
2907}
2908
2909/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2910///
2911/// Note: Previously we used to omit a single `Op::DropObject` for every object
2912/// we needed to drop. But removing a batch of objects from a durable Catalog
2913/// Transaction is O(n) where `n` is the number of objects that exist in the
2914/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
2915/// `DROP ... CASCADE` statement.
2916#[derive(Debug, Default)]
2917pub(crate) struct ObjectsToDrop {
2918    pub comments: BTreeSet<CommentObjectId>,
2919    pub databases: BTreeSet<DatabaseId>,
2920    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2921    pub clusters: BTreeSet<ClusterId>,
2922    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2923    pub roles: BTreeSet<RoleId>,
2924    pub items: Vec<CatalogItemId>,
2925    pub network_policies: BTreeSet<NetworkPolicyId>,
2926}
2927
2928impl ObjectsToDrop {
2929    pub fn generate(
2930        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2931        state: &CatalogState,
2932        session: Option<&ConnMeta>,
2933    ) -> Result<Self, AdapterError> {
2934        let mut delta = ObjectsToDrop::default();
2935
2936        for drop_object_info in drop_object_infos {
2937            delta.add_item(drop_object_info, state, session)?;
2938        }
2939
2940        Ok(delta)
2941    }
2942
2943    fn add_item(
2944        &mut self,
2945        drop_object_info: DropObjectInfo,
2946        state: &CatalogState,
2947        session: Option<&ConnMeta>,
2948    ) -> Result<(), AdapterError> {
2949        self.comments
2950            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2951
2952        match drop_object_info {
2953            DropObjectInfo::Database(database_id) => {
2954                let database = &state.database_by_id[&database_id];
2955                if database_id.is_system() {
2956                    return Err(AdapterError::Catalog(Error::new(
2957                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2958                    )));
2959                }
2960
2961                self.databases.insert(database_id);
2962            }
2963            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2964                let schema = state.get_schema(
2965                    &database_spec,
2966                    &schema_spec,
2967                    session
2968                        .map(|session| session.conn_id())
2969                        .unwrap_or(&SYSTEM_CONN_ID),
2970                );
2971                let schema_id: SchemaId = schema_spec.into();
2972                if schema_id.is_system() {
2973                    let name = schema.name();
2974                    let full_name = state.resolve_full_schema_name(name);
2975                    return Err(AdapterError::Catalog(Error::new(
2976                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2977                    )));
2978                }
2979
2980                self.schemas.insert(schema_spec, database_spec);
2981            }
2982            DropObjectInfo::Role(role_id) => {
2983                let name = state.get_role(&role_id).name().to_string();
2984                if role_id.is_system() || role_id.is_predefined() {
2985                    return Err(AdapterError::Catalog(Error::new(
2986                        ErrorKind::ReservedRoleName(name.clone()),
2987                    )));
2988                }
2989                state.ensure_not_reserved_role(&role_id)?;
2990
2991                self.roles.insert(role_id);
2992            }
2993            DropObjectInfo::Cluster(cluster_id) => {
2994                let cluster = state.get_cluster(cluster_id);
2995                let name = &cluster.name;
2996                if cluster_id.is_system() {
2997                    return Err(AdapterError::Catalog(Error::new(
2998                        ErrorKind::ReadOnlyCluster(name.clone()),
2999                    )));
3000                }
3001
3002                self.clusters.insert(cluster_id);
3003            }
3004            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3005                let cluster = state.get_cluster(cluster_id);
3006                let replica = cluster.replica(replica_id).expect("Must exist");
3007
3008                self.replicas
3009                    .insert(replica.replica_id, (cluster.id, reason));
3010
3011                // Implicitly drop materialized views that target this replica.
3012                // When the target replica is gone, no replica advances the
3013                // persist shard's upper frontier, causing reads to hang.
3014                //
3015                // Also cascade to anything depending on the implicitly-dropped
3016                // MV so we don't leave dangling references in the catalog.
3017                // Plan-driven drops already include these via
3018                // `cluster_replica_dependents`; this branch handles internal
3019                // callers that build `Op::DropObjects` directly without going
3020                // through the plan stage.
3021                let mut seen: BTreeSet<ObjectId> =
3022                    self.items.iter().copied().map(ObjectId::Item).collect();
3023                for item_id in &cluster.bound_objects {
3024                    let entry = state.get_entry(item_id);
3025                    if let CatalogItem::MaterializedView(mv) = entry.item()
3026                        && mv.target_replica == Some(replica_id)
3027                        && !seen.contains(&ObjectId::Item(*item_id))
3028                    {
3029                        info!(
3030                            "implicitly dropping materialized view {} because target replica was dropped",
3031                            entry.name().item,
3032                        );
3033                        for dep in state.item_dependents(*item_id, &mut seen) {
3034                            if let ObjectId::Item(dep_id) = dep {
3035                                self.items.push(dep_id);
3036                            }
3037                        }
3038                    }
3039                }
3040            }
3041            DropObjectInfo::Item(item_id) => {
3042                let entry = state.get_entry(&item_id);
3043                if item_id.is_system() {
3044                    let name = entry.name();
3045                    let full_name =
3046                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
3047                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3048                        full_name.to_string(),
3049                    ))));
3050                }
3051
3052                self.items.push(item_id);
3053            }
3054            DropObjectInfo::NetworkPolicy(network_policy_id) => {
3055                let policy = state.get_network_policy(&network_policy_id);
3056                let name = &policy.name;
3057                if network_policy_id.is_system() {
3058                    return Err(AdapterError::Catalog(Error::new(
3059                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3060                    )));
3061                }
3062
3063                self.network_policies.insert(network_policy_id);
3064            }
3065        }
3066
3067        Ok(())
3068    }
3069}
3070
3071#[cfg(test)]
3072mod tests {
3073    use mz_catalog::SYSTEM_CONN_ID;
3074    use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
3075    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
3076    use mz_repr::role_id::RoleId;
3077    use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
3078    use mz_sql::DEFAULT_SCHEMA;
3079    use mz_sql::catalog::CatalogDatabase;
3080    use mz_sql::names::{
3081        ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
3082    };
3083    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
3084
3085    use crate::catalog::{Catalog, Op};
3086    use crate::session::DEFAULT_DATABASE_NAME;
3087
3088    #[mz_ore::test]
3089    fn test_update_privilege_owners() {
3090        let old_owner = RoleId::User(1);
3091        let new_owner = RoleId::User(2);
3092        let other_role = RoleId::User(3);
3093
3094        // older owner exists as grantor.
3095        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3096            MzAclItem {
3097                grantee: other_role,
3098                grantor: old_owner,
3099                acl_mode: AclMode::UPDATE,
3100            },
3101            MzAclItem {
3102                grantee: other_role,
3103                grantor: new_owner,
3104                acl_mode: AclMode::SELECT,
3105            },
3106        ]);
3107        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3108        assert_eq!(1, privileges.all_values().count());
3109        assert_eq!(
3110            vec![MzAclItem {
3111                grantee: other_role,
3112                grantor: new_owner,
3113                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3114            }],
3115            privileges.all_values_owned().collect::<Vec<_>>()
3116        );
3117
3118        // older owner exists as grantee.
3119        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3120            MzAclItem {
3121                grantee: old_owner,
3122                grantor: other_role,
3123                acl_mode: AclMode::UPDATE,
3124            },
3125            MzAclItem {
3126                grantee: new_owner,
3127                grantor: other_role,
3128                acl_mode: AclMode::SELECT,
3129            },
3130        ]);
3131        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3132        assert_eq!(1, privileges.all_values().count());
3133        assert_eq!(
3134            vec![MzAclItem {
3135                grantee: new_owner,
3136                grantor: other_role,
3137                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3138            }],
3139            privileges.all_values_owned().collect::<Vec<_>>()
3140        );
3141
3142        // older owner exists as grantee and grantor.
3143        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3144            MzAclItem {
3145                grantee: old_owner,
3146                grantor: old_owner,
3147                acl_mode: AclMode::UPDATE,
3148            },
3149            MzAclItem {
3150                grantee: new_owner,
3151                grantor: new_owner,
3152                acl_mode: AclMode::SELECT,
3153            },
3154        ]);
3155        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3156        assert_eq!(1, privileges.all_values().count());
3157        assert_eq!(
3158            vec![MzAclItem {
3159                grantee: new_owner,
3160                grantor: new_owner,
3161                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3162            }],
3163            privileges.all_values_owned().collect::<Vec<_>>()
3164        );
3165    }
3166
3167    /// Verifies that `transact_incremental_dry_run` processes only new ops
3168    /// against the accumulated state, not all ops from scratch. Two paths are
3169    /// compared:
3170    ///   - Incremental: two separate calls, each with one op
3171    ///   - All-at-once: one call with both ops
3172    /// Both must produce equivalent catalog state.
3173    #[mz_ore::test(tokio::test)]
3174    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method`
3175    async fn test_transact_incremental_dry_run_processes_only_new_ops() {
3176        Catalog::with_debug(|catalog| async move {
3177            // Resolve default database and schema.
3178            let database = catalog
3179                .resolve_database(DEFAULT_DATABASE_NAME)
3180                .expect("default database");
3181            let database_name = database.name.clone();
3182            let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
3183            let schema = catalog
3184                .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
3185                .expect("default schema");
3186            let schema_name = schema.name.schema.clone();
3187            let schema_spec = schema.id.clone();
3188
3189            // Allocate IDs for two tables.
3190            let (id_t1, global_id_t1) = catalog
3191                .allocate_user_id_for_test()
3192                .await
3193                .expect("allocate id for t1");
3194            let (id_t2, global_id_t2) = catalog
3195                .allocate_user_id_for_test()
3196                .await
3197                .expect("allocate id for t2");
3198
3199            let oracle_write_ts = catalog.current_upper().await;
3200
3201            // Build two CreateItem ops.
3202            let make_table_op = |id, global_id, name: &str| Op::CreateItem {
3203                id,
3204                name: QualifiedItemName {
3205                    qualifiers: ItemQualifiers {
3206                        database_spec: database_spec.clone(),
3207                        schema_spec: schema_spec.clone(),
3208                    },
3209                    item: name.to_string(),
3210                },
3211                item: CatalogItem::Table(Table {
3212                    create_sql: Some(format!(
3213                        "CREATE TABLE {database_name}.{schema_name}.{name} ()"
3214                    )),
3215                    desc: VersionedRelationDesc::new(RelationDesc::empty()),
3216                    collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
3217                    conn_id: None,
3218                    resolved_ids: ResolvedIds::empty(),
3219                    custom_logical_compaction_window: None,
3220                    is_retained_metrics_object: false,
3221                    data_source: TableDataSource::TableWrites { defaults: vec![] },
3222                }),
3223                owner_id: MZ_SYSTEM_ROLE_ID,
3224            };
3225
3226            let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
3227            let op_t2 = make_table_op(id_t2, global_id_t2, "t2");
3228
3229            let base_state = catalog.state().clone();
3230
3231            // --- Path A: Incremental (two separate dry-run calls) ---
3232
3233            // First call: only op_t1, no previous snapshot.
3234            let (state_after_t1, snapshot_after_t1) = catalog
3235                .transact_incremental_dry_run(
3236                    &base_state,
3237                    vec![op_t1.clone()],
3238                    None,
3239                    None,
3240                    oracle_write_ts,
3241                )
3242                .await
3243                .expect("first dry run");
3244
3245            // After first dry run: t1 exists, t2 does not.
3246            assert!(
3247                state_after_t1.try_get_entry(&id_t1).is_some(),
3248                "t1 should exist after first dry run"
3249            );
3250            assert_eq!(
3251                state_after_t1
3252                    .try_get_entry(&id_t1)
3253                    .expect("t1 entry")
3254                    .name()
3255                    .item,
3256                "t1"
3257            );
3258            assert!(
3259                state_after_t1.try_get_entry(&id_t2).is_none(),
3260                "t2 should NOT exist after first dry run"
3261            );
3262
3263            // Second call: only op_t2, using state/snapshot from first call.
3264            let (state_incremental, _) = catalog
3265                .transact_incremental_dry_run(
3266                    &state_after_t1,
3267                    vec![op_t2.clone()],
3268                    None,
3269                    Some(snapshot_after_t1),
3270                    oracle_write_ts,
3271                )
3272                .await
3273                .expect("second dry run");
3274
3275            // After second dry run: both t1 and t2 exist.
3276            assert!(
3277                state_incremental.try_get_entry(&id_t1).is_some(),
3278                "t1 should exist in incremental result"
3279            );
3280            assert!(
3281                state_incremental.try_get_entry(&id_t2).is_some(),
3282                "t2 should exist in incremental result"
3283            );
3284
3285            // --- Path B: All-at-once (single dry-run call with both ops) ---
3286
3287            let (state_all_at_once, _) = catalog
3288                .transact_incremental_dry_run(
3289                    &base_state,
3290                    vec![op_t1.clone(), op_t2.clone()],
3291                    None,
3292                    None,
3293                    oracle_write_ts,
3294                )
3295                .await
3296                .expect("all-at-once dry run");
3297
3298            assert!(
3299                state_all_at_once.try_get_entry(&id_t1).is_some(),
3300                "t1 should exist in all-at-once result"
3301            );
3302            assert!(
3303                state_all_at_once.try_get_entry(&id_t2).is_some(),
3304                "t2 should exist in all-at-once result"
3305            );
3306
3307            // --- Compare: both paths produce equivalent items ---
3308
3309            let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
3310            let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
3311            assert_eq!(inc_t1.name(), all_t1.name());
3312            assert_eq!(inc_t1.owner_id, all_t1.owner_id);
3313
3314            let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
3315            let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
3316            assert_eq!(inc_t2.name(), all_t2.name());
3317            assert_eq!(inc_t2.owner_id, all_t2.owner_id);
3318
3319            catalog.expire().await;
3320        })
3321        .await
3322    }
3323}