Skip to main content

mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35    StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::optimize::OptimizerFeatures;
45use mz_repr::role_id::RoleId;
46use mz_repr::{CatalogItemId, ColumnName, GlobalId, SqlColumnType, strconv};
47use mz_sql::ast::RawDataType;
48use mz_sql::catalog::{
49    AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
50    CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
51    DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
52    RoleVars,
53};
54use mz_sql::names::{
55    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use serde::{Deserialize, Serialize};
66use tracing::{info, trace};
67
68use crate::AdapterError;
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{
71    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
72    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
73    is_reserved_role_name, object_type_to_audit_object_type,
74    system_object_type_to_audit_object_type,
75};
76use crate::config::{ScopedParameters, ScopedParametersScope};
77use crate::coord::ConnMeta;
78use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
79use crate::coord::cluster_scheduling::SchedulingDecision;
80use crate::util::ResultExt;
81
/// A manually injected audit event.
///
/// Matches [`mz_audit_log::EventV1`], but without the `id` and `occurred_at` fields -- both are
/// filled in automatically.
///
/// Instances are carried by [`Op::InjectAuditEvents`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InjectedAuditEvent {
    /// The type of event being recorded.
    pub event_type: EventType,
    /// The type of object the event refers to.
    pub object_type: ObjectType,
    /// The event-specific payload.
    pub details: EventDetails,
    /// The user to attribute the event to, if any.
    // NOTE(review): presumably mirrors `EventV1::user` -- confirm against `mz_audit_log`.
    pub user: Option<String>,
}
93
/// A single operation to perform as part of a catalog transaction.
///
/// A batch of `Op`s is handed to [`Catalog::transact`] (or
/// [`Catalog::transact_incremental_dry_run`]), which processes the ops one by one, in order, so
/// that later ops observe the effects of earlier ones.
#[derive(Debug, Clone)]
pub enum Op {
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        value: Option<Value>,
        interval: Duration,
    },
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    CreateClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        /// Why the replica is being created; see [`ReplicaCreateDropReason`].
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item. Items whose `item.conn_id()` is set are collected as temporary
    /// items by `Catalog::temporary_ids` before the transaction runs.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the listed objects. Each entry is a [`DropObjectInfo`], which for cluster replicas
    /// additionally carries the reason for the drop.
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    /// Persists the durable cache of scoped (per-cluster and per-replica)
    /// system parameters towards `scoped`. The handler diffs against the current
    /// durable contents: it upserts changed/added entries and removes entries
    /// the update no longer serves.
    ///
    /// `prune_scope` bounds removals to the objects the update was evaluated
    /// for. `Some(scope)` removes a row only when its owning object is in
    /// `scope`, so an object created after the update's evaluation snapshot, and
    /// the override it folded into its own create transaction, survives a
    /// concurrent full-state reconcile. `None` is an unscoped full replace, used
    /// only by the disabled-feature clear path where no create-time writes race.
    UpdateScopedSystemParameters {
        scoped: ScopedParameters,
        prune_scope: Option<ScopedParametersScope>,
    },
    /// Injects audit events into the catalog.
    ///
    /// This is a nonstandard path used for manually appending audit events at the current time.
    /// Mainly useful for correcting the audit log in the face of bugs that made us forget to emit
    /// audit events.
    InjectAuditEvents {
        events: Vec<InjectedAuditEvent>,
    },
}
273
/// Identifies an object to drop as part of [`Op::DropObjects`].
///
/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
/// the `Op::DropObjects`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// A cluster replica, together with the reason it is being dropped.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
287
288impl DropObjectInfo {
289    /// Creates a `DropObjectInfo` from an `ObjectId`.
290    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
291    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
292        match id {
293            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
294            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
295                cluster_id,
296                replica_id,
297                ReplicaCreateDropReason::Manual,
298            )),
299            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
300            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
301            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
302            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
303            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
304        }
305    }
306
307    /// Creates an `ObjectId` from a `DropObjectInfo`.
308    /// Loses the `ReplicaCreateDropReason` if there is one!
309    fn to_object_id(&self) -> ObjectId {
310        match &self {
311            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
312            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
313                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
314            }
315            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
316            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
317            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
318            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
319            DropObjectInfo::NetworkPolicy(network_policy_id) => {
320                ObjectId::NetworkPolicy(network_policy_id.clone())
321            }
322        }
323    }
324}
325
/// The reason for creating or dropping a replica.
///
/// Converted into its audit-log representation by [`ReplicaCreateDropReason::into_audit_log`].
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The user initiated the replica create or drop, e.g., by
    /// - creating/dropping a cluster,
    /// - ALTERing various options on a managed cluster,
    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
    Manual,
    /// The automated cluster scheduling initiated the replica create or drop, e.g., because a
    /// materialized view needs a refresh on a SCHEDULE ON REFRESH cluster.
    ClusterScheduling(Vec<SchedulingDecision>),
}
338
339impl ReplicaCreateDropReason {
340    pub fn into_audit_log(
341        self,
342    ) -> (
343        CreateOrDropClusterReplicaReasonV1,
344        Option<SchedulingDecisionsWithReasonsV2>,
345    ) {
346        let (reason, scheduling_policies) = match self {
347            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
348            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
349                CreateOrDropClusterReplicaReasonV1::Schedule,
350                Some(scheduling_decisions),
351            ),
352        };
353        (
354            reason,
355            scheduling_policies
356                .as_ref()
357                .map(SchedulingDecision::reasons_to_audit_log_reasons),
358        )
359    }
360}
361
/// The outputs of a successful [`Catalog::transact`] call.
pub struct TransactionResult {
    /// Builtin table updates derived from applying the transaction's catalog updates.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates from which we will derive catalog implications.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit events collected while executing the transaction's ops.
    pub audit_events: Vec<VersionedEvent>,
}
368
/// Selects whether `Catalog::transact_inner` may run storage prepare-state side effects.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// Prepare storage state and return a commit-ready transaction state.
    ///
    /// This mode is used by durable catalog transactions.
    Commit,
    /// Execute a dry run against a durable transaction that will not be
    /// committed.
    ///
    /// This mode still validates and applies updates to an in-memory
    /// `CatalogState`, but it must not call `prepare_state` because dry runs
    /// must not trigger controller side effects.
    DryRun,
}
383
384impl Catalog {
    /// Returns whether `item` should be recorded in the audit log.
    ///
    /// Every item qualifies except temporary ones.
    fn should_audit_log_item(item: &CatalogItem) -> bool {
        !item.is_temporary()
    }
388
389    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
390    /// within a connection id.
391    fn temporary_ids(
392        &self,
393        ops: &[Op],
394        temporary_drops: BTreeSet<(&ConnectionId, String)>,
395    ) -> Result<BTreeSet<CatalogItemId>, Error> {
396        let mut creating = BTreeSet::new();
397        let mut temporary_ids = BTreeSet::new();
398        for op in ops.iter() {
399            if let Op::CreateItem {
400                id,
401                name,
402                item,
403                owner_id: _,
404            } = op
405            {
406                if let Some(conn_id) = item.conn_id() {
407                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
408                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
409                        || creating.contains(&(conn_id, &name.item))
410                    {
411                        return Err(
412                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
413                        );
414                    } else {
415                        creating.insert((conn_id, &name.item));
416                        temporary_ids.insert(id.clone());
417                    }
418                }
419            }
420        }
421        Ok(temporary_ids)
422    }
423
424    #[instrument(name = "catalog::transact")]
425    pub async fn transact(
426        &mut self,
427        // n.b. this is an option to prevent us from needing to build out a
428        // dummy impl of `StorageController` for tests.
429        storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
430        oracle_write_ts: mz_repr::Timestamp,
431        session: Option<&ConnMeta>,
432        ops: Vec<Op>,
433    ) -> Result<TransactionResult, AdapterError> {
434        trace!("transact: {:?}", ops);
435        fail::fail_point!("catalog_transact", |arg| {
436            Err(AdapterError::Unstructured(anyhow::anyhow!(
437                "failpoint: {arg:?}"
438            )))
439        });
440
441        let drop_ids: BTreeSet<CatalogItemId> = ops
442            .iter()
443            .filter_map(|op| match op {
444                Op::DropObjects(drop_object_infos) => {
445                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
446                    let item_ids = ids.filter_map(|id| match id {
447                        ObjectId::Item(id) => Some(id),
448                        _ => None,
449                    });
450                    Some(item_ids)
451                }
452                _ => None,
453            })
454            .flatten()
455            .collect();
456        let temporary_drops = drop_ids
457            .iter()
458            .filter_map(|id| {
459                let entry = self.get_entry(id);
460                match entry.item().conn_id() {
461                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
462                    None => None,
463                }
464            })
465            .collect();
466
467        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
468        let mut builtin_table_updates = vec![];
469        let mut catalog_updates = vec![];
470        let mut audit_events = vec![];
471        let mut storage = self.storage().await;
472        let mut tx = storage
473            .transaction()
474            .await
475            .unwrap_or_terminate("starting catalog transaction");
476
477        let new_state = Self::transact_inner(
478            TransactInnerMode::Commit,
479            storage_collections,
480            oracle_write_ts,
481            session,
482            ops,
483            temporary_ids,
484            &mut builtin_table_updates,
485            &mut catalog_updates,
486            &mut audit_events,
487            &mut tx,
488            &self.state,
489        )
490        .await?;
491
492        // The user closure was successful, apply the updates. Terminate the
493        // process if this fails, because we have to restart envd due to
494        // indeterminate catalog state, which we only reconcile during catalog
495        // init.
496        tx.commit(oracle_write_ts)
497            .await
498            .unwrap_or_terminate("catalog storage transaction commit must succeed");
499
500        // Dropping here keeps the mutable borrow on self, preventing us accidentally
501        // mutating anything until after f is executed.
502        drop(storage);
503        if let Some(new_state) = new_state {
504            self.transient_revision += 1;
505            self.state = new_state;
506        }
507
508        Ok(TransactionResult {
509            builtin_table_updates,
510            catalog_updates,
511            audit_events,
512        })
513    }
514
515    /// Performs an incremental dry-run catalog transaction: processes only the
516    /// NEW ops against an accumulated `CatalogState` from previous dry runs.
517    /// This avoids the O(N^2) replay cost of replaying all accumulated ops.
518    ///
519    /// The durable transaction is intentionally never committed and no storage
520    /// controller prepare-state side effects are run.
521    ///
522    /// If `prev_snapshot` is `Some`, the transaction is initialized from that
523    /// snapshot (which represents the tx state after the previous dry run),
524    /// ensuring it starts in sync with `base_state`. If `None` (first
525    /// statement), a fresh transaction is loaded from durable storage.
526    ///
527    /// Returns the new accumulated state and a snapshot of the transaction's
528    /// state for use in subsequent incremental dry runs.
529    pub async fn transact_incremental_dry_run(
530        &self,
531        base_state: &CatalogState,
532        ops: Vec<Op>,
533        session: Option<&ConnMeta>,
534        prev_snapshot: Option<Snapshot>,
535        oracle_write_ts: mz_repr::Timestamp,
536    ) -> Result<(CatalogState, Snapshot), AdapterError> {
537        // For DDL transactions, items are not temporary (CREATE TABLE FROM SOURCE, etc.)
538        // but we still need to check for collisions.
539        let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
540
541        let mut builtin_table_updates = vec![];
542        let mut catalog_updates = vec![];
543        let mut audit_events = vec![];
544        let mut storage = self.storage().await;
545        let mut tx = if let Some(snapshot) = prev_snapshot {
546            // Restore transaction from saved snapshot so it starts in sync
547            // with the accumulated CatalogState from previous dry runs.
548            storage
549                .transaction_from_snapshot(snapshot)
550                .unwrap_or_terminate("starting catalog transaction from snapshot")
551        } else {
552            // First statement: fresh transaction from durable storage, which
553            // is in sync with the real catalog state.
554            storage
555                .transaction()
556                .await
557                .unwrap_or_terminate("starting catalog transaction")
558        };
559
560        // Process only the new ops against the accumulated state in dry-run mode.
561        let new_state = Self::transact_inner(
562            TransactInnerMode::DryRun,
563            None,
564            oracle_write_ts,
565            session,
566            ops,
567            temporary_ids,
568            &mut builtin_table_updates,
569            &mut catalog_updates,
570            &mut audit_events,
571            &mut tx,
572            base_state,
573        )
574        .await?;
575
576        // Save the transaction's current state as a snapshot for the next
577        // incremental dry run.
578        let new_snapshot = tx.current_snapshot();
579
580        // Transaction is NOT committed — drop it.
581        drop(storage);
582
583        // transact_inner returns Some(state) when ops produced changes.
584        let state = new_state.unwrap_or_else(|| base_state.clone());
585        Ok((state, new_snapshot))
586    }
587
588    /// Extracts optimized expressions from `Op::CreateItem` operations for views
589    /// and materialized views. These can be used to populate a `LocalExpressionCache`
590    /// to avoid re-optimization during `apply_updates`.
591    fn extract_expressions_from_ops(
592        ops: &[Op],
593        optimizer_features: &OptimizerFeatures,
594    ) -> BTreeMap<GlobalId, LocalExpressions> {
595        let mut exprs = BTreeMap::new();
596
597        for op in ops {
598            if let Op::CreateItem { item, .. } = op {
599                match item {
600                    CatalogItem::View(view) => {
601                        exprs.insert(
602                            view.global_id,
603                            LocalExpressions {
604                                local_mir: (*view.locally_optimized_expr).clone(),
605                                optimizer_features: optimizer_features.clone(),
606                            },
607                        );
608                    }
609                    CatalogItem::MaterializedView(mv) => {
610                        exprs.insert(
611                            mv.global_id_writes(),
612                            LocalExpressions {
613                                local_mir: (*mv.locally_optimized_expr).clone(),
614                                optimizer_features: optimizer_features.clone(),
615                            },
616                        );
617                    }
618                    CatalogItem::Table(_)
619                    | CatalogItem::Source(_)
620                    | CatalogItem::Log(_)
621                    | CatalogItem::Sink(_)
622                    | CatalogItem::Index(_)
623                    | CatalogItem::Type(_)
624                    | CatalogItem::Func(_)
625                    | CatalogItem::Secret(_)
626                    | CatalogItem::Connection(_) => {}
627                }
628            }
629        }
630
631        exprs
632    }
633
634    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
635    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
636    ///
637    /// `mode` controls whether storage prepare-state side effects are allowed.
638    /// In `DryRun` mode, this method may update the in-memory state returned to
639    /// the caller, but it must not trigger controller side effects.
640    ///
641    #[instrument(name = "catalog::transact_inner")]
642    async fn transact_inner(
643        mode: TransactInnerMode,
644        storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
645        oracle_write_ts: mz_repr::Timestamp,
646        session: Option<&ConnMeta>,
647        ops: Vec<Op>,
648        temporary_ids: BTreeSet<CatalogItemId>,
649        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
650        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
651        audit_events: &mut Vec<VersionedEvent>,
652        tx: &mut Transaction<'_>,
653        state: &CatalogState,
654    ) -> Result<Option<CatalogState>, AdapterError> {
655        // We come up with new catalog state, builtin state updates, and parsed
656        // catalog updates (for deriving catalog implications) in two phases:
657        //
658        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
659        //    one-by-one. This will give us the full list of updates to apply to
660        //    the catalog, which will allow us to apply it in one batch, which
661        //    in turn will allow the apply machinery to consolidate the updates.
662        // 2. We do one final apply call with all updates, which gives us the
663        //    final builtin table updates and parsed catalog updates.
664        //
665        // The reason is that the loop that is working off ops first does a
666        // transact_op to derive the state updates for that op, and then calls
667        // apply_updates on the catalog state. And successive ops might expect
668        // the catalog state to reflect the modified state _after_ applying
669        // previous ops.
670        //
671        // We want to, however, have one final apply_state that takes all the
672        // accumulated updates to derive the required controller updates and the
673        // builtin table updates.
674        //
675        // We won't win any DDL throughput benchmarks, but so far that's not
676        // what we're optimizing for and there would probably be other
677        // bottlenecks before we hit this one as a bottleneck.
678        //
679        // We could work around this by refactoring how the interplay of
680        // transact_op and apply_updates works, but that's a larger undertaking.
681        let mut preliminary_state = Cow::Borrowed(state);
682
683        // The final state that we will return, if modified.
684        let mut state = Cow::Borrowed(state);
685
686        if ops.is_empty() {
687            return Ok(None);
688        }
689
690        // Extract optimized expressions from CreateItem ops to avoid re-optimization
691        // during apply_updates. We extract before the loop since `ops` is moved there.
692        let optimizer_features = OptimizerFeatures::from(state.system_config());
693        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
694
695        let mut storage_collections_to_create = BTreeSet::new();
696        let mut storage_collections_to_drop = BTreeSet::new();
697        let mut storage_collections_to_register = BTreeMap::new();
698
699        let mut updates = Vec::new();
700
701        for op in ops {
702            let temporary_item_updates = Self::transact_op(
703                oracle_write_ts,
704                session,
705                op,
706                &temporary_ids,
707                audit_events,
708                tx,
709                &*preliminary_state,
710                &mut storage_collections_to_create,
711                &mut storage_collections_to_drop,
712                &mut storage_collections_to_register,
713            )
714            .await?;
715
716            // Temporary items are not stored in the durable catalog, so they need to be handled
717            // separately for updating state and builtin tables.
718            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
719            // in a multi-subscriber catalog world.
720            let upper = tx.upper();
721            let temporary_item_updates =
722                temporary_item_updates
723                    .into_iter()
724                    .map(|(item, diff)| StateUpdate {
725                        kind: StateUpdateKind::TemporaryItem(item),
726                        ts: upper,
727                        diff,
728                    });
729
730            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
731            op_updates.extend(temporary_item_updates);
732            if !op_updates.is_empty() {
733                // Clone the cache so each apply_updates call has access to cached expressions.
734                // The cache uses `remove` semantics, so we need a fresh clone for each call.
735                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
736                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
737                    .to_mut()
738                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
739                    .await;
740            }
741            updates.append(&mut op_updates);
742        }
743
744        if !updates.is_empty() {
745            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
746            let (op_builtin_table_updates, op_catalog_updates) = state
747                .to_mut()
748                .apply_updates(updates.clone(), &mut local_expr_cache)
749                .await;
750            let op_builtin_table_updates = state
751                .to_mut()
752                .resolve_builtin_table_updates(op_builtin_table_updates);
753            builtin_table_updates.extend(op_builtin_table_updates);
754            parsed_catalog_updates.extend(op_catalog_updates);
755        }
756
757        match mode {
758            TransactInnerMode::Commit => {
759                // `storage_collections` can be `None` in tests.
760                if let Some(c) = storage_collections {
761                    c.prepare_state(
762                        tx,
763                        storage_collections_to_create,
764                        storage_collections_to_drop,
765                        storage_collections_to_register,
766                    )
767                    .await?;
768                }
769            }
770            TransactInnerMode::DryRun => {
771                debug_assert!(
772                    storage_collections.is_none(),
773                    "dry-run mode must not prepare storage state"
774                );
775            }
776        }
777
778        let updates = tx.get_and_commit_op_updates();
779        if !updates.is_empty() {
780            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
781            let (op_builtin_table_updates, op_catalog_updates) = state
782                .to_mut()
783                .apply_updates(updates.clone(), &mut local_expr_cache)
784                .await;
785            let op_builtin_table_updates = state
786                .to_mut()
787                .resolve_builtin_table_updates(op_builtin_table_updates);
788            builtin_table_updates.extend(op_builtin_table_updates);
789            parsed_catalog_updates.extend(op_catalog_updates);
790        }
791
792        match state {
793            Cow::Owned(state) => Ok(Some(state)),
794            Cow::Borrowed(_) => Ok(None),
795        }
796    }
797
798    /// Performs the transaction operation described by `op`. This function prepares the changes in
799    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
800    /// changes.
801    ///
802    /// Optionally returns a builtin table update for any builtin table updates that cannot be
803    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
804    /// scenarios and ideally in the future don't exist.
805    #[instrument]
806    async fn transact_op(
807        oracle_write_ts: mz_repr::Timestamp,
808        session: Option<&ConnMeta>,
809        op: Op,
810        temporary_ids: &BTreeSet<CatalogItemId>,
811        audit_events: &mut Vec<VersionedEvent>,
812        tx: &mut Transaction<'_>,
813        state: &CatalogState,
814        storage_collections_to_create: &mut BTreeSet<GlobalId>,
815        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
816        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
817    ) -> Result<Vec<(TemporaryItem, StateDiff)>, AdapterError> {
818        let mut temporary_item_updates = Vec::new();
819
820        match op {
821            Op::AlterRetainHistory { id, value, window } => {
822                let entry = state.get_entry(&id);
823                if id.is_system() {
824                    let name = entry.name();
825                    let full_name =
826                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
827                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
828                        full_name.to_string(),
829                    ))));
830                }
831
832                let mut new_entry = entry.clone();
833                let previous = new_entry
834                    .item
835                    .update_retain_history(value.clone(), window)
836                    .map_err(|_| {
837                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
838                            "planner should have rejected invalid alter retain history item type"
839                                .to_string(),
840                        )))
841                    })?;
842
843                if Self::should_audit_log_item(new_entry.item()) {
844                    let details =
845                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
846                            id: id.to_string(),
847                            old_history: previous.map(|previous| previous.to_string()),
848                            new_history: value.map(|v| v.to_string()),
849                        });
850                    CatalogState::add_to_audit_log(
851                        &state.system_configuration,
852                        oracle_write_ts,
853                        session,
854                        tx,
855                        audit_events,
856                        EventType::Alter,
857                        catalog_type_to_audit_object_type(new_entry.item().typ()),
858                        details,
859                    )?;
860                }
861
862                tx.update_item(id, new_entry.into())?;
863
864                Self::log_update(state, &id);
865            }
866            Op::AlterSourceTimestampInterval {
867                id,
868                value,
869                interval,
870            } => {
871                let entry = state.get_entry(&id);
872                if id.is_system() {
873                    let name = entry.name();
874                    let full_name =
875                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
876                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
877                        full_name.to_string(),
878                    ))));
879                }
880
881                let mut new_entry = entry.clone();
882                let previous = new_entry
883                    .item
884                    .update_timestamp_interval(value.clone(), interval)
885                    .map_err(|_| {
886                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
887                            "planner should have rejected invalid alter timestamp interval item type"
888                                .to_string(),
889                        )))
890                    })?;
891
892                if Self::should_audit_log_item(new_entry.item()) {
893                    let details = EventDetails::AlterSourceTimestampIntervalV1(
894                        mz_audit_log::AlterSourceTimestampIntervalV1 {
895                            id: id.to_string(),
896                            old_interval: previous.map(|previous| previous.to_string()),
897                            new_interval: value.map(|v| v.to_string()),
898                        },
899                    );
900                    CatalogState::add_to_audit_log(
901                        &state.system_configuration,
902                        oracle_write_ts,
903                        session,
904                        tx,
905                        audit_events,
906                        EventType::Alter,
907                        catalog_type_to_audit_object_type(new_entry.item().typ()),
908                        details,
909                    )?;
910                }
911
912                tx.update_item(id, new_entry.into())?;
913
914                Self::log_update(state, &id);
915            }
916            Op::AlterRole {
917                id,
918                name,
919                attributes,
920                nopassword,
921                vars,
922            } => {
923                state.ensure_not_reserved_role(&id)?;
924
925                let mut existing_role = state.get_role(&id).clone();
926                let password = attributes.password.clone();
927                let scram_iterations = attributes
928                    .scram_iterations
929                    .unwrap_or_else(|| state.system_config().scram_iterations());
930                existing_role.attributes = attributes.into();
931                existing_role.vars = vars;
932                let password_action = if nopassword {
933                    PasswordAction::Clear
934                } else if let Some(password) = password {
935                    PasswordAction::Set(PasswordConfig {
936                        password,
937                        scram_iterations,
938                    })
939                } else {
940                    PasswordAction::NoChange
941                };
942                tx.update_role(id, existing_role.into(), password_action)?;
943
944                CatalogState::add_to_audit_log(
945                    &state.system_configuration,
946                    oracle_write_ts,
947                    session,
948                    tx,
949                    audit_events,
950                    EventType::Alter,
951                    ObjectType::Role,
952                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
953                        id: id.to_string(),
954                        name: name.clone(),
955                    }),
956                )?;
957
958                info!("update role {name} ({id})");
959            }
960            Op::AlterNetworkPolicy {
961                id,
962                rules,
963                name,
964                owner_id: _owner_id,
965            } => {
966                let existing_policy = state.get_network_policy(&id).clone();
967                let mut policy: NetworkPolicy = existing_policy.into();
968                policy.rules = rules;
969                if is_reserved_name(&name) {
970                    return Err(AdapterError::Catalog(Error::new(
971                        ErrorKind::ReservedNetworkPolicyName(name),
972                    )));
973                }
974                tx.update_network_policy(id, policy.clone())?;
975
976                CatalogState::add_to_audit_log(
977                    &state.system_configuration,
978                    oracle_write_ts,
979                    session,
980                    tx,
981                    audit_events,
982                    EventType::Alter,
983                    ObjectType::NetworkPolicy,
984                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
985                        id: id.to_string(),
986                        name: name.clone(),
987                    }),
988                )?;
989
990                info!("update network policy {name} ({id})");
991            }
992            Op::AlterAddColumn {
993                id,
994                new_global_id,
995                name,
996                typ,
997                sql,
998            } => {
999                let column_name = name.to_string();
1000                let column_type = typ.to_string();
1001                let nullable = typ.nullable;
1002                let mut new_entry = state.get_entry(&id).clone();
1003                let version = new_entry.item.add_column(name, typ, sql)?;
1004                // All versions of a table share the same shard, so it shouldn't matter what
1005                // GlobalId we use here.
1006                let shard_id = state
1007                    .storage_metadata()
1008                    .get_collection_shard(new_entry.latest_global_id())?;
1009
1010                // TODO(alter_table): Support adding columns to sources.
1011                let CatalogItem::Table(table) = &mut new_entry.item else {
1012                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
1013                };
1014                table.collections.insert(version, new_global_id);
1015
1016                if Self::should_audit_log_item(new_entry.item()) {
1017                    let details = EventDetails::AlterAddColumnV1(mz_audit_log::AlterAddColumnV1 {
1018                        id: id.to_string(),
1019                        column: column_name,
1020                        column_type,
1021                        nullable,
1022                    });
1023                    CatalogState::add_to_audit_log(
1024                        &state.system_configuration,
1025                        oracle_write_ts,
1026                        session,
1027                        tx,
1028                        audit_events,
1029                        EventType::Alter,
1030                        catalog_type_to_audit_object_type(new_entry.item().typ()),
1031                        details,
1032                    )?;
1033                }
1034
1035                tx.update_item(id, new_entry.into())?;
1036                storage_collections_to_register.insert(new_global_id, shard_id);
1037            }
1038            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1039                let mut new_entry = state.get_entry(&id).clone();
1040                let replacement = state.get_entry(&replacement_id);
1041
1042                let new_audit_events =
1043                    apply_replacement_audit_events(state, &new_entry, replacement);
1044
1045                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1046                    return Err(AdapterError::internal(
1047                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1048                        "id must refer to a materialized view",
1049                    ));
1050                };
1051                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1052                    return Err(AdapterError::internal(
1053                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1054                        "replacement_id must refer to a materialized view",
1055                    ));
1056                };
1057
1058                mv.apply_replacement(replacement_mv.clone());
1059
1060                tx.remove_item(replacement_id)?;
1061
1062                new_entry.id = replacement_id;
1063                tx_replace_item(tx, state, id, new_entry)?;
1064
1065                let comment_id = CommentObjectId::MaterializedView(replacement_id);
1066                tx.drop_comments(&[comment_id].into())?;
1067
1068                for (event_type, details) in new_audit_events {
1069                    CatalogState::add_to_audit_log(
1070                        &state.system_configuration,
1071                        oracle_write_ts,
1072                        session,
1073                        tx,
1074                        audit_events,
1075                        event_type,
1076                        ObjectType::MaterializedView,
1077                        details,
1078                    )?;
1079                }
1080            }
1081            Op::CreateDatabase { name, owner_id } => {
1082                let database_owner_privileges = vec![rbac::owner_privilege(
1083                    mz_sql::catalog::ObjectType::Database,
1084                    owner_id,
1085                )];
1086                let database_default_privileges = state
1087                    .default_privileges
1088                    .get_applicable_privileges(
1089                        owner_id,
1090                        None,
1091                        None,
1092                        mz_sql::catalog::ObjectType::Database,
1093                    )
1094                    .map(|item| item.mz_acl_item(owner_id));
1095                let database_privileges: Vec<_> = merge_mz_acl_items(
1096                    database_owner_privileges
1097                        .into_iter()
1098                        .chain(database_default_privileges),
1099                )
1100                .collect();
1101
1102                let schema_owner_privileges = vec![rbac::owner_privilege(
1103                    mz_sql::catalog::ObjectType::Schema,
1104                    owner_id,
1105                )];
1106                let schema_default_privileges = state
1107                    .default_privileges
1108                    .get_applicable_privileges(
1109                        owner_id,
1110                        None,
1111                        None,
1112                        mz_sql::catalog::ObjectType::Schema,
1113                    )
1114                    .map(|item| item.mz_acl_item(owner_id))
1115                    // Special default privilege on public schemas.
1116                    .chain(std::iter::once(MzAclItem {
1117                        grantee: RoleId::Public,
1118                        grantor: owner_id,
1119                        acl_mode: AclMode::USAGE,
1120                    }));
1121                let schema_privileges: Vec<_> = merge_mz_acl_items(
1122                    schema_owner_privileges
1123                        .into_iter()
1124                        .chain(schema_default_privileges),
1125                )
1126                .collect();
1127
1128                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1129                let (database_id, _) = tx.insert_user_database(
1130                    &name,
1131                    owner_id,
1132                    database_privileges.clone(),
1133                    &temporary_oids,
1134                )?;
1135                let (schema_id, _) = tx.insert_user_schema(
1136                    database_id,
1137                    DEFAULT_SCHEMA,
1138                    owner_id,
1139                    schema_privileges.clone(),
1140                    &temporary_oids,
1141                )?;
1142                CatalogState::add_to_audit_log(
1143                    &state.system_configuration,
1144                    oracle_write_ts,
1145                    session,
1146                    tx,
1147                    audit_events,
1148                    EventType::Create,
1149                    ObjectType::Database,
1150                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1151                        id: database_id.to_string(),
1152                        name: name.clone(),
1153                    }),
1154                )?;
1155                info!("create database {}", name);
1156
1157                CatalogState::add_to_audit_log(
1158                    &state.system_configuration,
1159                    oracle_write_ts,
1160                    session,
1161                    tx,
1162                    audit_events,
1163                    EventType::Create,
1164                    ObjectType::Schema,
1165                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1166                        id: schema_id.to_string(),
1167                        name: DEFAULT_SCHEMA.to_string(),
1168                        database_name: Some(name),
1169                    }),
1170                )?;
1171            }
1172            Op::CreateSchema {
1173                database_id,
1174                schema_name,
1175                owner_id,
1176            } => {
1177                if is_reserved_name(&schema_name) {
1178                    return Err(AdapterError::Catalog(Error::new(
1179                        ErrorKind::ReservedSchemaName(schema_name),
1180                    )));
1181                }
1182                let database_id = match database_id {
1183                    ResolvedDatabaseSpecifier::Id(id) => id,
1184                    ResolvedDatabaseSpecifier::Ambient => {
1185                        return Err(AdapterError::Catalog(Error::new(
1186                            ErrorKind::ReadOnlySystemSchema(schema_name),
1187                        )));
1188                    }
1189                };
1190                let owner_privileges = vec![rbac::owner_privilege(
1191                    mz_sql::catalog::ObjectType::Schema,
1192                    owner_id,
1193                )];
1194                let default_privileges = state
1195                    .default_privileges
1196                    .get_applicable_privileges(
1197                        owner_id,
1198                        Some(database_id),
1199                        None,
1200                        mz_sql::catalog::ObjectType::Schema,
1201                    )
1202                    .map(|item| item.mz_acl_item(owner_id));
1203                let privileges: Vec<_> =
1204                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1205                        .collect();
1206                let (schema_id, _) = tx.insert_user_schema(
1207                    database_id,
1208                    &schema_name,
1209                    owner_id,
1210                    privileges.clone(),
1211                    &state.get_temporary_oids().collect(),
1212                )?;
1213                CatalogState::add_to_audit_log(
1214                    &state.system_configuration,
1215                    oracle_write_ts,
1216                    session,
1217                    tx,
1218                    audit_events,
1219                    EventType::Create,
1220                    ObjectType::Schema,
1221                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1222                        id: schema_id.to_string(),
1223                        name: schema_name.clone(),
1224                        database_name: Some(state.database_by_id[&database_id].name.clone()),
1225                    }),
1226                )?;
1227            }
1228            Op::CreateRole { name, attributes } => {
1229                if is_reserved_role_name(&name) {
1230                    return Err(AdapterError::Catalog(Error::new(
1231                        ErrorKind::ReservedRoleName(name),
1232                    )));
1233                }
1234                let membership = RoleMembership::new();
1235                let vars = RoleVars::default();
1236                let (id, _) = tx.insert_user_role(
1237                    name.clone(),
1238                    attributes.clone(),
1239                    membership.clone(),
1240                    vars.clone(),
1241                    &state.get_temporary_oids().collect(),
1242                )?;
1243                CatalogState::add_to_audit_log(
1244                    &state.system_configuration,
1245                    oracle_write_ts,
1246                    session,
1247                    tx,
1248                    audit_events,
1249                    EventType::Create,
1250                    ObjectType::Role,
1251                    EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1252                        id: id.to_string(),
1253                        name: name.clone(),
1254                        auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1255                            AutoProvisionSource::Oidc => "oidc".to_string(),
1256                            AutoProvisionSource::Frontegg => "frontegg".to_string(),
1257                            AutoProvisionSource::None => "none".to_string(),
1258                        }),
1259                    }),
1260                )?;
1261                info!("create role {}", name);
1262            }
1263            Op::CreateCluster {
1264                id,
1265                name,
1266                introspection_sources,
1267                owner_id,
1268                config,
1269            } => {
1270                if is_reserved_name(&name) {
1271                    return Err(AdapterError::Catalog(Error::new(
1272                        ErrorKind::ReservedClusterName(name),
1273                    )));
1274                }
1275                let owner_privileges = vec![rbac::owner_privilege(
1276                    mz_sql::catalog::ObjectType::Cluster,
1277                    owner_id,
1278                )];
1279                let default_privileges = state
1280                    .default_privileges
1281                    .get_applicable_privileges(
1282                        owner_id,
1283                        None,
1284                        None,
1285                        mz_sql::catalog::ObjectType::Cluster,
1286                    )
1287                    .map(|item| item.mz_acl_item(owner_id));
1288                let privileges: Vec<_> =
1289                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1290                        .collect();
1291                let introspection_source_ids: Vec<_> = introspection_sources
1292                    .iter()
1293                    .map(|introspection_source| {
1294                        Transaction::allocate_introspection_source_index_id(
1295                            &id,
1296                            introspection_source.variant,
1297                        )
1298                    })
1299                    .collect();
1300
1301                let introspection_sources = introspection_sources
1302                    .into_iter()
1303                    .zip_eq(introspection_source_ids)
1304                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1305                    .collect();
1306
1307                tx.insert_user_cluster(
1308                    id,
1309                    &name,
1310                    introspection_sources,
1311                    owner_id,
1312                    privileges.clone(),
1313                    config.clone().into(),
1314                    &state.get_temporary_oids().collect(),
1315                )?;
1316                CatalogState::add_to_audit_log(
1317                    &state.system_configuration,
1318                    oracle_write_ts,
1319                    session,
1320                    tx,
1321                    audit_events,
1322                    EventType::Create,
1323                    ObjectType::Cluster,
1324                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1325                        id: id.to_string(),
1326                        name: name.clone(),
1327                    }),
1328                )?;
1329                info!("create cluster {}", name);
1330            }
1331            Op::CreateClusterReplica {
1332                cluster_id,
1333                replica_id,
1334                name,
1335                config,
1336                owner_id,
1337                reason,
1338            } => {
1339                if is_reserved_name(&name) {
1340                    return Err(AdapterError::Catalog(Error::new(
1341                        ErrorKind::ReservedReplicaName(name),
1342                    )));
1343                }
1344                let cluster = state.get_cluster(cluster_id);
1345                // The replica id is allocated out-of-band by the durable
1346                // allocator before the transaction, mirroring cluster and item
1347                // ids. Nothing allocates a replica id in-apply.
1348                tx.insert_cluster_replica_with_id(
1349                    cluster_id,
1350                    replica_id,
1351                    &name,
1352                    config.clone().into(),
1353                    owner_id,
1354                )?;
1355                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1356                    size,
1357                    billed_as,
1358                    internal,
1359                    ..
1360                }) = &config.location
1361                {
1362                    let (reason, scheduling_policies) = reason.into_audit_log();
1363                    let details = EventDetails::CreateClusterReplicaV4(
1364                        mz_audit_log::CreateClusterReplicaV4 {
1365                            cluster_id: cluster_id.to_string(),
1366                            cluster_name: cluster.name.clone(),
1367                            replica_id: Some(replica_id.to_string()),
1368                            replica_name: name.clone(),
1369                            logical_size: size.clone(),
1370                            billed_as: billed_as.clone(),
1371                            internal: *internal,
1372                            reason,
1373                            scheduling_policies,
1374                        },
1375                    );
1376                    CatalogState::add_to_audit_log(
1377                        &state.system_configuration,
1378                        oracle_write_ts,
1379                        session,
1380                        tx,
1381                        audit_events,
1382                        EventType::Create,
1383                        ObjectType::ClusterReplica,
1384                        details,
1385                    )?;
1386                }
1387            }
1388            Op::CreateItem {
1389                id,
1390                name,
1391                item,
1392                owner_id,
1393            } => {
1394                state.check_unstable_dependencies(&item)?;
1395
1396                match &item {
1397                    CatalogItem::Table(table) => {
1398                        let gids: Vec<_> = table.global_ids().collect();
1399                        assert_eq!(gids.len(), 1);
1400                        storage_collections_to_create.extend(gids);
1401                    }
1402                    CatalogItem::Source(source) => {
1403                        storage_collections_to_create.insert(source.global_id());
1404                    }
1405                    CatalogItem::MaterializedView(mv) => {
1406                        let mv_gid = mv.global_id_writes();
1407                        if let Some(target_id) = mv.replacement_target {
1408                            let target_gid = state.get_entry(&target_id).latest_global_id();
1409                            let shard_id =
1410                                state.storage_metadata().get_collection_shard(target_gid)?;
1411                            storage_collections_to_register.insert(mv_gid, shard_id);
1412                        } else {
1413                            storage_collections_to_create.insert(mv_gid);
1414                        }
1415                    }
1416                    CatalogItem::Sink(sink) => {
1417                        storage_collections_to_create.insert(sink.global_id());
1418                    }
1419                    CatalogItem::Log(_)
1420                    | CatalogItem::View(_)
1421                    | CatalogItem::Index(_)
1422                    | CatalogItem::Type(_)
1423                    | CatalogItem::Func(_)
1424                    | CatalogItem::Secret(_)
1425                    | CatalogItem::Connection(_) => (),
1426                }
1427
1428                let system_user = session.map_or(false, |s| s.user().is_system_user());
1429                if !system_user {
1430                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1431                        let cluster_name = state.clusters_by_id[&id].name.clone();
1432                        return Err(AdapterError::Catalog(Error::new(
1433                            ErrorKind::ReadOnlyCluster(cluster_name),
1434                        )));
1435                    }
1436                }
1437
1438                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1439                let default_privileges = state
1440                    .default_privileges
1441                    .get_applicable_privileges(
1442                        owner_id,
1443                        name.qualifiers.database_spec.id(),
1444                        Some(name.qualifiers.schema_spec.into()),
1445                        item.typ().into(),
1446                    )
1447                    .map(|item| item.mz_acl_item(owner_id));
1448                // mz_support can read all progress sources.
1449                let progress_source_privilege = if item.is_progress_source() {
1450                    Some(MzAclItem {
1451                        grantee: MZ_SUPPORT_ROLE_ID,
1452                        grantor: owner_id,
1453                        acl_mode: AclMode::SELECT,
1454                    })
1455                } else {
1456                    None
1457                };
1458                let privileges: Vec<_> = merge_mz_acl_items(
1459                    owner_privileges
1460                        .into_iter()
1461                        .chain(default_privileges)
1462                        .chain(progress_source_privilege),
1463                )
1464                .collect();
1465
1466                let temporary_oids = state.get_temporary_oids().collect();
1467
1468                if item.is_temporary() {
1469                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1470                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1471                    {
1472                        return Err(AdapterError::Catalog(Error::new(
1473                            ErrorKind::InvalidTemporarySchema,
1474                        )));
1475                    }
1476                    let oid = tx.allocate_oid(&temporary_oids)?;
1477
1478                    let schema_id = name.qualifiers.schema_spec.clone().into();
1479                    let item_type = item.typ();
1480                    let (create_sql, global_id, versions) = item.to_serialized();
1481
1482                    let item = TemporaryItem {
1483                        id,
1484                        oid,
1485                        global_id,
1486                        schema_id,
1487                        name: name.item.clone(),
1488                        create_sql,
1489                        conn_id: item.conn_id().cloned(),
1490                        owner_id,
1491                        privileges: privileges.clone(),
1492                        extra_versions: versions,
1493                    };
1494                    temporary_item_updates.push((item, StateDiff::Addition));
1495
1496                    info!(
1497                        "create temporary {} {} ({})",
1498                        item_type,
1499                        state.resolve_full_name(&name, None),
1500                        id
1501                    );
1502                } else {
1503                    if let Some(temp_id) =
1504                        item.uses()
1505                            .iter()
1506                            .find(|id| match state.try_get_entry(*id) {
1507                                Some(entry) => entry.item().is_temporary(),
1508                                None => temporary_ids.contains(id),
1509                            })
1510                    {
1511                        let temp_item = state.get_entry(temp_id);
1512                        return Err(AdapterError::Catalog(Error::new(
1513                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1514                        )));
1515                    }
1516                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1517                        && !system_user
1518                    {
1519                        let schema_name = state
1520                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1521                            .schema;
1522                        return Err(AdapterError::Catalog(Error::new(
1523                            ErrorKind::ReadOnlySystemSchema(schema_name),
1524                        )));
1525                    }
1526                    let schema_id = name.qualifiers.schema_spec.clone().into();
1527                    let item_type = item.typ();
1528                    let (create_sql, global_id, versions) = item.to_serialized();
1529                    tx.insert_user_item(
1530                        id,
1531                        global_id,
1532                        schema_id,
1533                        &name.item,
1534                        create_sql,
1535                        owner_id,
1536                        privileges.clone(),
1537                        &temporary_oids,
1538                        versions,
1539                    )?;
1540                    info!(
1541                        "create {} {} ({})",
1542                        item_type,
1543                        state.resolve_full_name(&name, None),
1544                        id
1545                    );
1546                }
1547
1548                if Self::should_audit_log_item(&item) {
1549                    let name = Self::full_name_detail(
1550                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1551                    );
1552                    let details = match &item {
1553                        CatalogItem::Source(s) => {
1554                            let cluster_id = match s.data_source {
1555                                // Ingestion exports don't have their own cluster, but
1556                                // run on their ingestion's cluster.
1557                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1558                                    match state.get_entry(&ingestion_id).cluster_id() {
1559                                        Some(cluster_id) => Some(cluster_id.to_string()),
1560                                        None => None,
1561                                    }
1562                                }
1563                                _ => match item.cluster_id() {
1564                                    Some(cluster_id) => Some(cluster_id.to_string()),
1565                                    None => None,
1566                                },
1567                            };
1568
1569                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1570                                id: id.to_string(),
1571                                cluster_id,
1572                                name,
1573                                external_type: s.source_type().to_string(),
1574                            })
1575                        }
1576                        CatalogItem::Sink(s) => {
1577                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1578                                id: id.to_string(),
1579                                cluster_id: Some(s.cluster_id.to_string()),
1580                                name,
1581                                external_type: s.sink_type().to_string(),
1582                            })
1583                        }
1584                        CatalogItem::Index(i) => {
1585                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1586                                id: id.to_string(),
1587                                name,
1588                                cluster_id: i.cluster_id.to_string(),
1589                            })
1590                        }
1591                        CatalogItem::MaterializedView(mv) => {
1592                            EventDetails::CreateMaterializedViewV1(
1593                                mz_audit_log::CreateMaterializedViewV1 {
1594                                    id: id.to_string(),
1595                                    name,
1596                                    cluster_id: mv.cluster_id.to_string(),
1597                                    replacement_target_id: mv
1598                                        .replacement_target
1599                                        .map(|id| id.to_string()),
1600                                },
1601                            )
1602                        }
1603                        CatalogItem::Table(_)
1604                        | CatalogItem::Log(_)
1605                        | CatalogItem::View(_)
1606                        | CatalogItem::Type(_)
1607                        | CatalogItem::Func(_)
1608                        | CatalogItem::Secret(_)
1609                        | CatalogItem::Connection(_) => EventDetails::IdFullNameV1(IdFullNameV1 {
1610                            id: id.to_string(),
1611                            name,
1612                        }),
1613                    };
1614                    CatalogState::add_to_audit_log(
1615                        &state.system_configuration,
1616                        oracle_write_ts,
1617                        session,
1618                        tx,
1619                        audit_events,
1620                        EventType::Create,
1621                        catalog_type_to_audit_object_type(item.typ()),
1622                        details,
1623                    )?;
1624                }
1625            }
1626            Op::CreateNetworkPolicy {
1627                rules,
1628                name,
1629                owner_id,
1630            } => {
1631                if state.network_policies_by_name.contains_key(&name) {
1632                    return Err(AdapterError::PlanError(PlanError::Catalog(
1633                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1634                    )));
1635                }
1636                if is_reserved_name(&name) {
1637                    return Err(AdapterError::Catalog(Error::new(
1638                        ErrorKind::ReservedNetworkPolicyName(name),
1639                    )));
1640                }
1641
1642                let owner_privileges = vec![rbac::owner_privilege(
1643                    mz_sql::catalog::ObjectType::NetworkPolicy,
1644                    owner_id,
1645                )];
1646                let default_privileges = state
1647                    .default_privileges
1648                    .get_applicable_privileges(
1649                        owner_id,
1650                        None,
1651                        None,
1652                        mz_sql::catalog::ObjectType::NetworkPolicy,
1653                    )
1654                    .map(|item| item.mz_acl_item(owner_id));
1655                let privileges: Vec<_> =
1656                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1657                        .collect();
1658
1659                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1660                let id = tx.insert_user_network_policy(
1661                    name.clone(),
1662                    rules,
1663                    privileges,
1664                    owner_id,
1665                    &temporary_oids,
1666                )?;
1667
1668                CatalogState::add_to_audit_log(
1669                    &state.system_configuration,
1670                    oracle_write_ts,
1671                    session,
1672                    tx,
1673                    audit_events,
1674                    EventType::Create,
1675                    ObjectType::NetworkPolicy,
1676                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1677                        id: id.to_string(),
1678                        name: name.clone(),
1679                    }),
1680                )?;
1681
1682                info!("created network policy {name} ({id})");
1683            }
1684            Op::Comment {
1685                object_id,
1686                sub_component,
1687                comment,
1688            } => {
1689                tx.update_comment(object_id, sub_component, comment)?;
1690                let entry = state.get_comment_id_entry(&object_id);
1691                let should_log = entry
1692                    .map(|entry| Self::should_audit_log_item(entry.item()))
1693                    // Things that aren't catalog entries can't be temp, so should be logged.
1694                    .unwrap_or(true);
1695                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1696                // comments won't be logged for now.
1697                if let (Some(conn_id), true) =
1698                    (session.map(|session| session.conn_id()), should_log)
1699                {
1700                    CatalogState::add_to_audit_log(
1701                        &state.system_configuration,
1702                        oracle_write_ts,
1703                        session,
1704                        tx,
1705                        audit_events,
1706                        EventType::Comment,
1707                        comment_id_to_audit_object_type(object_id),
1708                        EventDetails::IdNameV1(IdNameV1 {
1709                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1710                            id: format!("{object_id:?}"),
1711                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1712                        }),
1713                    )?;
1714                }
1715            }
1716            Op::UpdateSourceReferences {
1717                source_id,
1718                references,
1719            } => {
1720                tx.update_source_references(
1721                    source_id,
1722                    references
1723                        .references
1724                        .into_iter()
1725                        .map(|reference| reference.into())
1726                        .collect(),
1727                    references.updated_at,
1728                )?;
1729            }
1730            Op::DropObjects(drop_object_infos) => {
1731                // Generate all of the objects that need to get dropped.
1732                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1733
1734                // Drop any associated comments.
1735                tx.drop_comments(&delta.comments)?;
1736
1737                // Drop any items.
1738                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1739                    delta
1740                        .items
1741                        .iter()
1742                        .map(|id| id)
1743                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1744                tx.remove_items(&durable_items_to_drop)?;
1745                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1746                    let entry = state.get_entry(&id);
1747                    (entry.clone().into(), StateDiff::Retraction)
1748                }));
1749
1750                for item_id in delta.items {
1751                    let entry = state.get_entry(&item_id);
1752
1753                    if entry.item().is_storage_collection() {
1754                        storage_collections_to_drop.extend(entry.global_ids());
1755                    }
1756
1757                    if state.source_references.contains_key(&item_id) {
1758                        tx.remove_source_references(item_id)?;
1759                    }
1760
1761                    if Self::should_audit_log_item(entry.item()) {
1762                        CatalogState::add_to_audit_log(
1763                            &state.system_configuration,
1764                            oracle_write_ts,
1765                            session,
1766                            tx,
1767                            audit_events,
1768                            EventType::Drop,
1769                            catalog_type_to_audit_object_type(entry.item().typ()),
1770                            EventDetails::IdFullNameV1(IdFullNameV1 {
1771                                id: item_id.to_string(),
1772                                name: Self::full_name_detail(&state.resolve_full_name(
1773                                    entry.name(),
1774                                    session.map(|session| session.conn_id()),
1775                                )),
1776                            }),
1777                        )?;
1778                    }
1779                    info!(
1780                        "drop {} {} ({})",
1781                        entry.item_type(),
1782                        state.resolve_full_name(entry.name(), entry.conn_id()),
1783                        item_id
1784                    );
1785                }
1786
1787                // Drop any schemas.
1788                let schemas = delta
1789                    .schemas
1790                    .iter()
1791                    .map(|(schema_spec, database_spec)| {
1792                        (SchemaId::from(schema_spec), *database_spec)
1793                    })
1794                    .collect();
1795                tx.remove_schemas(&schemas)?;
1796
1797                for (schema_spec, database_spec) in delta.schemas {
1798                    let schema = state.get_schema(
1799                        &database_spec,
1800                        &schema_spec,
1801                        session
1802                            .map(|session| session.conn_id())
1803                            .unwrap_or(&SYSTEM_CONN_ID),
1804                    );
1805
1806                    let schema_id = SchemaId::from(schema_spec);
1807                    let database_id = match database_spec {
1808                        ResolvedDatabaseSpecifier::Ambient => None,
1809                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1810                    };
1811
1812                    CatalogState::add_to_audit_log(
1813                        &state.system_configuration,
1814                        oracle_write_ts,
1815                        session,
1816                        tx,
1817                        audit_events,
1818                        EventType::Drop,
1819                        ObjectType::Schema,
1820                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1821                            id: schema_id.to_string(),
1822                            name: schema.name.schema.to_string(),
1823                            database_name: database_id
1824                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1825                        }),
1826                    )?;
1827                }
1828
1829                // Drop any databases.
1830                tx.remove_databases(&delta.databases)?;
1831
1832                for database_id in delta.databases {
1833                    let database = state.get_database(&database_id).clone();
1834
1835                    CatalogState::add_to_audit_log(
1836                        &state.system_configuration,
1837                        oracle_write_ts,
1838                        session,
1839                        tx,
1840                        audit_events,
1841                        EventType::Drop,
1842                        ObjectType::Database,
1843                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1844                            id: database_id.to_string(),
1845                            name: database.name.clone(),
1846                        }),
1847                    )?;
1848                }
1849
1850                // Drop any roles.
1851                tx.remove_user_roles(&delta.roles)?;
1852
1853                for role_id in delta.roles {
1854                    let role = state
1855                        .roles_by_id
1856                        .get(&role_id)
1857                        .expect("catalog out of sync");
1858
1859                    CatalogState::add_to_audit_log(
1860                        &state.system_configuration,
1861                        oracle_write_ts,
1862                        session,
1863                        tx,
1864                        audit_events,
1865                        EventType::Drop,
1866                        ObjectType::Role,
1867                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1868                            id: role.id.to_string(),
1869                            name: role.name.clone(),
1870                        }),
1871                    )?;
1872                    info!("drop role {}", role.name());
1873                }
1874
1875                // Drop any network policies.
1876                tx.remove_network_policies(&delta.network_policies)?;
1877
1878                for network_policy_id in delta.network_policies {
1879                    let policy = state
1880                        .network_policies_by_id
1881                        .get(&network_policy_id)
1882                        .expect("catalog out of sync");
1883
1884                    CatalogState::add_to_audit_log(
1885                        &state.system_configuration,
1886                        oracle_write_ts,
1887                        session,
1888                        tx,
1889                        audit_events,
1890                        EventType::Drop,
1891                        ObjectType::NetworkPolicy,
1892                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1893                            id: policy.id.to_string(),
1894                            name: policy.name.clone(),
1895                        }),
1896                    )?;
1897                    info!("drop network policy {}", policy.name.clone());
1898                }
1899
1900                // Drop any replicas.
1901                let replicas = delta.replicas.keys().copied().collect();
1902                tx.remove_cluster_replicas(&replicas)?;
1903
1904                for (replica_id, (cluster_id, reason)) in delta.replicas {
1905                    let cluster = state.get_cluster(cluster_id);
1906                    let replica = cluster.replica(replica_id).expect("Must exist");
1907
1908                    let (reason, scheduling_policies) = reason.into_audit_log();
1909                    let details =
1910                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1911                            cluster_id: cluster_id.to_string(),
1912                            cluster_name: cluster.name.clone(),
1913                            replica_id: Some(replica_id.to_string()),
1914                            replica_name: replica.name.clone(),
1915                            reason,
1916                            scheduling_policies,
1917                        });
1918                    CatalogState::add_to_audit_log(
1919                        &state.system_configuration,
1920                        oracle_write_ts,
1921                        session,
1922                        tx,
1923                        audit_events,
1924                        EventType::Drop,
1925                        ObjectType::ClusterReplica,
1926                        details,
1927                    )?;
1928                }
1929
1930                // Drop any clusters.
1931                tx.remove_clusters(&delta.clusters)?;
1932
1933                for cluster_id in delta.clusters {
1934                    let cluster = state.get_cluster(cluster_id);
1935
1936                    CatalogState::add_to_audit_log(
1937                        &state.system_configuration,
1938                        oracle_write_ts,
1939                        session,
1940                        tx,
1941                        audit_events,
1942                        EventType::Drop,
1943                        ObjectType::Cluster,
1944                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1945                            id: cluster.id.to_string(),
1946                            name: cluster.name.clone(),
1947                        }),
1948                    )?;
1949                }
1950            }
1951            Op::GrantRole {
1952                role_id,
1953                member_id,
1954                grantor_id,
1955            } => {
1956                state.ensure_not_reserved_role(&member_id)?;
1957                state.ensure_grantable_role(&role_id)?;
1958                if state.collect_role_membership(&role_id).contains(&member_id) {
1959                    let group_role = state.get_role(&role_id);
1960                    let member_role = state.get_role(&member_id);
1961                    return Err(AdapterError::Catalog(Error::new(
1962                        ErrorKind::CircularRoleMembership {
1963                            role_name: group_role.name().to_string(),
1964                            member_name: member_role.name().to_string(),
1965                        },
1966                    )));
1967                }
1968                let mut member_role = state.get_role(&member_id).clone();
1969                member_role.membership.map.insert(role_id, grantor_id);
1970                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1971
1972                CatalogState::add_to_audit_log(
1973                    &state.system_configuration,
1974                    oracle_write_ts,
1975                    session,
1976                    tx,
1977                    audit_events,
1978                    EventType::Grant,
1979                    ObjectType::Role,
1980                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1981                        role_id: role_id.to_string(),
1982                        member_id: member_id.to_string(),
1983                        grantor_id: grantor_id.to_string(),
1984                        executed_by: session
1985                            .map(|session| session.authenticated_role_id())
1986                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1987                            .to_string(),
1988                    }),
1989                )?;
1990            }
1991            Op::RevokeRole {
1992                role_id,
1993                member_id,
1994                grantor_id,
1995            } => {
1996                state.ensure_not_reserved_role(&member_id)?;
1997                state.ensure_grantable_role(&role_id)?;
1998                let mut member_role = state.get_role(&member_id).clone();
1999                member_role.membership.map.remove(&role_id);
2000                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
2001
2002                CatalogState::add_to_audit_log(
2003                    &state.system_configuration,
2004                    oracle_write_ts,
2005                    session,
2006                    tx,
2007                    audit_events,
2008                    EventType::Revoke,
2009                    ObjectType::Role,
2010                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
2011                        role_id: role_id.to_string(),
2012                        member_id: member_id.to_string(),
2013                        grantor_id: grantor_id.to_string(),
2014                        executed_by: session
2015                            .map(|session| session.authenticated_role_id())
2016                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
2017                            .to_string(),
2018                    }),
2019                )?;
2020            }
2021            Op::UpdatePrivilege {
2022                target_id,
2023                privilege,
2024                variant,
2025            } => {
2026                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
2027                    UpdatePrivilegeVariant::Grant => {
2028                        privileges.grant(privilege);
2029                    }
2030                    UpdatePrivilegeVariant::Revoke => {
2031                        privileges.revoke(&privilege);
2032                    }
2033                };
2034                match &target_id {
2035                    SystemObjectId::Object(object_id) => match object_id {
2036                        ObjectId::Cluster(id) => {
2037                            let mut cluster = state.get_cluster(*id).clone();
2038                            update_privilege_fn(&mut cluster.privileges);
2039                            tx.update_cluster(*id, cluster.into())?;
2040                        }
2041                        ObjectId::Database(id) => {
2042                            let mut database = state.get_database(id).clone();
2043                            update_privilege_fn(&mut database.privileges);
2044                            tx.update_database(*id, database.into())?;
2045                        }
2046                        ObjectId::NetworkPolicy(id) => {
2047                            let mut policy = state.get_network_policy(id).clone();
2048                            update_privilege_fn(&mut policy.privileges);
2049                            tx.update_network_policy(*id, policy.into())?;
2050                        }
2051                        ObjectId::Schema((database_spec, schema_spec)) => {
2052                            let schema_id = schema_spec.clone().into();
2053                            let mut schema = state
2054                                .get_schema(
2055                                    database_spec,
2056                                    schema_spec,
2057                                    session
2058                                        .map(|session| session.conn_id())
2059                                        .unwrap_or(&SYSTEM_CONN_ID),
2060                                )
2061                                .clone();
2062                            update_privilege_fn(&mut schema.privileges);
2063                            tx.update_schema(schema_id, schema.into())?;
2064                        }
2065                        ObjectId::Item(id) => {
2066                            let entry = state.get_entry(id);
2067                            let mut new_entry = entry.clone();
2068                            update_privilege_fn(&mut new_entry.privileges);
2069                            if !new_entry.item().is_temporary() {
2070                                tx.update_item(*id, new_entry.into())?;
2071                            } else {
2072                                temporary_item_updates
2073                                    .push((entry.clone().into(), StateDiff::Retraction));
2074                                temporary_item_updates
2075                                    .push((new_entry.into(), StateDiff::Addition));
2076                            }
2077                        }
2078                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
2079                    },
2080                    SystemObjectId::System => {
2081                        let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
2082                        update_privilege_fn(&mut system_privileges);
2083                        let new_privilege =
2084                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
2085                        tx.set_system_privilege(
2086                            privilege.grantee,
2087                            privilege.grantor,
2088                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
2089                        )?;
2090                    }
2091                }
2092                let object_type = state.get_system_object_type(&target_id);
2093                let object_id_str = match &target_id {
2094                    SystemObjectId::System => "SYSTEM".to_string(),
2095                    SystemObjectId::Object(id) => id.to_string(),
2096                };
2097                CatalogState::add_to_audit_log(
2098                    &state.system_configuration,
2099                    oracle_write_ts,
2100                    session,
2101                    tx,
2102                    audit_events,
2103                    variant.into(),
2104                    system_object_type_to_audit_object_type(&object_type),
2105                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
2106                        object_id: object_id_str,
2107                        grantee_id: privilege.grantee.to_string(),
2108                        grantor_id: privilege.grantor.to_string(),
2109                        privileges: privilege.acl_mode.to_string(),
2110                    }),
2111                )?;
2112            }
2113            Op::UpdateDefaultPrivilege {
2114                privilege_object,
2115                privilege_acl_item,
2116                variant,
2117            } => {
2118                let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
2119                match variant {
2120                    UpdatePrivilegeVariant::Grant => default_privileges
2121                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
2122                    UpdatePrivilegeVariant::Revoke => {
2123                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
2124                    }
2125                }
2126                let new_acl_mode = default_privileges
2127                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
2128                tx.set_default_privilege(
2129                    privilege_object.role_id,
2130                    privilege_object.database_id,
2131                    privilege_object.schema_id,
2132                    privilege_object.object_type,
2133                    privilege_acl_item.grantee,
2134                    new_acl_mode.cloned(),
2135                )?;
2136                CatalogState::add_to_audit_log(
2137                    &state.system_configuration,
2138                    oracle_write_ts,
2139                    session,
2140                    tx,
2141                    audit_events,
2142                    variant.into(),
2143                    object_type_to_audit_object_type(privilege_object.object_type),
2144                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2145                        role_id: privilege_object.role_id.to_string(),
2146                        database_id: privilege_object.database_id.map(|id| id.to_string()),
2147                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2148                        grantee_id: privilege_acl_item.grantee.to_string(),
2149                        privileges: privilege_acl_item.acl_mode.to_string(),
2150                    }),
2151                )?;
2152            }
2153            Op::RenameCluster {
2154                id,
2155                name,
2156                to_name,
2157                check_reserved_names,
2158            } => {
2159                if id.is_system() {
2160                    return Err(AdapterError::Catalog(Error::new(
2161                        ErrorKind::ReadOnlyCluster(name.clone()),
2162                    )));
2163                }
2164                if check_reserved_names && is_reserved_name(&to_name) {
2165                    return Err(AdapterError::Catalog(Error::new(
2166                        ErrorKind::ReservedClusterName(to_name),
2167                    )));
2168                }
2169                tx.rename_cluster(id, &name, &to_name)?;
2170                CatalogState::add_to_audit_log(
2171                    &state.system_configuration,
2172                    oracle_write_ts,
2173                    session,
2174                    tx,
2175                    audit_events,
2176                    EventType::Alter,
2177                    ObjectType::Cluster,
2178                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2179                        id: id.to_string(),
2180                        old_name: name.clone(),
2181                        new_name: to_name.clone(),
2182                    }),
2183                )?;
2184                info!("rename cluster {name} to {to_name}");
2185            }
2186            Op::RenameClusterReplica {
2187                cluster_id,
2188                replica_id,
2189                name,
2190                to_name,
2191            } => {
2192                if is_reserved_name(&to_name) {
2193                    return Err(AdapterError::Catalog(Error::new(
2194                        ErrorKind::ReservedReplicaName(to_name),
2195                    )));
2196                }
2197                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2198                CatalogState::add_to_audit_log(
2199                    &state.system_configuration,
2200                    oracle_write_ts,
2201                    session,
2202                    tx,
2203                    audit_events,
2204                    EventType::Alter,
2205                    ObjectType::ClusterReplica,
2206                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2207                        cluster_id: cluster_id.to_string(),
2208                        replica_id: replica_id.to_string(),
2209                        old_name: name.replica.as_str().to_string(),
2210                        new_name: to_name.clone(),
2211                    }),
2212                )?;
2213                info!("rename cluster replica {name} to {to_name}");
2214            }
2215            Op::RenameItem {
2216                id,
2217                to_name,
2218                current_full_name,
2219            } => {
2220                let mut updates = Vec::new();
2221
2222                let entry = state.get_entry(&id);
2223                if let CatalogItem::Type(_) = entry.item() {
2224                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2225                        current_full_name.to_string(),
2226                    ))));
2227                }
2228
2229                if entry.id().is_system() {
2230                    let name = state
2231                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2232                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2233                        name.to_string(),
2234                    ))));
2235                }
2236
2237                let mut to_full_name = current_full_name.clone();
2238                to_full_name.item.clone_from(&to_name);
2239
2240                let mut to_qualified_name = entry.name().clone();
2241                to_qualified_name.item.clone_from(&to_name);
2242
2243                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2244                    id: id.to_string(),
2245                    old_name: Self::full_name_detail(&current_full_name),
2246                    new_name: Self::full_name_detail(&to_full_name),
2247                });
2248                if Self::should_audit_log_item(entry.item()) {
2249                    CatalogState::add_to_audit_log(
2250                        &state.system_configuration,
2251                        oracle_write_ts,
2252                        session,
2253                        tx,
2254                        audit_events,
2255                        EventType::Alter,
2256                        catalog_type_to_audit_object_type(entry.item().typ()),
2257                        details,
2258                    )?;
2259                }
2260
2261                // Rename item itself.
2262                let mut new_entry = entry.clone();
2263                new_entry.name.item.clone_from(&to_name);
2264                new_entry.item = entry
2265                    .item()
2266                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2267                    .map_err(|e| {
2268                        Error::new(ErrorKind::from(AmbiguousRename {
2269                            depender: state
2270                                .resolve_full_name(entry.name(), entry.conn_id())
2271                                .to_string(),
2272                            dependee: state
2273                                .resolve_full_name(entry.name(), entry.conn_id())
2274                                .to_string(),
2275                            message: e,
2276                        }))
2277                    })?;
2278
2279                for id in entry.referenced_by() {
2280                    let dependent_item = state.get_entry(id);
2281                    let mut to_entry = dependent_item.clone();
2282                    to_entry.item = dependent_item
2283                        .item()
2284                        .rename_item_refs(
2285                            current_full_name.clone(),
2286                            to_full_name.item.clone(),
2287                            false,
2288                        )
2289                        .map_err(|e| {
2290                            Error::new(ErrorKind::from(AmbiguousRename {
2291                                depender: state
2292                                    .resolve_full_name(
2293                                        dependent_item.name(),
2294                                        dependent_item.conn_id(),
2295                                    )
2296                                    .to_string(),
2297                                dependee: state
2298                                    .resolve_full_name(entry.name(), entry.conn_id())
2299                                    .to_string(),
2300                                message: e,
2301                            }))
2302                        })?;
2303
2304                    if !to_entry.item().is_temporary() {
2305                        tx.update_item(*id, to_entry.into())?;
2306                    } else {
2307                        temporary_item_updates
2308                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2309                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2310                    }
2311                    updates.push(*id);
2312                }
2313                if !new_entry.item().is_temporary() {
2314                    tx.update_item(id, new_entry.into())?;
2315                } else {
2316                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2317                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2318                }
2319
2320                updates.push(id);
2321                for id in updates {
2322                    Self::log_update(state, &id);
2323                }
2324            }
2325            Op::RenameSchema {
2326                database_spec,
2327                schema_spec,
2328                new_name,
2329                check_reserved_names,
2330            } => {
2331                if check_reserved_names && is_reserved_name(&new_name) {
2332                    return Err(AdapterError::Catalog(Error::new(
2333                        ErrorKind::ReservedSchemaName(new_name),
2334                    )));
2335                }
2336
2337                let conn_id = session
2338                    .map(|session| session.conn_id())
2339                    .unwrap_or(&SYSTEM_CONN_ID);
2340
2341                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2342                let cur_name = schema.name().schema.clone();
2343
2344                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2345                    return Err(AdapterError::Catalog(Error::new(
2346                        ErrorKind::AmbientSchemaRename(cur_name),
2347                    )));
2348                };
2349                let database = state.get_database(&database_id);
2350                let database_name = &database.name;
2351
2352                let mut updates = Vec::new();
2353                let mut items_to_update = BTreeMap::new();
2354
2355                let mut update_item = |id| {
2356                    if items_to_update.contains_key(id) {
2357                        return Ok(());
2358                    }
2359
2360                    let entry = state.get_entry(id);
2361
2362                    // Update our item.
2363                    let mut new_entry = entry.clone();
2364                    new_entry.item = entry
2365                        .item
2366                        .rename_schema_refs(database_name, &cur_name, &new_name)
2367                        .map_err(|(s, _i)| {
2368                            Error::new(ErrorKind::from(AmbiguousRename {
2369                                depender: state
2370                                    .resolve_full_name(entry.name(), entry.conn_id())
2371                                    .to_string(),
2372                                dependee: format!("{database_name}.{cur_name}"),
2373                                message: format!("ambiguous reference to schema named {s}"),
2374                            }))
2375                        })?;
2376
2377                    // Queue updates for Catalog storage and Builtin Tables.
2378                    if !new_entry.item().is_temporary() {
2379                        items_to_update.insert(*id, new_entry.into());
2380                    } else {
2381                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2382                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2383                    }
2384                    updates.push(id);
2385
2386                    Ok::<_, AdapterError>(())
2387                };
2388
2389                // Update all of the items in the schema.
2390                for (_name, item_id) in &schema.items {
2391                    // Update the item itself.
2392                    update_item(item_id)?;
2393
2394                    // Update everything that depends on this item.
2395                    for id in state.get_entry(item_id).referenced_by() {
2396                        update_item(id)?;
2397                    }
2398                }
2399                // Note: When updating the transaction it's very important that we update the
2400                // items as a whole group, otherwise we exhibit quadratic behavior.
2401                tx.update_items(items_to_update)?;
2402
2403                // Renaming temporary schemas is not supported.
2404                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2405                    let schema_name = schema.name().schema.clone();
2406                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2407                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2408                    )));
2409                };
2410
2411                // Add an entry to the audit log.
2412                let database_name = database_spec
2413                    .id()
2414                    .map(|id| state.get_database(&id).name.clone());
2415                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2416                    id: schema_id.to_string(),
2417                    old_name: schema.name().schema.clone(),
2418                    new_name: new_name.clone(),
2419                    database_name,
2420                });
2421                CatalogState::add_to_audit_log(
2422                    &state.system_configuration,
2423                    oracle_write_ts,
2424                    session,
2425                    tx,
2426                    audit_events,
2427                    EventType::Alter,
2428                    mz_audit_log::ObjectType::Schema,
2429                    details,
2430                )?;
2431
2432                // Update the schema itself.
2433                let mut new_schema = schema.clone();
2434                new_schema.name.schema.clone_from(&new_name);
2435                tx.update_schema(schema_id, new_schema.into())?;
2436
2437                for id in updates {
2438                    Self::log_update(state, id);
2439                }
2440            }
2441            Op::UpdateOwner { id, new_owner } => {
2442                let conn_id = session
2443                    .map(|session| session.conn_id())
2444                    .unwrap_or(&SYSTEM_CONN_ID);
2445                let old_owner = state
2446                    .get_owner_id(&id, conn_id)
2447                    .expect("cannot update the owner of an object without an owner");
2448                match &id {
2449                    ObjectId::Cluster(id) => {
2450                        let mut cluster = state.get_cluster(*id).clone();
2451                        if id.is_system() {
2452                            return Err(AdapterError::Catalog(Error::new(
2453                                ErrorKind::ReadOnlyCluster(cluster.name),
2454                            )));
2455                        }
2456                        Self::update_privilege_owners(
2457                            &mut cluster.privileges,
2458                            cluster.owner_id,
2459                            new_owner,
2460                        );
2461                        cluster.owner_id = new_owner;
2462                        tx.update_cluster(*id, cluster.into())?;
2463                    }
2464                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2465                        let cluster = state.get_cluster(*cluster_id);
2466                        let mut replica = cluster
2467                            .replica(*replica_id)
2468                            .expect("catalog out of sync")
2469                            .clone();
2470                        if replica_id.is_system() {
2471                            return Err(AdapterError::Catalog(Error::new(
2472                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2473                            )));
2474                        }
2475                        replica.owner_id = new_owner;
2476                        tx.update_cluster_replica(*replica_id, replica.into())?;
2477                    }
2478                    ObjectId::Database(id) => {
2479                        let mut database = state.get_database(id).clone();
2480                        if id.is_system() {
2481                            return Err(AdapterError::Catalog(Error::new(
2482                                ErrorKind::ReadOnlyDatabase(database.name),
2483                            )));
2484                        }
2485                        Self::update_privilege_owners(
2486                            &mut database.privileges,
2487                            database.owner_id,
2488                            new_owner,
2489                        );
2490                        database.owner_id = new_owner;
2491                        tx.update_database(*id, database.clone().into())?;
2492                    }
2493                    ObjectId::Schema((database_spec, schema_spec)) => {
2494                        let schema_id: SchemaId = schema_spec.clone().into();
2495                        let mut schema = state
2496                            .get_schema(database_spec, schema_spec, conn_id)
2497                            .clone();
2498                        if schema_id.is_system() {
2499                            let name = schema.name();
2500                            let full_name = state.resolve_full_schema_name(name);
2501                            return Err(AdapterError::Catalog(Error::new(
2502                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2503                            )));
2504                        }
2505                        Self::update_privilege_owners(
2506                            &mut schema.privileges,
2507                            schema.owner_id,
2508                            new_owner,
2509                        );
2510                        schema.owner_id = new_owner;
2511                        tx.update_schema(schema_id, schema.into())?;
2512                    }
2513                    ObjectId::Item(id) => {
2514                        let entry = state.get_entry(id);
2515                        let mut new_entry = entry.clone();
2516                        if id.is_system() {
2517                            let full_name = state.resolve_full_name(
2518                                new_entry.name(),
2519                                session.map(|session| session.conn_id()),
2520                            );
2521                            return Err(AdapterError::Catalog(Error::new(
2522                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2523                            )));
2524                        }
2525                        Self::update_privilege_owners(
2526                            &mut new_entry.privileges,
2527                            new_entry.owner_id,
2528                            new_owner,
2529                        );
2530                        new_entry.owner_id = new_owner;
2531                        if !new_entry.item().is_temporary() {
2532                            tx.update_item(*id, new_entry.into())?;
2533                        } else {
2534                            temporary_item_updates
2535                                .push((entry.clone().into(), StateDiff::Retraction));
2536                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2537                        }
2538                    }
2539                    ObjectId::NetworkPolicy(id) => {
2540                        let mut policy = state.get_network_policy(id).clone();
2541                        if id.is_system() {
2542                            return Err(AdapterError::Catalog(Error::new(
2543                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2544                            )));
2545                        }
2546                        Self::update_privilege_owners(
2547                            &mut policy.privileges,
2548                            policy.owner_id,
2549                            new_owner,
2550                        );
2551                        policy.owner_id = new_owner;
2552                        tx.update_network_policy(*id, policy.into())?;
2553                    }
2554                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2555                }
2556                let object_type = state.get_object_type(&id);
2557                CatalogState::add_to_audit_log(
2558                    &state.system_configuration,
2559                    oracle_write_ts,
2560                    session,
2561                    tx,
2562                    audit_events,
2563                    EventType::Alter,
2564                    object_type_to_audit_object_type(object_type),
2565                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2566                        object_id: id.to_string(),
2567                        old_owner_id: old_owner.to_string(),
2568                        new_owner_id: new_owner.to_string(),
2569                    }),
2570                )?;
2571            }
2572            Op::UpdateClusterConfig { id, name, config } => {
2573                let mut cluster = state.get_cluster(id).clone();
2574                cluster.config = config;
2575                tx.update_cluster(id, cluster.into())?;
2576                info!("update cluster {}", name);
2577
2578                CatalogState::add_to_audit_log(
2579                    &state.system_configuration,
2580                    oracle_write_ts,
2581                    session,
2582                    tx,
2583                    audit_events,
2584                    EventType::Alter,
2585                    ObjectType::Cluster,
2586                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2587                        id: id.to_string(),
2588                        name,
2589                    }),
2590                )?;
2591            }
2592            Op::UpdateClusterReplicaConfig {
2593                replica_id,
2594                cluster_id,
2595                config,
2596            } => {
2597                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2598                info!("update replica {}", replica.name);
2599                tx.update_cluster_replica(
2600                    replica_id,
2601                    mz_catalog::durable::ClusterReplica {
2602                        cluster_id,
2603                        replica_id,
2604                        name: replica.name.clone(),
2605                        config: config.clone().into(),
2606                        owner_id: replica.owner_id,
2607                    },
2608                )?;
2609            }
2610            Op::UpdateItem { id, name, to_item } => {
2611                let mut entry = state.get_entry(&id).clone();
2612                entry.name = name.clone();
2613                entry.item = to_item.clone();
2614                tx.update_item(id, entry.into())?;
2615
2616                if Self::should_audit_log_item(&to_item) {
2617                    let mut full_name = Self::full_name_detail(
2618                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2619                    );
2620                    full_name.item = name.item;
2621
2622                    CatalogState::add_to_audit_log(
2623                        &state.system_configuration,
2624                        oracle_write_ts,
2625                        session,
2626                        tx,
2627                        audit_events,
2628                        EventType::Alter,
2629                        catalog_type_to_audit_object_type(to_item.typ()),
2630                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2631                            id: id.to_string(),
2632                            name: full_name,
2633                        }),
2634                    )?;
2635                }
2636
2637                Self::log_update(state, &id);
2638            }
2639            Op::UpdateSystemConfiguration { name, value } => {
2640                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2641                tx.upsert_system_config(&name, parsed_value.clone())?;
2642                // This mirrors some "system vars" into the catalog storage
2643                // "config" collection so that we can toggle the flag with
2644                // Launch Darkly, but use it in boot before Launch Darkly is
2645                // available.
2646                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2647                    let with_0dt_deployment_max_wait =
2648                        Duration::parse(VarInput::Flat(&parsed_value))
2649                            .expect("parsing succeeded above");
2650                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2651                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2652                    let with_0dt_deployment_ddl_check_interval =
2653                        Duration::parse(VarInput::Flat(&parsed_value))
2654                            .expect("parsing succeeded above");
2655                    tx.set_0dt_deployment_ddl_check_interval(
2656                        with_0dt_deployment_ddl_check_interval,
2657                    )?;
2658                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2659                    let panic_after_timeout =
2660                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2661                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2662                }
2663
2664                CatalogState::add_to_audit_log(
2665                    &state.system_configuration,
2666                    oracle_write_ts,
2667                    session,
2668                    tx,
2669                    audit_events,
2670                    EventType::Alter,
2671                    ObjectType::System,
2672                    EventDetails::SetV1(mz_audit_log::SetV1 {
2673                        name,
2674                        value: Some(value.borrow().to_vec().join(", ")),
2675                    }),
2676                )?;
2677            }
2678            Op::ResetSystemConfiguration { name } => {
2679                tx.remove_system_config(&name);
2680                // This mirrors some "system vars" into the catalog storage
2681                // "config" collection so that we can toggle the flag with
2682                // Launch Darkly, but use it in boot before Launch Darkly is
2683                // available.
2684                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2685                    tx.reset_0dt_deployment_max_wait()?;
2686                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2687                    tx.reset_0dt_deployment_ddl_check_interval()?;
2688                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2689                    tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2690                }
2691
2692                CatalogState::add_to_audit_log(
2693                    &state.system_configuration,
2694                    oracle_write_ts,
2695                    session,
2696                    tx,
2697                    audit_events,
2698                    EventType::Alter,
2699                    ObjectType::System,
2700                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2701                )?;
2702            }
2703            Op::ResetAllSystemConfiguration => {
2704                tx.clear_system_configs();
2705                tx.reset_0dt_deployment_max_wait()?;
2706                tx.reset_0dt_deployment_ddl_check_interval()?;
2707                tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2708
2709                CatalogState::add_to_audit_log(
2710                    &state.system_configuration,
2711                    oracle_write_ts,
2712                    session,
2713                    tx,
2714                    audit_events,
2715                    EventType::Alter,
2716                    ObjectType::System,
2717                    EventDetails::ResetAllV1,
2718                )?;
2719            }
2720            Op::UpdateScopedSystemParameters {
2721                scoped,
2722                prune_scope,
2723            } => {
2724                // Diff `scoped` against the durable cache and persist only the
2725                // delta: upsert changed/added entries, then remove entries the
2726                // update no longer serves. A removal for a still-live object is
2727                // bounded by `prune_scope` so this update does not delete a row
2728                // for an object it was not evaluated for (e.g. one created after
2729                // the update's snapshot, whose override rode its own create
2730                // transaction). `None` prunes unconditionally (the
2731                // disabled-feature clear path). Rows whose owning object is no
2732                // longer live are always pruned regardless of `prune_scope`,
2733                // because nothing else garbage collects them and ids are never
2734                // reused, so this lazily reclaims orphans left by dropped
2735                // objects.
2736                let live_clusters: BTreeSet<ClusterId> =
2737                    tx.get_clusters().map(|cluster| cluster.id).collect();
2738                let live_replicas: BTreeSet<ReplicaId> = tx
2739                    .get_cluster_replicas()
2740                    .map(|replica| replica.replica_id)
2741                    .collect();
2742                let prune_cluster = |id: &ClusterId| {
2743                    prune_scope
2744                        .as_ref()
2745                        .map_or(true, |s| s.clusters.contains(id))
2746                };
2747                let prune_replica = |id: &ReplicaId| {
2748                    prune_scope
2749                        .as_ref()
2750                        .map_or(true, |s| s.replicas.contains(id))
2751                };
2752
2753                // Cluster-coherent scope.
2754                let existing_cluster: BTreeMap<(ClusterId, String), String> = tx
2755                    .get_cluster_system_configurations()
2756                    .map(|c| ((c.cluster_id, c.name), c.value))
2757                    .collect();
2758                let mut desired_cluster: BTreeSet<(ClusterId, String)> = BTreeSet::new();
2759                for (cluster_id, values) in &scoped.cluster {
2760                    if !live_clusters.contains(cluster_id) {
2761                        continue;
2762                    }
2763                    for (name, value) in values {
2764                        desired_cluster.insert((*cluster_id, name.clone()));
2765                        if existing_cluster.get(&(*cluster_id, name.clone())) != Some(value) {
2766                            tx.upsert_cluster_system_config(*cluster_id, name, value.clone())?;
2767                        }
2768                    }
2769                }
2770                for (cluster_id, name) in existing_cluster.into_keys() {
2771                    if (!live_clusters.contains(&cluster_id) || prune_cluster(&cluster_id))
2772                        && !desired_cluster.contains(&(cluster_id, name.clone()))
2773                    {
2774                        tx.remove_cluster_system_config(cluster_id, &name);
2775                    }
2776                }
2777
2778                // Replica-local scope.
2779                let existing_replica: BTreeMap<(ReplicaId, String), String> = tx
2780                    .get_replica_system_configurations()
2781                    .map(|r| ((r.replica_id, r.name), r.value))
2782                    .collect();
2783                let mut desired_replica: BTreeSet<(ReplicaId, String)> = BTreeSet::new();
2784                for (replica_id, values) in &scoped.replica {
2785                    if !live_replicas.contains(replica_id) {
2786                        continue;
2787                    }
2788                    for (name, value) in values {
2789                        desired_replica.insert((*replica_id, name.clone()));
2790                        if existing_replica.get(&(*replica_id, name.clone())) != Some(value) {
2791                            tx.upsert_replica_system_config(*replica_id, name, value.clone())?;
2792                        }
2793                    }
2794                }
2795                for (replica_id, name) in existing_replica.into_keys() {
2796                    if (!live_replicas.contains(&replica_id) || prune_replica(&replica_id))
2797                        && !desired_replica.contains(&(replica_id, name.clone()))
2798                    {
2799                        tx.remove_replica_system_config(replica_id, &name);
2800                    }
2801                }
2802            }
2803            Op::InjectAuditEvents { events } => {
2804                for event in events {
2805                    let id = tx.allocate_audit_log_id()?;
2806                    let ev = VersionedEvent::new(
2807                        id,
2808                        event.event_type,
2809                        event.object_type,
2810                        event.details,
2811                        event.user,
2812                        oracle_write_ts.into(),
2813                    );
2814                    audit_events.push(ev.clone());
2815                    tx.insert_audit_log_event(ev);
2816                }
2817            }
2818        };
2819        Ok(temporary_item_updates)
2820    }
2821
2822    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2823        let entry = state.get_entry(id);
2824        info!(
2825            "update {} {} ({})",
2826            entry.item_type(),
2827            state.resolve_full_name(entry.name(), entry.conn_id()),
2828            id
2829        );
2830    }
2831
2832    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2833    /// implementation:
2834    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2835    fn update_privilege_owners(
2836        privileges: &mut PrivilegeMap,
2837        old_owner: RoleId,
2838        new_owner: RoleId,
2839    ) {
2840        // TODO(jkosh44) Would be nice not to clone every privilege.
2841        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2842
2843        let mut new_present = false;
2844        for privilege in flat_privileges.iter_mut() {
2845            // Old owner's granted privilege are updated to be granted by the new
2846            // owner.
2847            if privilege.grantor == old_owner {
2848                privilege.grantor = new_owner;
2849            } else if privilege.grantor == new_owner {
2850                new_present = true;
2851            }
2852            // Old owner's privileges is given to the new owner.
2853            if privilege.grantee == old_owner {
2854                privilege.grantee = new_owner;
2855            } else if privilege.grantee == new_owner {
2856                new_present = true;
2857            }
2858        }
2859
2860        // If the old privilege list contained references to the new owner, we may
2861        // have created duplicate entries. Here we try and consolidate them. This
2862        // is inspired by PostgreSQL's algorithm but not identical.
2863        if new_present {
2864            // Group privileges by (grantee, grantor).
2865            let privilege_map: BTreeMap<_, Vec<_>> =
2866                flat_privileges
2867                    .into_iter()
2868                    .fold(BTreeMap::new(), |mut accum, privilege| {
2869                        accum
2870                            .entry((privilege.grantee, privilege.grantor))
2871                            .or_default()
2872                            .push(privilege);
2873                        accum
2874                    });
2875
2876            // Consolidate and update all privileges.
2877            flat_privileges = privilege_map
2878                .into_iter()
2879                .map(|((grantee, grantor), values)|
2880                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2881                    values.into_iter().fold(
2882                        MzAclItem::empty(grantee, grantor),
2883                        |mut accum, mz_aclitem| {
2884                            accum.acl_mode =
2885                                accum.acl_mode.union(mz_aclitem.acl_mode);
2886                            accum
2887                        },
2888                    ))
2889                .collect();
2890        }
2891
2892        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2893    }
2894}
2895
2896/// Prepare the given transaction for replacing a catalog item with a new version.
2897///
2898/// The new version gets a new `CatalogItemId`, which requires rewriting the `create_sql` of all
2899/// dependent objects to refer to that new ID (at a previous version).
2900///
2901/// Note that here is where we break the assumption that the `CatalogItemId` is a stable identifier
2902/// for catalog items. We currently think that there are no use cases that require this assumption,
2903/// but no way to know for sure.
2904fn tx_replace_item(
2905    tx: &mut Transaction<'_>,
2906    state: &CatalogState,
2907    id: CatalogItemId,
2908    new_entry: CatalogEntry,
2909) -> Result<(), AdapterError> {
2910    let new_id = new_entry.id;
2911
2912    // Rewrite dependent objects to point to the new ID.
2913    for use_id in new_entry.referenced_by() {
2914        // The dependent might be dropped in the same tx, so check.
2915        if tx.get_item(use_id).is_none() {
2916            continue;
2917        }
2918
2919        let mut dependent = state.get_entry(use_id).clone();
2920        dependent.item = dependent.item.replace_item_refs(id, new_id);
2921        tx.update_item(*use_id, dependent.into())?;
2922    }
2923
2924    // Move comments to the new ID.
2925    let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2926    let new_comment_id = new_entry.comment_object_id();
2927    if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2928        tx.drop_comments(&[old_comment_id].into())?;
2929        for (sub, comment) in comments {
2930            tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2931        }
2932    }
2933
2934    let mz_catalog::durable::Item {
2935        id: _,
2936        oid,
2937        global_id,
2938        schema_id,
2939        name,
2940        create_sql,
2941        owner_id,
2942        privileges,
2943        extra_versions,
2944    } = new_entry.into();
2945
2946    tx.remove_item(id)?;
2947    tx.insert_item(
2948        new_id,
2949        oid,
2950        global_id,
2951        schema_id,
2952        &name,
2953        create_sql,
2954        owner_id,
2955        privileges,
2956        extra_versions,
2957    )?;
2958
2959    Ok(())
2960}
2961
2962/// Generate audit events for a replacement apply operation.
2963fn apply_replacement_audit_events(
2964    state: &CatalogState,
2965    target: &CatalogEntry,
2966    replacement: &CatalogEntry,
2967) -> Vec<(EventType, EventDetails)> {
2968    let mut events = Vec::new();
2969
2970    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2971    let target_id_name = IdFullNameV1 {
2972        id: target.id().to_string(),
2973        name: Catalog::full_name_detail(&target_name),
2974    };
2975    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2976    let replacement_id_name = IdFullNameV1 {
2977        id: replacement.id().to_string(),
2978        name: Catalog::full_name_detail(&replacement_name),
2979    };
2980
2981    if Catalog::should_audit_log_item(&replacement.item) {
2982        events.push((
2983            EventType::Drop,
2984            EventDetails::IdFullNameV1(replacement_id_name.clone()),
2985        ));
2986    }
2987
2988    if Catalog::should_audit_log_item(&target.item) {
2989        events.push((
2990            EventType::Alter,
2991            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2992                target: target_id_name.clone(),
2993                replacement: replacement_id_name,
2994            }),
2995        ));
2996
2997        if let Some(old_cluster_id) = target.cluster_id()
2998            && let Some(new_cluster_id) = replacement.cluster_id()
2999            && old_cluster_id != new_cluster_id
3000        {
3001            // When the replacement is applied, the target takes on the ID of the replacement, so
3002            // we should use that ID for subsequent events.
3003            events.push((
3004                EventType::Alter,
3005                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
3006                    id: replacement.id().to_string(),
3007                    name: target_id_name.name,
3008                    old_cluster_id: old_cluster_id.to_string(),
3009                    new_cluster_id: new_cluster_id.to_string(),
3010                }),
3011            ));
3012        }
3013    }
3014
3015    events
3016}
3017
/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
///
/// Note: Previously we used to emit a single `Op::DropObject` for every object
/// we needed to drop. But removing a batch of objects from a durable Catalog
/// Transaction is O(n) where `n` is the number of objects that exist in the
/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
/// `DROP ... CASCADE` statement.
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment IDs of every object being dropped, including dependents that are dropped
    /// implicitly along with a cluster replica.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, each mapped to the database specifier it was requested under.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Cluster replicas to drop, each mapped to its owning cluster and the reason for the drop.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Catalog items to drop, in the order they were collected. Unlike the other fields this
    /// is a `Vec`, so insertion order is preserved and entries are not deduplicated.
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
3036
3037impl ObjectsToDrop {
3038    pub fn generate(
3039        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
3040        state: &CatalogState,
3041        session: Option<&ConnMeta>,
3042    ) -> Result<Self, AdapterError> {
3043        let mut delta = ObjectsToDrop::default();
3044
3045        for drop_object_info in drop_object_infos {
3046            delta.add_item(drop_object_info, state, session)?;
3047        }
3048
3049        Ok(delta)
3050    }
3051
3052    fn add_item(
3053        &mut self,
3054        drop_object_info: DropObjectInfo,
3055        state: &CatalogState,
3056        session: Option<&ConnMeta>,
3057    ) -> Result<(), AdapterError> {
3058        self.comments
3059            .insert(state.get_comment_id(drop_object_info.to_object_id()));
3060
3061        match drop_object_info {
3062            DropObjectInfo::Database(database_id) => {
3063                let database = &state.database_by_id[&database_id];
3064                if database_id.is_system() {
3065                    return Err(AdapterError::Catalog(Error::new(
3066                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
3067                    )));
3068                }
3069
3070                self.databases.insert(database_id);
3071            }
3072            DropObjectInfo::Schema((database_spec, schema_spec)) => {
3073                let schema = state.get_schema(
3074                    &database_spec,
3075                    &schema_spec,
3076                    session
3077                        .map(|session| session.conn_id())
3078                        .unwrap_or(&SYSTEM_CONN_ID),
3079                );
3080                let schema_id: SchemaId = schema_spec.into();
3081                if schema_id.is_system() {
3082                    let name = schema.name();
3083                    let full_name = state.resolve_full_schema_name(name);
3084                    return Err(AdapterError::Catalog(Error::new(
3085                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
3086                    )));
3087                }
3088
3089                self.schemas.insert(schema_spec, database_spec);
3090            }
3091            DropObjectInfo::Role(role_id) => {
3092                let name = state.get_role(&role_id).name().to_string();
3093                if role_id.is_system() || role_id.is_predefined() {
3094                    return Err(AdapterError::Catalog(Error::new(
3095                        ErrorKind::ReservedRoleName(name.clone()),
3096                    )));
3097                }
3098                state.ensure_not_reserved_role(&role_id)?;
3099
3100                self.roles.insert(role_id);
3101            }
3102            DropObjectInfo::Cluster(cluster_id) => {
3103                let cluster = state.get_cluster(cluster_id);
3104                let name = &cluster.name;
3105                if cluster_id.is_system() {
3106                    return Err(AdapterError::Catalog(Error::new(
3107                        ErrorKind::ReadOnlyCluster(name.clone()),
3108                    )));
3109                }
3110
3111                self.clusters.insert(cluster_id);
3112            }
3113            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3114                let cluster = state.get_cluster(cluster_id);
3115                let replica = cluster.replica(replica_id).expect("Must exist");
3116
3117                self.replicas
3118                    .insert(replica.replica_id, (cluster.id, reason));
3119
3120                // Implicitly drop materialized views that target this replica.
3121                // When the target replica is gone, no replica advances the
3122                // persist shard's upper frontier, causing reads to hang. Cascade
3123                // to anything depending on the implicitly-dropped MV so we don't
3124                // leave dangling references in the catalog.
3125                //
3126                // Plan-driven drops already include these dependents as their
3127                // own `DropObjectInfo::Item` entries (the plan stage expands
3128                // them via `cluster_replica_dependents`), so each one's comment
3129                // is recorded by the top-of-function `self.comments` insert.
3130                // This branch handles internal callers that build
3131                // `Op::DropObjects` directly with only the replica, so we expand
3132                // the dependents and record their comments ourselves.
3133                //
3134                // `seen` is seeded from the items already collected so that the
3135                // plan-driven path (where the dependents are processed before
3136                // the replica, in reverse-dependency order) does not re-add them
3137                // here.
3138                let mut seen: BTreeSet<ObjectId> =
3139                    self.items.iter().copied().map(ObjectId::Item).collect();
3140                for dep in state.cluster_replica_dependents(cluster_id, replica_id, &mut seen) {
3141                    if let ObjectId::Item(dep_id) = dep {
3142                        info!(
3143                            "implicitly dropping {} because target replica was dropped",
3144                            state.get_entry(&dep_id).name().item,
3145                        );
3146                        self.comments
3147                            .insert(state.get_comment_id(ObjectId::Item(dep_id)));
3148                        self.items.push(dep_id);
3149                    }
3150                }
3151            }
3152            DropObjectInfo::Item(item_id) => {
3153                let entry = state.get_entry(&item_id);
3154                if item_id.is_system() {
3155                    let name = entry.name();
3156                    let full_name =
3157                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
3158                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3159                        full_name.to_string(),
3160                    ))));
3161                }
3162
3163                self.items.push(item_id);
3164            }
3165            DropObjectInfo::NetworkPolicy(network_policy_id) => {
3166                let policy = state.get_network_policy(&network_policy_id);
3167                let name = &policy.name;
3168                if network_policy_id.is_system() {
3169                    return Err(AdapterError::Catalog(Error::new(
3170                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3171                    )));
3172                }
3173
3174                self.network_policies.insert(network_policy_id);
3175            }
3176        }
3177
3178        Ok(())
3179    }
3180}
3181
3182#[cfg(test)]
3183mod tests {
3184    use mz_catalog::SYSTEM_CONN_ID;
3185    use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
3186    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
3187    use mz_repr::role_id::RoleId;
3188    use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
3189    use mz_sql::DEFAULT_SCHEMA;
3190    use mz_sql::catalog::CatalogDatabase;
3191    use mz_sql::names::{
3192        ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
3193    };
3194    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
3195
3196    use crate::catalog::{Catalog, Op};
3197    use crate::session::DEFAULT_DATABASE_NAME;
3198
3199    #[mz_ore::test]
3200    fn test_update_privilege_owners() {
3201        let old_owner = RoleId::User(1);
3202        let new_owner = RoleId::User(2);
3203        let other_role = RoleId::User(3);
3204
3205        // older owner exists as grantor.
3206        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3207            MzAclItem {
3208                grantee: other_role,
3209                grantor: old_owner,
3210                acl_mode: AclMode::UPDATE,
3211            },
3212            MzAclItem {
3213                grantee: other_role,
3214                grantor: new_owner,
3215                acl_mode: AclMode::SELECT,
3216            },
3217        ]);
3218        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3219        assert_eq!(1, privileges.all_values().count());
3220        assert_eq!(
3221            vec![MzAclItem {
3222                grantee: other_role,
3223                grantor: new_owner,
3224                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3225            }],
3226            privileges.all_values_owned().collect::<Vec<_>>()
3227        );
3228
3229        // older owner exists as grantee.
3230        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3231            MzAclItem {
3232                grantee: old_owner,
3233                grantor: other_role,
3234                acl_mode: AclMode::UPDATE,
3235            },
3236            MzAclItem {
3237                grantee: new_owner,
3238                grantor: other_role,
3239                acl_mode: AclMode::SELECT,
3240            },
3241        ]);
3242        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3243        assert_eq!(1, privileges.all_values().count());
3244        assert_eq!(
3245            vec![MzAclItem {
3246                grantee: new_owner,
3247                grantor: other_role,
3248                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3249            }],
3250            privileges.all_values_owned().collect::<Vec<_>>()
3251        );
3252
3253        // older owner exists as grantee and grantor.
3254        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3255            MzAclItem {
3256                grantee: old_owner,
3257                grantor: old_owner,
3258                acl_mode: AclMode::UPDATE,
3259            },
3260            MzAclItem {
3261                grantee: new_owner,
3262                grantor: new_owner,
3263                acl_mode: AclMode::SELECT,
3264            },
3265        ]);
3266        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3267        assert_eq!(1, privileges.all_values().count());
3268        assert_eq!(
3269            vec![MzAclItem {
3270                grantee: new_owner,
3271                grantor: new_owner,
3272                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3273            }],
3274            privileges.all_values_owned().collect::<Vec<_>>()
3275        );
3276    }
3277
    /// Verifies that `transact_incremental_dry_run` processes only new ops
    /// against the accumulated state, not all ops from scratch. Two paths are
    /// compared:
    ///   - Incremental: two separate calls, each with one op
    ///   - All-at-once: one call with both ops
    /// Both must produce equivalent catalog state. Equivalence is checked on
    /// the presence of both tables and on each entry's name and owner, not on
    /// the full catalog state.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method`
    async fn test_transact_incremental_dry_run_processes_only_new_ops() {
        Catalog::with_debug(|catalog| async move {
            // Resolve default database and schema.
            let database = catalog
                .resolve_database(DEFAULT_DATABASE_NAME)
                .expect("default database");
            let database_name = database.name.clone();
            let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
            let schema = catalog
                .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
                .expect("default schema");
            let schema_name = schema.name.schema.clone();
            let schema_spec = schema.id.clone();

            // Allocate IDs for two tables.
            let (id_t1, global_id_t1) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t1");
            let (id_t2, global_id_t2) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t2");

            // The same write timestamp is reused for every dry run below.
            let oracle_write_ts = catalog.current_upper().await;

            // Build two CreateItem ops. Both describe an empty, column-less
            // table owned by the system role; they differ only in their IDs
            // and item name.
            let make_table_op = |id, global_id, name: &str| Op::CreateItem {
                id,
                name: QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: database_spec.clone(),
                        schema_spec: schema_spec.clone(),
                    },
                    item: name.to_string(),
                },
                item: CatalogItem::Table(Table {
                    create_sql: Some(format!(
                        "CREATE TABLE {database_name}.{schema_name}.{name} ()"
                    )),
                    desc: VersionedRelationDesc::new(RelationDesc::empty()),
                    collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                    conn_id: None,
                    resolved_ids: ResolvedIds::empty(),
                    custom_logical_compaction_window: None,
                    is_retained_metrics_object: false,
                    data_source: TableDataSource::TableWrites { defaults: vec![] },
                }),
                owner_id: MZ_SYSTEM_ROLE_ID,
            };

            let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
            let op_t2 = make_table_op(id_t2, global_id_t2, "t2");

            // Snapshot the starting state once so that both paths begin from
            // the same baseline.
            let base_state = catalog.state().clone();

            // --- Path A: Incremental (two separate dry-run calls) ---

            // First call: only op_t1, no previous snapshot.
            let (state_after_t1, snapshot_after_t1) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("first dry run");

            // After first dry run: t1 exists, t2 does not.
            assert!(
                state_after_t1.try_get_entry(&id_t1).is_some(),
                "t1 should exist after first dry run"
            );
            assert_eq!(
                state_after_t1
                    .try_get_entry(&id_t1)
                    .expect("t1 entry")
                    .name()
                    .item,
                "t1"
            );
            assert!(
                state_after_t1.try_get_entry(&id_t2).is_none(),
                "t2 should NOT exist after first dry run"
            );

            // Second call: only op_t2, using state/snapshot from first call.
            // If the dry run replayed ops from scratch instead of building on
            // the accumulated state, t1 would be missing from the result.
            let (state_incremental, _) = catalog
                .transact_incremental_dry_run(
                    &state_after_t1,
                    vec![op_t2.clone()],
                    None,
                    Some(snapshot_after_t1),
                    oracle_write_ts,
                )
                .await
                .expect("second dry run");

            // After second dry run: both t1 and t2 exist.
            assert!(
                state_incremental.try_get_entry(&id_t1).is_some(),
                "t1 should exist in incremental result"
            );
            assert!(
                state_incremental.try_get_entry(&id_t2).is_some(),
                "t2 should exist in incremental result"
            );

            // --- Path B: All-at-once (single dry-run call with both ops) ---

            // Starts again from the untouched baseline, with no prior snapshot.
            let (state_all_at_once, _) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone(), op_t2.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("all-at-once dry run");

            assert!(
                state_all_at_once.try_get_entry(&id_t1).is_some(),
                "t1 should exist in all-at-once result"
            );
            assert!(
                state_all_at_once.try_get_entry(&id_t2).is_some(),
                "t2 should exist in all-at-once result"
            );

            // --- Compare: both paths produce equivalent items ---

            let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
            let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
            assert_eq!(inc_t1.name(), all_t1.name());
            assert_eq!(inc_t1.owner_id, all_t1.owner_id);

            let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
            let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
            assert_eq!(inc_t2.name(), all_t2.name());
            assert_eq!(inc_t2.owner_id, all_t2.owner_id);

            catalog.expire().await;
        })
        .await
    }
3434
3435    /// Exercises the diff-and-prune semantics of the
3436    /// `Op::UpdateScopedSystemParameters` apply handler over the cluster scope.
3437    /// Covers four behaviors: upsert of a desired row, the bounded-prune
3438    /// protection that spares a live cluster outside `prune_scope`, removal of a
3439    /// stale in-scope row, and the orphan-prune that reclaims a row whose owning
3440    /// cluster was dropped even though that id is absent from `prune_scope`.
3441    #[mz_ore::test(tokio::test)]
3442    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method`
3443    async fn test_transact_update_scoped_system_parameters_prune() {
3444        use std::collections::{BTreeMap, BTreeSet};
3445
3446        use mz_catalog::memory::objects::{ClusterConfig, ClusterVariant};
3447        use mz_controller_types::ClusterId;
3448
3449        use crate::catalog::DropObjectInfo;
3450        use crate::config::{ScopedParameters, ScopedParametersScope};
3451
3452        Catalog::with_debug(|mut catalog| async move {
3453            let param = "max_query_result_size".to_string();
3454            let value = "1MB".to_string();
3455
3456            // Allocate and create two live clusters, A and B, the same way
3457            // production code does.
3458            let commit_ts = catalog.current_upper().await;
3459            let cluster_a = catalog
3460                .allocate_user_cluster_id(commit_ts)
3461                .await
3462                .expect("allocate cluster A");
3463            let commit_ts = catalog.current_upper().await;
3464            let cluster_b = catalog
3465                .allocate_user_cluster_id(commit_ts)
3466                .await
3467                .expect("allocate cluster B");
3468
3469            let make_cluster_op = |id: ClusterId, name: &str| Op::CreateCluster {
3470                id,
3471                name: name.to_string(),
3472                introspection_sources: Vec::new(),
3473                owner_id: MZ_SYSTEM_ROLE_ID,
3474                config: ClusterConfig {
3475                    variant: ClusterVariant::Unmanaged,
3476                    workload_class: None,
3477                },
3478            };
3479
3480            let oracle_write_ts = catalog.current_upper().await;
3481            catalog
3482                .transact(
3483                    None,
3484                    oracle_write_ts,
3485                    None,
3486                    vec![
3487                        make_cluster_op(cluster_a, "test_cluster_a"),
3488                        make_cluster_op(cluster_b, "test_cluster_b"),
3489                    ],
3490                )
3491                .await
3492                .expect("create clusters");
3493
3494            // Reads the durable cluster system configuration rows.
3495            async fn rows(catalog: &Catalog) -> BTreeSet<(ClusterId, String, String)> {
3496                let mut storage = catalog.storage().await;
3497                let tx = storage.transaction().await.expect("open transaction");
3498                tx.get_cluster_system_configurations()
3499                    .map(|c| (c.cluster_id, c.name, c.value))
3500                    .collect()
3501            }
3502
3503            let scope_with = |ids: &[ClusterId]| ScopedParametersScope {
3504                clusters: ids.iter().copied().collect(),
3505                replicas: BTreeSet::new(),
3506            };
3507            let cluster_scoped = |id: ClusterId| {
3508                let mut cluster = BTreeMap::new();
3509                let mut overrides = BTreeMap::new();
3510                overrides.insert(param.clone(), value.clone());
3511                cluster.insert(id, overrides);
3512                ScopedParameters {
3513                    cluster,
3514                    replica: BTreeMap::new(),
3515                }
3516            };
3517            let empty_scoped = || ScopedParameters {
3518                cluster: BTreeMap::new(),
3519                replica: BTreeMap::new(),
3520            };
3521
3522            // Behavior 1: upsert. A is live and in scope, so the row is created.
3523            let oracle_write_ts = catalog.current_upper().await;
3524            catalog
3525                .transact(
3526                    None,
3527                    oracle_write_ts,
3528                    None,
3529                    vec![Op::UpdateScopedSystemParameters {
3530                        scoped: cluster_scoped(cluster_a),
3531                        prune_scope: Some(scope_with(&[cluster_a])),
3532                    }],
3533                )
3534                .await
3535                .expect("upsert A");
3536            assert!(
3537                rows(&catalog)
3538                    .await
3539                    .contains(&(cluster_a, param.clone(), value.clone())),
3540                "upsert must create the row for A"
3541            );
3542
3543            // Set up an additional row for B so the prune cases have something to
3544            // act on.
3545            let oracle_write_ts = catalog.current_upper().await;
3546            catalog
3547                .transact(
3548                    None,
3549                    oracle_write_ts,
3550                    None,
3551                    vec![Op::UpdateScopedSystemParameters {
3552                        scoped: cluster_scoped(cluster_b),
3553                        prune_scope: Some(scope_with(&[cluster_b])),
3554                    }],
3555                )
3556                .await
3557                .expect("upsert B");
3558            assert!(
3559                rows(&catalog)
3560                    .await
3561                    .contains(&(cluster_b, param.clone(), value.clone())),
3562                "setup must create the row for B"
3563            );
3564
3565            // Behavior 2: bounded prune. Running with empty `scoped` and a scope
3566            // that excludes the live cluster B must leave B's row intact, because
3567            // the update was not authoritative for B.
3568            let oracle_write_ts = catalog.current_upper().await;
3569            catalog
3570                .transact(
3571                    None,
3572                    oracle_write_ts,
3573                    None,
3574                    vec![Op::UpdateScopedSystemParameters {
3575                        scoped: empty_scoped(),
3576                        prune_scope: Some(scope_with(&[cluster_a])),
3577                    }],
3578                )
3579                .await
3580                .expect("bounded prune");
3581            assert!(
3582                rows(&catalog)
3583                    .await
3584                    .contains(&(cluster_b, param.clone(), value.clone())),
3585                "a live cluster outside prune_scope must keep its row"
3586            );
3587
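            // Behavior 3: stale in-scope prune. A is live and in scope but not
            // desired, so its row is removed. Note that the "bounded prune"
            // transaction above is issued with the same arguments (empty `scoped`,
            // scope containing only A), so A's row is expected to be gone already
            // by the time this step runs; this step re-issues the update and then
            // asserts the outcome.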
3590            let oracle_write_ts = catalog.current_upper().await;
3591            catalog
3592                .transact(
3593                    None,
3594                    oracle_write_ts,
3595                    None,
3596                    vec![Op::UpdateScopedSystemParameters {
3597                        scoped: empty_scoped(),
3598                        prune_scope: Some(scope_with(&[cluster_a])),
3599                    }],
3600                )
3601                .await
3602                .expect("stale in-scope prune");
3603            assert!(
3604                !rows(&catalog)
3605                    .await
3606                    .iter()
3607                    .any(|(id, _, _)| *id == cluster_a),
3608                "a live in-scope undesired row must be removed"
3609            );
3610
3611            // Behavior 4: orphan prune. Re-create A's row, drop cluster A, then run
3612            // the update with a scope that does NOT contain A. The sync loop only
3613            // ever places live ids in prune_scope, so the orphan must still be
3614            // reclaimed because its owning cluster is no longer live.
3615            let oracle_write_ts = catalog.current_upper().await;
3616            catalog
3617                .transact(
3618                    None,
3619                    oracle_write_ts,
3620                    None,
3621                    vec![Op::UpdateScopedSystemParameters {
3622                        scoped: cluster_scoped(cluster_a),
3623                        prune_scope: Some(scope_with(&[cluster_a])),
3624                    }],
3625                )
3626                .await
3627                .expect("re-create A row");
3628            assert!(
3629                rows(&catalog)
3630                    .await
3631                    .contains(&(cluster_a, param.clone(), value.clone())),
3632                "re-created row for A must exist"
3633            );
3634
3635            let oracle_write_ts = catalog.current_upper().await;
3636            catalog
3637                .transact(
3638                    None,
3639                    oracle_write_ts,
3640                    None,
3641                    vec![Op::DropObjects(vec![DropObjectInfo::Cluster(cluster_a)])],
3642                )
3643                .await
3644                .expect("drop cluster A");
3645
3646            // B is the only live cluster the sync loop would have evaluated, so
3647            // only B appears in prune_scope. A's id is deliberately absent.
3648            let oracle_write_ts = catalog.current_upper().await;
3649            catalog
3650                .transact(
3651                    None,
3652                    oracle_write_ts,
3653                    None,
3654                    vec![Op::UpdateScopedSystemParameters {
3655                        scoped: cluster_scoped(cluster_b),
3656                        prune_scope: Some(scope_with(&[cluster_b])),
3657                    }],
3658                )
3659                .await
3660                .expect("orphan prune");
3661            assert!(
3662                !rows(&catalog)
3663                    .await
3664                    .iter()
3665                    .any(|(id, _, _)| *id == cluster_a),
3666                "orphan row for a dropped cluster must be removed even when absent from prune_scope"
3667            );
3668
3669            catalog.expire().await;
3670        })
3671        .await
3672    }
3673}