Skip to main content

mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35    StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50    AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
51    CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
52    DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
53    RoleVars,
54};
55use mz_sql::names::{
56    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
57    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
58};
59use mz_sql::plan::{NetworkPolicyRule, PlanError};
60use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
61use mz_sql::session::vars::OwnedVarInput;
62use mz_sql::session::vars::{Value as VarValue, VarInput};
63use mz_sql::{DEFAULT_SCHEMA, rbac};
64use mz_sql_parser::ast::{QualifiedReplica, Value};
65use mz_storage_client::storage_collections::StorageCollections;
66use serde::{Deserialize, Serialize};
67use tracing::{info, trace};
68
69use crate::AdapterError;
70use crate::catalog::state::LocalExpressionCache;
71use crate::catalog::{
72    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
73    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
74    is_reserved_role_name, object_type_to_audit_object_type,
75    system_object_type_to_audit_object_type,
76};
77use crate::coord::ConnMeta;
78use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
79use crate::coord::cluster_scheduling::SchedulingDecision;
80use crate::util::ResultExt;
81
/// A manually injected audit event.
///
/// Matches [`mz_audit_log::EventV1`], but without the `id` and `occurred_at` fields -- both are
/// filled in automatically.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InjectedAuditEvent {
    /// The kind of event (e.g. create/drop/alter).
    pub event_type: EventType,
    /// The type of catalog object the event concerns.
    pub object_type: ObjectType,
    /// Structured details recorded alongside the event.
    pub details: EventDetails,
    /// The user to attribute the event to, if any.
    pub user: Option<String>,
}
93
/// A single operation that can be performed as part of a catalog transaction.
///
/// Ops are consumed in order by `Catalog::transact` / `transact_inner`, which
/// derive durable catalog updates, builtin table updates, and audit events
/// from them.
#[derive(Debug, Clone)]
pub enum Op {
    /// Alters the retain-history (compaction) window of the item `id`.
    AlterRetainHistory {
        id: CatalogItemId,
        // Raw SQL value the window was specified with, if any.
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Alters the timestamp interval of the source `id`.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        // Raw SQL value the interval was specified with, if any.
        value: Option<Value>,
        interval: Duration,
    },
    /// Alters attributes, password handling, and session variable defaults of
    /// the role `id`.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        // NOTE(review): presumably clears the role's password when true —
        // confirm against the op's handler.
        nopassword: bool,
        vars: RoleVars,
    },
    /// Replaces the rules (and possibly name/owner) of the network policy `id`.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds a column to the item `id`, under a new global ID for the new
    /// version of the item.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        // The raw SQL type the column was declared with.
        sql: RawDataType,
    },
    /// Applies a previously created replacement (`replacement_id`) to the
    /// materialized view `id`.
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    /// Creates a database with the given name and owner.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a schema within the given database.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a role with the given attributes.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Creates a cluster, including its builtin introspection sources.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates a replica of an existing cluster.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        // Forwarded to the audit log; see `ReplicaCreateDropReason`.
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item (table, view, source, ...). Temporary items
    /// (those with a connection id) are handled specially; see
    /// `Catalog::temporary_ids`.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    /// Creates a network policy with the given rules.
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Sets (`Some`) or clears (`None`) a comment on an object, or on one of
    /// its sub-components (e.g. addressed by position) when `sub_component`
    /// is `Some`.
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the given objects; see `DropObjectInfo`.
    DropObjects(Vec<DropObjectInfo>),
    /// Grants membership in `role_id` to `member_id`, recorded as granted by
    /// `grantor_id`.
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    /// Renames the cluster `id` from `name` to `to_name`.
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        // NOTE(review): presumably skips the reserved-name check when false —
        // confirm in the op's handler.
        check_reserved_names: bool,
    },
    /// Renames a replica of the given cluster.
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    /// Renames the item `id` (currently `current_full_name`) to `to_name`.
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    /// Renames a schema.
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        // NOTE(review): presumably skips the reserved-name check when false —
        // confirm in the op's handler.
        check_reserved_names: bool,
    },
    /// Transfers ownership of the object `id` to `new_owner`.
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    /// Grants or revokes (per `variant`) a privilege on `target_id`.
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    /// Grants or revokes (per `variant`) a default privilege.
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    /// Revokes membership in `role_id` from `member_id`.
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    /// Replaces the configuration of the cluster `id`.
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    /// Replaces the configuration of a cluster replica.
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    /// Replaces the definition of the item `id` with `to_item`.
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    /// Replaces the stored upstream references of the source `source_id`.
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    /// Sets the system configuration parameter `name` to `value`.
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    /// Resets the system configuration parameter `name` to its default.
    ResetSystemConfiguration {
        name: String,
    },
    /// Resets all system configuration parameters to their defaults.
    ResetAllSystemConfiguration,
    /// Performs updates to the storage usage table, which probably should be a builtin source.
    ///
    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
    /// might not work. If a process crashes after collecting storage usage events
    /// but before updating the builtin table, then another listening catalog
    /// will never know to update the builtin table.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Injects audit events into the catalog.
    ///
    /// This is a nonstandard path used for manually appending audit events at the current time.
    /// Mainly useful for correcting the audit log in the face of bugs that made us forget to emit
    /// audit events.
    InjectAuditEvents {
        events: Vec<InjectedAuditEvent>,
    },
}
268
/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
/// the `Op::DropObjects`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    /// Drop an entire cluster.
    Cluster(ClusterId),
    /// Drop a single replica; the reason is recorded in the audit log.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    /// Drop a database.
    Database(DatabaseId),
    /// Drop a schema within a (possibly ambient) database.
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    /// Drop a role.
    Role(RoleId),
    /// Drop a catalog item (table, view, source, ...).
    Item(CatalogItemId),
    /// Drop a network policy.
    NetworkPolicy(NetworkPolicyId),
}
282
283impl DropObjectInfo {
284    /// Creates a `DropObjectInfo` from an `ObjectId`.
285    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
286    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
287        match id {
288            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
289            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
290                cluster_id,
291                replica_id,
292                ReplicaCreateDropReason::Manual,
293            )),
294            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
295            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
296            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
297            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
298            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
299        }
300    }
301
302    /// Creates an `ObjectId` from a `DropObjectInfo`.
303    /// Loses the `ReplicaCreateDropReason` if there is one!
304    fn to_object_id(&self) -> ObjectId {
305        match &self {
306            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
307            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
308                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
309            }
310            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
311            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
312            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
313            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
314            DropObjectInfo::NetworkPolicy(network_policy_id) => {
315                ObjectId::NetworkPolicy(network_policy_id.clone())
316            }
317        }
318    }
319}
320
/// The reason for creating or dropping a replica.
///
/// Forwarded to the audit log via [`ReplicaCreateDropReason::into_audit_log`].
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The user initiated the replica create or drop, e.g., by
    /// - creating/dropping a cluster,
    /// - ALTERing various options on a managed cluster,
    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
    Manual,
    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
    /// materialized view is needing a refresh on a SCHEDULE ON REFRESH cluster.
    ClusterScheduling(Vec<SchedulingDecision>),
}
333
334impl ReplicaCreateDropReason {
335    pub fn into_audit_log(
336        self,
337    ) -> (
338        CreateOrDropClusterReplicaReasonV1,
339        Option<SchedulingDecisionsWithReasonsV2>,
340    ) {
341        let (reason, scheduling_policies) = match self {
342            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
343            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
344                CreateOrDropClusterReplicaReasonV1::Schedule,
345                Some(scheduling_decisions),
346            ),
347        };
348        (
349            reason,
350            scheduling_policies
351                .as_ref()
352                .map(SchedulingDecision::reasons_to_audit_log_reasons),
353        )
354    }
355}
356
/// The outputs of a successful catalog transaction.
pub struct TransactionResult {
    /// Updates to apply to the builtin tables affected by the transaction.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates from which we will derive catalog implications.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit-log events emitted while applying the ops.
    pub audit_events: Vec<VersionedEvent>,
}
363
/// Controls whether `transact_inner` is allowed to trigger storage
/// prepare-state side effects.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// Prepare storage state and return a commit-ready transaction state.
    ///
    /// This mode is used by durable catalog transactions.
    Commit,
    /// Execute a dry run against a durable transaction that will not be
    /// committed.
    ///
    /// This mode still validates and applies updates to an in-memory
    /// `CatalogState`, but it must not call `prepare_state` because dry runs
    /// must not trigger controller side effects.
    DryRun,
}
378
379impl Catalog {
380    fn should_audit_log_item(item: &CatalogItem) -> bool {
381        !item.is_temporary()
382    }
383
384    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
385    /// within a connection id.
386    fn temporary_ids(
387        &self,
388        ops: &[Op],
389        temporary_drops: BTreeSet<(&ConnectionId, String)>,
390    ) -> Result<BTreeSet<CatalogItemId>, Error> {
391        let mut creating = BTreeSet::new();
392        let mut temporary_ids = BTreeSet::new();
393        for op in ops.iter() {
394            if let Op::CreateItem {
395                id,
396                name,
397                item,
398                owner_id: _,
399            } = op
400            {
401                if let Some(conn_id) = item.conn_id() {
402                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
403                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
404                        || creating.contains(&(conn_id, &name.item))
405                    {
406                        return Err(
407                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
408                        );
409                    } else {
410                        creating.insert((conn_id, &name.item));
411                        temporary_ids.insert(id.clone());
412                    }
413                }
414            }
415        }
416        Ok(temporary_ids)
417    }
418
    /// Executes and durably commits the catalog transaction described by `ops`,
    /// updating `self.state` on success.
    ///
    /// On success, returns the builtin table updates, parsed catalog updates,
    /// and audit events produced by the transaction. If the durable commit
    /// fails, the process is terminated (see the comment at the commit site).
    #[instrument(name = "catalog::transact")]
    pub async fn transact(
        &mut self,
        // n.b. this is an option to prevent us from needing to build out a
        // dummy impl of `StorageController` for tests.
        storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
    ) -> Result<TransactionResult, AdapterError> {
        trace!("transact: {:?}", ops);
        // Test-only failure injection point.
        fail::fail_point!("catalog_transact", |arg| {
            Err(AdapterError::Unstructured(anyhow::anyhow!(
                "failpoint: {arg:?}"
            )))
        });

        // Collect the item ids dropped by this transaction...
        let drop_ids: BTreeSet<CatalogItemId> = ops
            .iter()
            .filter_map(|op| match op {
                Op::DropObjects(drop_object_infos) => {
                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
                    let item_ids = ids.filter_map(|id| match id {
                        ObjectId::Item(id) => Some(id),
                        _ => None,
                    });
                    Some(item_ids)
                }
                _ => None,
            })
            .flatten()
            .collect();
        // ...and narrow them down to temporary items, keyed by (conn_id, name),
        // so that creating a temp item that is dropped in the same transaction
        // is not reported as a name collision.
        let temporary_drops = drop_ids
            .iter()
            .filter_map(|id| {
                let entry = self.get_entry(id);
                match entry.item().conn_id() {
                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
                    None => None,
                }
            })
            .collect();

        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = storage
            .transaction()
            .await
            .unwrap_or_terminate("starting catalog transaction");

        let new_state = Self::transact_inner(
            TransactInnerMode::Commit,
            storage_collections,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            &self.state,
        )
        .await?;

        // The user closure was successful, apply the updates. Terminate the
        // process if this fails, because we have to restart envd due to
        // indeterminate catalog state, which we only reconcile during catalog
        // init.
        tx.commit(oracle_write_ts)
            .await
            .unwrap_or_terminate("catalog storage transaction commit must succeed");

        // Dropping here keeps the mutable borrow on self, preventing us accidentally
        // mutating anything until after f is executed.
        drop(storage);
        // `transact_inner` returns `Some` only when the ops changed the state.
        if let Some(new_state) = new_state {
            self.transient_revision += 1;
            self.state = new_state;
        }

        Ok(TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        })
    }
509
    /// Performs an incremental dry-run catalog transaction: processes only the
    /// NEW ops against an accumulated `CatalogState` from previous dry runs.
    /// This avoids the O(N^2) replay cost of replaying all accumulated ops.
    ///
    /// The durable transaction is intentionally never committed and no storage
    /// controller prepare-state side effects are run.
    ///
    /// If `prev_snapshot` is `Some`, the transaction is initialized from that
    /// snapshot (which represents the tx state after the previous dry run),
    /// ensuring it starts in sync with `base_state`. If `None` (first
    /// statement), a fresh transaction is loaded from durable storage.
    ///
    /// Returns the new accumulated state and a snapshot of the transaction's
    /// state for use in subsequent incremental dry runs.
    pub async fn transact_incremental_dry_run(
        &self,
        base_state: &CatalogState,
        ops: Vec<Op>,
        session: Option<&ConnMeta>,
        prev_snapshot: Option<Snapshot>,
        oracle_write_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogState, Snapshot), AdapterError> {
        // For DDL transactions, items are not temporary (CREATE TABLE FROM SOURCE, etc.)
        // but we still need to check for collisions.
        let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;

        // These are still collected by `transact_inner`, but a dry run discards
        // them: only the resulting state and snapshot are returned.
        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = if let Some(snapshot) = prev_snapshot {
            // Restore transaction from saved snapshot so it starts in sync
            // with the accumulated CatalogState from previous dry runs.
            storage
                .transaction_from_snapshot(snapshot)
                .unwrap_or_terminate("starting catalog transaction from snapshot")
        } else {
            // First statement: fresh transaction from durable storage, which
            // is in sync with the real catalog state.
            storage
                .transaction()
                .await
                .unwrap_or_terminate("starting catalog transaction")
        };

        // Process only the new ops against the accumulated state in dry-run mode.
        let new_state = Self::transact_inner(
            TransactInnerMode::DryRun,
            None,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            base_state,
        )
        .await?;

        // Save the transaction's current state as a snapshot for the next
        // incremental dry run.
        let new_snapshot = tx.current_snapshot();

        // Transaction is NOT committed — drop it.
        drop(storage);

        // transact_inner returns Some(state) when ops produced changes.
        let state = new_state.unwrap_or_else(|| base_state.clone());
        Ok((state, new_snapshot))
    }
582
583    /// Extracts optimized expressions from `Op::CreateItem` operations for views
584    /// and materialized views. These can be used to populate a `LocalExpressionCache`
585    /// to avoid re-optimization during `apply_updates`.
586    fn extract_expressions_from_ops(
587        ops: &[Op],
588        optimizer_features: &OptimizerFeatures,
589    ) -> BTreeMap<GlobalId, LocalExpressions> {
590        let mut exprs = BTreeMap::new();
591
592        for op in ops {
593            if let Op::CreateItem { item, .. } = op {
594                match item {
595                    CatalogItem::View(view) => {
596                        exprs.insert(
597                            view.global_id,
598                            LocalExpressions {
599                                local_mir: (*view.locally_optimized_expr).clone(),
600                                optimizer_features: optimizer_features.clone(),
601                            },
602                        );
603                    }
604                    CatalogItem::MaterializedView(mv) => {
605                        exprs.insert(
606                            mv.global_id_writes(),
607                            LocalExpressions {
608                                local_mir: (*mv.locally_optimized_expr).clone(),
609                                optimizer_features: optimizer_features.clone(),
610                            },
611                        );
612                    }
613                    CatalogItem::Table(_)
614                    | CatalogItem::Source(_)
615                    | CatalogItem::Log(_)
616                    | CatalogItem::Sink(_)
617                    | CatalogItem::Index(_)
618                    | CatalogItem::Type(_)
619                    | CatalogItem::Func(_)
620                    | CatalogItem::Secret(_)
621                    | CatalogItem::Connection(_)
622                    | CatalogItem::ContinualTask(_) => {}
623                }
624            }
625        }
626
627        exprs
628    }
629
    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
    ///
    /// `mode` controls whether storage prepare-state side effects are allowed.
    /// In `DryRun` mode, this method may update the in-memory state returned to
    /// the caller, but it must not trigger controller side effects.
    ///
    /// Appends to `builtin_table_updates`, `parsed_catalog_updates`, and
    /// `audit_events` as ops are applied.
    #[instrument(name = "catalog::transact_inner")]
    async fn transact_inner(
        mode: TransactInnerMode,
        storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
        temporary_ids: BTreeSet<CatalogItemId>,
        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
        audit_events: &mut Vec<VersionedEvent>,
        tx: &mut Transaction<'_>,
        state: &CatalogState,
    ) -> Result<Option<CatalogState>, AdapterError> {
        // We come up with new catalog state, builtin state updates, and parsed
        // catalog updates (for deriving catalog implications) in two phases:
        //
        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
        //    one-by-one. This will give us the full list of updates to apply to
        //    the catalog, which will allow us to apply it in one batch, which
        //    in turn will allow the apply machinery to consolidate the updates.
        // 2. We do one final apply call with all updates, which gives us the
        //    final builtin table updates and parsed catalog updates.
        //
        // The reason is that the loop that is working off ops first does a
        // transact_op to derive the state updates for that op, and then calls
        // apply_updates on the catalog state. And successive ops might expect
        // the catalog state to reflect the modified state _after_ applying
        // previous ops.
        //
        // We want to, however, have one final apply_state that takes all the
        // accumulated updates to derive the required controller updates and the
        // builtin table updates.
        //
        // We won't win any DDL throughput benchmarks, but so far that's not
        // what we're optimizing for and there would probably be other
        // bottlenecks before we hit this one as a bottleneck.
        //
        // We could work around this by refactoring how the interplay of
        // transact_op and apply_updates works, but that's a larger undertaking.
        let mut preliminary_state = Cow::Borrowed(state);

        // The final state that we will return, if modified.
        let mut state = Cow::Borrowed(state);

        // No ops means no state change; the caller keeps the existing state.
        if ops.is_empty() {
            return Ok(None);
        }

        // Extract optimized expressions from CreateItem ops to avoid re-optimization
        // during apply_updates. We extract before the loop since `ops` is moved there.
        let optimizer_features = OptimizerFeatures::from(state.system_config());
        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);

        // Storage-collection bookkeeping accumulated across ops; handed to
        // `prepare_state` in `Commit` mode below.
        let mut storage_collections_to_create = BTreeSet::new();
        let mut storage_collections_to_drop = BTreeSet::new();
        let mut storage_collections_to_register = BTreeMap::new();

        // All state updates accumulated across ops, for the final batch apply.
        let mut updates = Vec::new();

        for op in ops {
            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
                oracle_write_ts,
                session,
                op,
                &temporary_ids,
                audit_events,
                tx,
                &*preliminary_state,
                &mut storage_collections_to_create,
                &mut storage_collections_to_drop,
                &mut storage_collections_to_register,
            )
            .await?;

            // Certain builtin tables are not derived from the durable catalog state, so they need
            // to be updated ad-hoc based on the current transaction. This is weird and will not
            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
            // this instance crashes after committing the durable catalog but before applying the
            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
            // world. Currently, this works fine and there are no correctness issues because
            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
            // state of all builtin tables to the correct values.
            if let Some(builtin_table_update) = weird_builtin_table_update {
                builtin_table_updates.push(builtin_table_update);
            }

            // Temporary items are not stored in the durable catalog, so they need to be handled
            // separately for updating state and builtin tables.
            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
            // in a multi-subscriber catalog world.
            let upper = tx.upper();
            let temporary_item_updates =
                temporary_item_updates
                    .into_iter()
                    .map(|(item, diff)| StateUpdate {
                        kind: StateUpdateKind::TemporaryItem(item),
                        ts: upper,
                        diff,
                    });

            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
            op_updates.extend(temporary_item_updates);
            if !op_updates.is_empty() {
                // Clone the cache so each apply_updates call has access to cached expressions.
                // The cache uses `remove` semantics, so we need a fresh clone for each call.
                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
                // Per-op apply only advances `preliminary_state`; its builtin
                // table and catalog updates are intentionally discarded — the
                // final batch apply below produces the authoritative ones.
                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
                    .to_mut()
                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
                    .await;
            }
            updates.append(&mut op_updates);
        }

        // Phase 2: one batch apply of all accumulated updates against the
        // state that will be returned to the caller.
        if !updates.is_empty() {
            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
            let (op_builtin_table_updates, op_catalog_updates) = state
                .to_mut()
                .apply_updates(updates.clone(), &mut local_expr_cache)
                .await;
            let op_builtin_table_updates = state
                .to_mut()
                .resolve_builtin_table_updates(op_builtin_table_updates);
            builtin_table_updates.extend(op_builtin_table_updates);
            parsed_catalog_updates.extend(op_catalog_updates);
        }

        match mode {
            TransactInnerMode::Commit => {
                // `storage_collections` can be `None` in tests.
                if let Some(c) = storage_collections {
                    c.prepare_state(
                        tx,
                        storage_collections_to_create,
                        storage_collections_to_drop,
                        storage_collections_to_register,
                    )
                    .await?;
                }
            }
            TransactInnerMode::DryRun => {
                debug_assert!(
                    storage_collections.is_none(),
                    "dry-run mode must not prepare storage state"
                );
            }
        }

        // Drain any updates enqueued in the transaction after the batch apply
        // (NOTE(review): presumably produced by `prepare_state` — confirm) so
        // the returned state reflects them as well.
        let updates = tx.get_and_commit_op_updates();
        if !updates.is_empty() {
            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
            let (op_builtin_table_updates, op_catalog_updates) = state
                .to_mut()
                .apply_updates(updates.clone(), &mut local_expr_cache)
                .await;
            let op_builtin_table_updates = state
                .to_mut()
                .resolve_builtin_table_updates(op_builtin_table_updates);
            builtin_table_updates.extend(op_builtin_table_updates);
            parsed_catalog_updates.extend(op_catalog_updates);
        }

        // The `Cow` tells us whether any update actually mutated the state:
        // `to_mut` was only called when there were updates to apply.
        match state {
            Cow::Owned(state) => Ok(Some(state)),
            Cow::Borrowed(_) => Ok(None),
        }
    }
805
806    /// Performs the transaction operation described by `op`. This function prepares the changes in
807    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
808    /// changes.
809    ///
    /// Optionally returns a builtin table update for any builtin table updates that cannot be
811    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
812    /// scenarios and ideally in the future don't exist.
813    #[instrument]
814    async fn transact_op(
815        oracle_write_ts: mz_repr::Timestamp,
816        session: Option<&ConnMeta>,
817        op: Op,
818        temporary_ids: &BTreeSet<CatalogItemId>,
819        audit_events: &mut Vec<VersionedEvent>,
820        tx: &mut Transaction<'_>,
821        state: &CatalogState,
822        storage_collections_to_create: &mut BTreeSet<GlobalId>,
823        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
824        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
825    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
826        let mut weird_builtin_table_update = None;
827        let mut temporary_item_updates = Vec::new();
828
829        match op {
830            Op::AlterRetainHistory { id, value, window } => {
831                let entry = state.get_entry(&id);
832                if id.is_system() {
833                    let name = entry.name();
834                    let full_name =
835                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
836                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
837                        full_name.to_string(),
838                    ))));
839                }
840
841                let mut new_entry = entry.clone();
842                let previous = new_entry
843                    .item
844                    .update_retain_history(value.clone(), window)
845                    .map_err(|_| {
846                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
847                            "planner should have rejected invalid alter retain history item type"
848                                .to_string(),
849                        )))
850                    })?;
851
852                if Self::should_audit_log_item(new_entry.item()) {
853                    let details =
854                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
855                            id: id.to_string(),
856                            old_history: previous.map(|previous| previous.to_string()),
857                            new_history: value.map(|v| v.to_string()),
858                        });
859                    CatalogState::add_to_audit_log(
860                        &state.system_configuration,
861                        oracle_write_ts,
862                        session,
863                        tx,
864                        audit_events,
865                        EventType::Alter,
866                        catalog_type_to_audit_object_type(new_entry.item().typ()),
867                        details,
868                    )?;
869                }
870
871                tx.update_item(id, new_entry.into())?;
872
873                Self::log_update(state, &id);
874            }
875            Op::AlterSourceTimestampInterval {
876                id,
877                value,
878                interval,
879            } => {
880                let entry = state.get_entry(&id);
881                if id.is_system() {
882                    let name = entry.name();
883                    let full_name =
884                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
885                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
886                        full_name.to_string(),
887                    ))));
888                }
889
890                let mut new_entry = entry.clone();
891                new_entry
892                    .item
893                    .update_timestamp_interval(value, interval)
894                    .map_err(|_| {
895                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
896                            "planner should have rejected invalid alter timestamp interval item type"
897                                .to_string(),
898                        )))
899                    })?;
900
901                tx.update_item(id, new_entry.into())?;
902
903                Self::log_update(state, &id);
904            }
905            Op::AlterRole {
906                id,
907                name,
908                attributes,
909                nopassword,
910                vars,
911            } => {
912                state.ensure_not_reserved_role(&id)?;
913
914                let mut existing_role = state.get_role(&id).clone();
915                let password = attributes.password.clone();
916                let scram_iterations = attributes
917                    .scram_iterations
918                    .unwrap_or_else(|| state.system_config().scram_iterations());
919                existing_role.attributes = attributes.into();
920                existing_role.vars = vars;
921                let password_action = if nopassword {
922                    PasswordAction::Clear
923                } else if let Some(password) = password {
924                    PasswordAction::Set(PasswordConfig {
925                        password,
926                        scram_iterations,
927                    })
928                } else {
929                    PasswordAction::NoChange
930                };
931                tx.update_role(id, existing_role.into(), password_action)?;
932
933                CatalogState::add_to_audit_log(
934                    &state.system_configuration,
935                    oracle_write_ts,
936                    session,
937                    tx,
938                    audit_events,
939                    EventType::Alter,
940                    ObjectType::Role,
941                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
942                        id: id.to_string(),
943                        name: name.clone(),
944                    }),
945                )?;
946
947                info!("update role {name} ({id})");
948            }
949            Op::AlterNetworkPolicy {
950                id,
951                rules,
952                name,
953                owner_id: _owner_id,
954            } => {
955                let existing_policy = state.get_network_policy(&id).clone();
956                let mut policy: NetworkPolicy = existing_policy.into();
957                policy.rules = rules;
958                if is_reserved_name(&name) {
959                    return Err(AdapterError::Catalog(Error::new(
960                        ErrorKind::ReservedNetworkPolicyName(name),
961                    )));
962                }
963                tx.update_network_policy(id, policy.clone())?;
964
965                CatalogState::add_to_audit_log(
966                    &state.system_configuration,
967                    oracle_write_ts,
968                    session,
969                    tx,
970                    audit_events,
971                    EventType::Alter,
972                    ObjectType::NetworkPolicy,
973                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
974                        id: id.to_string(),
975                        name: name.clone(),
976                    }),
977                )?;
978
979                info!("update network policy {name} ({id})");
980            }
981            Op::AlterAddColumn {
982                id,
983                new_global_id,
984                name,
985                typ,
986                sql,
987            } => {
988                let mut new_entry = state.get_entry(&id).clone();
989                let version = new_entry.item.add_column(name, typ, sql)?;
990                // All versions of a table share the same shard, so it shouldn't matter what
991                // GlobalId we use here.
992                let shard_id = state
993                    .storage_metadata()
994                    .get_collection_shard(new_entry.latest_global_id())?;
995
996                // TODO(alter_table): Support adding columns to sources.
997                let CatalogItem::Table(table) = &mut new_entry.item else {
998                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
999                };
1000                table.collections.insert(version, new_global_id);
1001
1002                tx.update_item(id, new_entry.into())?;
1003                storage_collections_to_register.insert(new_global_id, shard_id);
1004            }
1005            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1006                let mut new_entry = state.get_entry(&id).clone();
1007                let replacement = state.get_entry(&replacement_id);
1008
1009                let new_audit_events =
1010                    apply_replacement_audit_events(state, &new_entry, replacement);
1011
1012                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1013                    return Err(AdapterError::internal(
1014                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1015                        "id must refer to a materialized view",
1016                    ));
1017                };
1018                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1019                    return Err(AdapterError::internal(
1020                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1021                        "replacement_id must refer to a materialized view",
1022                    ));
1023                };
1024
1025                mv.apply_replacement(replacement_mv.clone());
1026
1027                tx.remove_item(replacement_id)?;
1028
1029                new_entry.id = replacement_id;
1030                tx_replace_item(tx, state, id, new_entry)?;
1031
1032                let comment_id = CommentObjectId::MaterializedView(replacement_id);
1033                tx.drop_comments(&[comment_id].into())?;
1034
1035                for (event_type, details) in new_audit_events {
1036                    CatalogState::add_to_audit_log(
1037                        &state.system_configuration,
1038                        oracle_write_ts,
1039                        session,
1040                        tx,
1041                        audit_events,
1042                        event_type,
1043                        ObjectType::MaterializedView,
1044                        details,
1045                    )?;
1046                }
1047            }
1048            Op::CreateDatabase { name, owner_id } => {
1049                let database_owner_privileges = vec![rbac::owner_privilege(
1050                    mz_sql::catalog::ObjectType::Database,
1051                    owner_id,
1052                )];
1053                let database_default_privileges = state
1054                    .default_privileges
1055                    .get_applicable_privileges(
1056                        owner_id,
1057                        None,
1058                        None,
1059                        mz_sql::catalog::ObjectType::Database,
1060                    )
1061                    .map(|item| item.mz_acl_item(owner_id));
1062                let database_privileges: Vec<_> = merge_mz_acl_items(
1063                    database_owner_privileges
1064                        .into_iter()
1065                        .chain(database_default_privileges),
1066                )
1067                .collect();
1068
1069                let schema_owner_privileges = vec![rbac::owner_privilege(
1070                    mz_sql::catalog::ObjectType::Schema,
1071                    owner_id,
1072                )];
1073                let schema_default_privileges = state
1074                    .default_privileges
1075                    .get_applicable_privileges(
1076                        owner_id,
1077                        None,
1078                        None,
1079                        mz_sql::catalog::ObjectType::Schema,
1080                    )
1081                    .map(|item| item.mz_acl_item(owner_id))
1082                    // Special default privilege on public schemas.
1083                    .chain(std::iter::once(MzAclItem {
1084                        grantee: RoleId::Public,
1085                        grantor: owner_id,
1086                        acl_mode: AclMode::USAGE,
1087                    }));
1088                let schema_privileges: Vec<_> = merge_mz_acl_items(
1089                    schema_owner_privileges
1090                        .into_iter()
1091                        .chain(schema_default_privileges),
1092                )
1093                .collect();
1094
1095                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1096                let (database_id, _) = tx.insert_user_database(
1097                    &name,
1098                    owner_id,
1099                    database_privileges.clone(),
1100                    &temporary_oids,
1101                )?;
1102                let (schema_id, _) = tx.insert_user_schema(
1103                    database_id,
1104                    DEFAULT_SCHEMA,
1105                    owner_id,
1106                    schema_privileges.clone(),
1107                    &temporary_oids,
1108                )?;
1109                CatalogState::add_to_audit_log(
1110                    &state.system_configuration,
1111                    oracle_write_ts,
1112                    session,
1113                    tx,
1114                    audit_events,
1115                    EventType::Create,
1116                    ObjectType::Database,
1117                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1118                        id: database_id.to_string(),
1119                        name: name.clone(),
1120                    }),
1121                )?;
1122                info!("create database {}", name);
1123
1124                CatalogState::add_to_audit_log(
1125                    &state.system_configuration,
1126                    oracle_write_ts,
1127                    session,
1128                    tx,
1129                    audit_events,
1130                    EventType::Create,
1131                    ObjectType::Schema,
1132                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1133                        id: schema_id.to_string(),
1134                        name: DEFAULT_SCHEMA.to_string(),
1135                        database_name: Some(name),
1136                    }),
1137                )?;
1138            }
1139            Op::CreateSchema {
1140                database_id,
1141                schema_name,
1142                owner_id,
1143            } => {
1144                if is_reserved_name(&schema_name) {
1145                    return Err(AdapterError::Catalog(Error::new(
1146                        ErrorKind::ReservedSchemaName(schema_name),
1147                    )));
1148                }
1149                let database_id = match database_id {
1150                    ResolvedDatabaseSpecifier::Id(id) => id,
1151                    ResolvedDatabaseSpecifier::Ambient => {
1152                        return Err(AdapterError::Catalog(Error::new(
1153                            ErrorKind::ReadOnlySystemSchema(schema_name),
1154                        )));
1155                    }
1156                };
1157                let owner_privileges = vec![rbac::owner_privilege(
1158                    mz_sql::catalog::ObjectType::Schema,
1159                    owner_id,
1160                )];
1161                let default_privileges = state
1162                    .default_privileges
1163                    .get_applicable_privileges(
1164                        owner_id,
1165                        Some(database_id),
1166                        None,
1167                        mz_sql::catalog::ObjectType::Schema,
1168                    )
1169                    .map(|item| item.mz_acl_item(owner_id));
1170                let privileges: Vec<_> =
1171                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1172                        .collect();
1173                let (schema_id, _) = tx.insert_user_schema(
1174                    database_id,
1175                    &schema_name,
1176                    owner_id,
1177                    privileges.clone(),
1178                    &state.get_temporary_oids().collect(),
1179                )?;
1180                CatalogState::add_to_audit_log(
1181                    &state.system_configuration,
1182                    oracle_write_ts,
1183                    session,
1184                    tx,
1185                    audit_events,
1186                    EventType::Create,
1187                    ObjectType::Schema,
1188                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1189                        id: schema_id.to_string(),
1190                        name: schema_name.clone(),
1191                        database_name: Some(state.database_by_id[&database_id].name.clone()),
1192                    }),
1193                )?;
1194            }
1195            Op::CreateRole { name, attributes } => {
1196                if is_reserved_role_name(&name) {
1197                    return Err(AdapterError::Catalog(Error::new(
1198                        ErrorKind::ReservedRoleName(name),
1199                    )));
1200                }
1201                let membership = RoleMembership::new();
1202                let vars = RoleVars::default();
1203                let (id, _) = tx.insert_user_role(
1204                    name.clone(),
1205                    attributes.clone(),
1206                    membership.clone(),
1207                    vars.clone(),
1208                    &state.get_temporary_oids().collect(),
1209                )?;
1210                CatalogState::add_to_audit_log(
1211                    &state.system_configuration,
1212                    oracle_write_ts,
1213                    session,
1214                    tx,
1215                    audit_events,
1216                    EventType::Create,
1217                    ObjectType::Role,
1218                    EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1219                        id: id.to_string(),
1220                        name: name.clone(),
1221                        auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1222                            AutoProvisionSource::Oidc => "oidc".to_string(),
1223                            AutoProvisionSource::Frontegg => "frontegg".to_string(),
1224                            AutoProvisionSource::None => "none".to_string(),
1225                        }),
1226                    }),
1227                )?;
1228                info!("create role {}", name);
1229            }
1230            Op::CreateCluster {
1231                id,
1232                name,
1233                introspection_sources,
1234                owner_id,
1235                config,
1236            } => {
1237                if is_reserved_name(&name) {
1238                    return Err(AdapterError::Catalog(Error::new(
1239                        ErrorKind::ReservedClusterName(name),
1240                    )));
1241                }
1242                let owner_privileges = vec![rbac::owner_privilege(
1243                    mz_sql::catalog::ObjectType::Cluster,
1244                    owner_id,
1245                )];
1246                let default_privileges = state
1247                    .default_privileges
1248                    .get_applicable_privileges(
1249                        owner_id,
1250                        None,
1251                        None,
1252                        mz_sql::catalog::ObjectType::Cluster,
1253                    )
1254                    .map(|item| item.mz_acl_item(owner_id));
1255                let privileges: Vec<_> =
1256                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1257                        .collect();
1258                let introspection_source_ids: Vec<_> = introspection_sources
1259                    .iter()
1260                    .map(|introspection_source| {
1261                        Transaction::allocate_introspection_source_index_id(
1262                            &id,
1263                            introspection_source.variant,
1264                        )
1265                    })
1266                    .collect();
1267
1268                let introspection_sources = introspection_sources
1269                    .into_iter()
1270                    .zip_eq(introspection_source_ids)
1271                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1272                    .collect();
1273
1274                tx.insert_user_cluster(
1275                    id,
1276                    &name,
1277                    introspection_sources,
1278                    owner_id,
1279                    privileges.clone(),
1280                    config.clone().into(),
1281                    &state.get_temporary_oids().collect(),
1282                )?;
1283                CatalogState::add_to_audit_log(
1284                    &state.system_configuration,
1285                    oracle_write_ts,
1286                    session,
1287                    tx,
1288                    audit_events,
1289                    EventType::Create,
1290                    ObjectType::Cluster,
1291                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1292                        id: id.to_string(),
1293                        name: name.clone(),
1294                    }),
1295                )?;
1296                info!("create cluster {}", name);
1297            }
1298            Op::CreateClusterReplica {
1299                cluster_id,
1300                name,
1301                config,
1302                owner_id,
1303                reason,
1304            } => {
1305                if is_reserved_name(&name) {
1306                    return Err(AdapterError::Catalog(Error::new(
1307                        ErrorKind::ReservedReplicaName(name),
1308                    )));
1309                }
1310                let cluster = state.get_cluster(cluster_id);
1311                let id =
1312                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1313                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1314                    size,
1315                    billed_as,
1316                    internal,
1317                    ..
1318                }) = &config.location
1319                {
1320                    let (reason, scheduling_policies) = reason.into_audit_log();
1321                    let details = EventDetails::CreateClusterReplicaV4(
1322                        mz_audit_log::CreateClusterReplicaV4 {
1323                            cluster_id: cluster_id.to_string(),
1324                            cluster_name: cluster.name.clone(),
1325                            replica_id: Some(id.to_string()),
1326                            replica_name: name.clone(),
1327                            logical_size: size.clone(),
1328                            billed_as: billed_as.clone(),
1329                            internal: *internal,
1330                            reason,
1331                            scheduling_policies,
1332                        },
1333                    );
1334                    CatalogState::add_to_audit_log(
1335                        &state.system_configuration,
1336                        oracle_write_ts,
1337                        session,
1338                        tx,
1339                        audit_events,
1340                        EventType::Create,
1341                        ObjectType::ClusterReplica,
1342                        details,
1343                    )?;
1344                }
1345            }
1346            Op::CreateItem {
1347                id,
1348                name,
1349                item,
1350                owner_id,
1351            } => {
1352                state.check_unstable_dependencies(&item)?;
1353
1354                match &item {
1355                    CatalogItem::Table(table) => {
1356                        let gids: Vec<_> = table.global_ids().collect();
1357                        assert_eq!(gids.len(), 1);
1358                        storage_collections_to_create.extend(gids);
1359                    }
1360                    CatalogItem::Source(source) => {
1361                        storage_collections_to_create.insert(source.global_id());
1362                    }
1363                    CatalogItem::MaterializedView(mv) => {
1364                        let mv_gid = mv.global_id_writes();
1365                        if let Some(target_id) = mv.replacement_target {
1366                            let target_gid = state.get_entry(&target_id).latest_global_id();
1367                            let shard_id =
1368                                state.storage_metadata().get_collection_shard(target_gid)?;
1369                            storage_collections_to_register.insert(mv_gid, shard_id);
1370                        } else {
1371                            storage_collections_to_create.insert(mv_gid);
1372                        }
1373                    }
1374                    CatalogItem::ContinualTask(ct) => {
1375                        storage_collections_to_create.insert(ct.global_id());
1376                    }
1377                    CatalogItem::Sink(sink) => {
1378                        storage_collections_to_create.insert(sink.global_id());
1379                    }
1380                    CatalogItem::Log(_)
1381                    | CatalogItem::View(_)
1382                    | CatalogItem::Index(_)
1383                    | CatalogItem::Type(_)
1384                    | CatalogItem::Func(_)
1385                    | CatalogItem::Secret(_)
1386                    | CatalogItem::Connection(_) => (),
1387                }
1388
1389                let system_user = session.map_or(false, |s| s.user().is_system_user());
1390                if !system_user {
1391                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1392                        let cluster_name = state.clusters_by_id[&id].name.clone();
1393                        return Err(AdapterError::Catalog(Error::new(
1394                            ErrorKind::ReadOnlyCluster(cluster_name),
1395                        )));
1396                    }
1397                }
1398
1399                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1400                let default_privileges = state
1401                    .default_privileges
1402                    .get_applicable_privileges(
1403                        owner_id,
1404                        name.qualifiers.database_spec.id(),
1405                        Some(name.qualifiers.schema_spec.into()),
1406                        item.typ().into(),
1407                    )
1408                    .map(|item| item.mz_acl_item(owner_id));
1409                // mz_support can read all progress sources.
1410                let progress_source_privilege = if item.is_progress_source() {
1411                    Some(MzAclItem {
1412                        grantee: MZ_SUPPORT_ROLE_ID,
1413                        grantor: owner_id,
1414                        acl_mode: AclMode::SELECT,
1415                    })
1416                } else {
1417                    None
1418                };
1419                let privileges: Vec<_> = merge_mz_acl_items(
1420                    owner_privileges
1421                        .into_iter()
1422                        .chain(default_privileges)
1423                        .chain(progress_source_privilege),
1424                )
1425                .collect();
1426
1427                let temporary_oids = state.get_temporary_oids().collect();
1428
                // Temporary items are scoped to a connection and live only in memory;
                // they are applied via `temporary_item_updates` rather than written to
                // the durable catalog transaction `tx`.
                if item.is_temporary() {
                    // Temporary items may only land in the ambient database's
                    // temporary schema.
                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
                    {
                        return Err(AdapterError::Catalog(Error::new(
                            ErrorKind::InvalidTemporarySchema,
                        )));
                    }
                    // Allocate an OID that avoids collisions with existing temporary OIDs.
                    let oid = tx.allocate_oid(&temporary_oids)?;

                    let schema_id = name.qualifiers.schema_spec.clone().into();
                    let item_type = item.typ();
                    let (create_sql, global_id, versions) = item.to_serialized();

                    // Shadows the incoming `item` with its in-memory representation.
                    let item = TemporaryItem {
                        id,
                        oid,
                        global_id,
                        schema_id,
                        name: name.item.clone(),
                        create_sql,
                        conn_id: item.conn_id().cloned(),
                        owner_id,
                        privileges: privileges.clone(),
                        extra_versions: versions,
                    };
                    temporary_item_updates.push((item, StateDiff::Addition));

                    info!(
                        "create temporary {} {} ({})",
                        item_type,
                        state.resolve_full_name(&name, None),
                        id
                    );
                } else {
                    // A durable item must not depend on any temporary item (the
                    // dependency would disappear when the owning connection ends).
                    // The `None` branch consults `temporary_ids` for temporary items
                    // created earlier in this same transaction that aren't in `state` yet.
                    if let Some(temp_id) =
                        item.uses()
                            .iter()
                            .find(|id| match state.try_get_entry(*id) {
                                Some(entry) => entry.item().is_temporary(),
                                None => temporary_ids.contains(id),
                            })
                    {
                        let temp_item = state.get_entry(temp_id);
                        return Err(AdapterError::Catalog(Error::new(
                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
                        )));
                    }
                    // Only the system user may create durable items in ambient
                    // (system) schemas.
                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
                        && !system_user
                    {
                        let schema_name = state
                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
                            .schema;
                        return Err(AdapterError::Catalog(Error::new(
                            ErrorKind::ReadOnlySystemSchema(schema_name),
                        )));
                    }
                    let schema_id = name.qualifiers.schema_spec.clone().into();
                    let item_type = item.typ();
                    let (create_sql, global_id, versions) = item.to_serialized();
                    tx.insert_user_item(
                        id,
                        global_id,
                        schema_id,
                        &name.item,
                        create_sql,
                        owner_id,
                        privileges.clone(),
                        &temporary_oids,
                        versions,
                    )?;
                    info!(
                        "create {} {} ({})",
                        item_type,
                        state.resolve_full_name(&name, None),
                        id
                    );
                }

                // Record an audit-log event for the creation, with details that vary
                // by item type (sources/sinks/indexes/materialized views carry their
                // cluster; everything else is just id + full name).
                if Self::should_audit_log_item(&item) {
                    let name = Self::full_name_detail(
                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
                    );
                    let details = match &item {
                        CatalogItem::Source(s) => {
                            // NOTE(review): both `match`es below are
                            // `Option::map(|id| id.to_string())` written out longhand.
                            let cluster_id = match s.data_source {
                                // Ingestion exports don't have their own cluster, but
                                // run on their ingestion's cluster.
                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                                    match state.get_entry(&ingestion_id).cluster_id() {
                                        Some(cluster_id) => Some(cluster_id.to_string()),
                                        None => None,
                                    }
                                }
                                _ => match item.cluster_id() {
                                    Some(cluster_id) => Some(cluster_id.to_string()),
                                    None => None,
                                },
                            };

                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
                                id: id.to_string(),
                                cluster_id,
                                name,
                                external_type: s.source_type().to_string(),
                            })
                        }
                        CatalogItem::Sink(s) => {
                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
                                id: id.to_string(),
                                cluster_id: Some(s.cluster_id.to_string()),
                                name,
                                external_type: s.sink_type().to_string(),
                            })
                        }
                        CatalogItem::Index(i) => {
                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
                                id: id.to_string(),
                                name,
                                cluster_id: i.cluster_id.to_string(),
                            })
                        }
                        CatalogItem::MaterializedView(mv) => {
                            EventDetails::CreateMaterializedViewV1(
                                mz_audit_log::CreateMaterializedViewV1 {
                                    id: id.to_string(),
                                    name,
                                    cluster_id: mv.cluster_id.to_string(),
                                    replacement_target_id: mv
                                        .replacement_target
                                        .map(|id| id.to_string()),
                                },
                            )
                        }
                        // Remaining item types carry no extra details.
                        CatalogItem::Table(_)
                        | CatalogItem::Log(_)
                        | CatalogItem::View(_)
                        | CatalogItem::Type(_)
                        | CatalogItem::Func(_)
                        | CatalogItem::Secret(_)
                        | CatalogItem::Connection(_)
                        | CatalogItem::ContinualTask(_) => {
                            EventDetails::IdFullNameV1(IdFullNameV1 {
                                id: id.to_string(),
                                name,
                            })
                        }
                    };
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Create,
                        catalog_type_to_audit_object_type(item.typ()),
                        details,
                    )?;
                }
            }
            // Create a user network policy: validate the name, compute its initial
            // privileges, insert it durably, and audit-log the creation.
            Op::CreateNetworkPolicy {
                rules,
                name,
                owner_id,
            } => {
                // Network policy names are globally unique.
                if state.network_policies_by_name.contains_key(&name) {
                    return Err(AdapterError::PlanError(PlanError::Catalog(
                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
                    )));
                }
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedNetworkPolicyName(name),
                    )));
                }

                // Initial privileges = owner's implicit privileges merged with any
                // applicable default privileges for network policies.
                let owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::NetworkPolicy,
                    owner_id,
                )];
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::NetworkPolicy,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let privileges: Vec<_> =
                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
                        .collect();

                // Pass the in-use temporary OIDs so the new policy's OID can't collide.
                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
                let id = tx.insert_user_network_policy(
                    name.clone(),
                    rules,
                    privileges,
                    owner_id,
                    &temporary_oids,
                )?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::NetworkPolicy,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;

                info!("created network policy {name} ({id})");
            }
            // Create, update, or remove a comment on a catalog object (or one of its
            // sub-components, e.g. a column), then audit-log the change when possible.
            Op::Comment {
                object_id,
                sub_component,
                comment,
            } => {
                tx.update_comment(object_id, sub_component, comment)?;
                // Suppress audit logging for objects that shouldn't be logged
                // (e.g. temporary items, per `should_audit_log_item`).
                let entry = state.get_comment_id_entry(&object_id);
                let should_log = entry
                    .map(|entry| Self::should_audit_log_item(entry.item()))
                    // Things that aren't catalog entries can't be temp, so should be logged.
                    .unwrap_or(true);
                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
                // comments won't be logged for now.
                if let (Some(conn_id), true) =
                    (session.map(|session| session.conn_id()), should_log)
                {
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Comment,
                        comment_id_to_audit_object_type(object_id),
                        EventDetails::IdNameV1(IdNameV1 {
                            // CommentObjectIds don't have a great string representation, but debug will do for now.
                            id: format!("{object_id:?}"),
                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
                        }),
                    )?;
                }
            }
            // Replace the durable set of external references known for a source,
            // converting each reference into its durable representation. Not audited.
            Op::UpdateSourceReferences {
                source_id,
                references,
            } => {
                tx.update_source_references(
                    source_id,
                    references
                        .references
                        .into_iter()
                        .map(|reference| reference.into())
                        .collect(),
                    references.updated_at,
                )?;
            }
1694            Op::DropObjects(drop_object_infos) => {
1695                // Generate all of the objects that need to get dropped.
1696                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1697
1698                // Drop any associated comments.
1699                tx.drop_comments(&delta.comments)?;
1700
1701                // Drop any items.
1702                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1703                    delta
1704                        .items
1705                        .iter()
1706                        .map(|id| id)
1707                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1708                tx.remove_items(&durable_items_to_drop)?;
1709                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1710                    let entry = state.get_entry(&id);
1711                    (entry.clone().into(), StateDiff::Retraction)
1712                }));
1713
1714                for item_id in delta.items {
1715                    let entry = state.get_entry(&item_id);
1716
1717                    if entry.item().is_storage_collection() {
1718                        storage_collections_to_drop.extend(entry.global_ids());
1719                    }
1720
1721                    if state.source_references.contains_key(&item_id) {
1722                        tx.remove_source_references(item_id)?;
1723                    }
1724
1725                    if Self::should_audit_log_item(entry.item()) {
1726                        CatalogState::add_to_audit_log(
1727                            &state.system_configuration,
1728                            oracle_write_ts,
1729                            session,
1730                            tx,
1731                            audit_events,
1732                            EventType::Drop,
1733                            catalog_type_to_audit_object_type(entry.item().typ()),
1734                            EventDetails::IdFullNameV1(IdFullNameV1 {
1735                                id: item_id.to_string(),
1736                                name: Self::full_name_detail(&state.resolve_full_name(
1737                                    entry.name(),
1738                                    session.map(|session| session.conn_id()),
1739                                )),
1740                            }),
1741                        )?;
1742                    }
1743                    info!(
1744                        "drop {} {} ({})",
1745                        entry.item_type(),
1746                        state.resolve_full_name(entry.name(), entry.conn_id()),
1747                        item_id
1748                    );
1749                }
1750
1751                // Drop any schemas.
1752                let schemas = delta
1753                    .schemas
1754                    .iter()
1755                    .map(|(schema_spec, database_spec)| {
1756                        (SchemaId::from(schema_spec), *database_spec)
1757                    })
1758                    .collect();
1759                tx.remove_schemas(&schemas)?;
1760
1761                for (schema_spec, database_spec) in delta.schemas {
1762                    let schema = state.get_schema(
1763                        &database_spec,
1764                        &schema_spec,
1765                        session
1766                            .map(|session| session.conn_id())
1767                            .unwrap_or(&SYSTEM_CONN_ID),
1768                    );
1769
1770                    let schema_id = SchemaId::from(schema_spec);
1771                    let database_id = match database_spec {
1772                        ResolvedDatabaseSpecifier::Ambient => None,
1773                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1774                    };
1775
1776                    CatalogState::add_to_audit_log(
1777                        &state.system_configuration,
1778                        oracle_write_ts,
1779                        session,
1780                        tx,
1781                        audit_events,
1782                        EventType::Drop,
1783                        ObjectType::Schema,
1784                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1785                            id: schema_id.to_string(),
1786                            name: schema.name.schema.to_string(),
1787                            database_name: database_id
1788                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1789                        }),
1790                    )?;
1791                }
1792
1793                // Drop any databases.
1794                tx.remove_databases(&delta.databases)?;
1795
1796                for database_id in delta.databases {
1797                    let database = state.get_database(&database_id).clone();
1798
1799                    CatalogState::add_to_audit_log(
1800                        &state.system_configuration,
1801                        oracle_write_ts,
1802                        session,
1803                        tx,
1804                        audit_events,
1805                        EventType::Drop,
1806                        ObjectType::Database,
1807                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1808                            id: database_id.to_string(),
1809                            name: database.name.clone(),
1810                        }),
1811                    )?;
1812                }
1813
1814                // Drop any roles.
1815                tx.remove_user_roles(&delta.roles)?;
1816
1817                for role_id in delta.roles {
1818                    let role = state
1819                        .roles_by_id
1820                        .get(&role_id)
1821                        .expect("catalog out of sync");
1822
1823                    CatalogState::add_to_audit_log(
1824                        &state.system_configuration,
1825                        oracle_write_ts,
1826                        session,
1827                        tx,
1828                        audit_events,
1829                        EventType::Drop,
1830                        ObjectType::Role,
1831                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1832                            id: role.id.to_string(),
1833                            name: role.name.clone(),
1834                        }),
1835                    )?;
1836                    info!("drop role {}", role.name());
1837                }
1838
1839                // Drop any network policies.
1840                tx.remove_network_policies(&delta.network_policies)?;
1841
1842                for network_policy_id in delta.network_policies {
1843                    let policy = state
1844                        .network_policies_by_id
1845                        .get(&network_policy_id)
1846                        .expect("catalog out of sync");
1847
1848                    CatalogState::add_to_audit_log(
1849                        &state.system_configuration,
1850                        oracle_write_ts,
1851                        session,
1852                        tx,
1853                        audit_events,
1854                        EventType::Drop,
1855                        ObjectType::NetworkPolicy,
1856                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1857                            id: policy.id.to_string(),
1858                            name: policy.name.clone(),
1859                        }),
1860                    )?;
1861                    info!("drop network policy {}", policy.name.clone());
1862                }
1863
1864                // Drop any replicas.
1865                let replicas = delta.replicas.keys().copied().collect();
1866                tx.remove_cluster_replicas(&replicas)?;
1867
1868                for (replica_id, (cluster_id, reason)) in delta.replicas {
1869                    let cluster = state.get_cluster(cluster_id);
1870                    let replica = cluster.replica(replica_id).expect("Must exist");
1871
1872                    let (reason, scheduling_policies) = reason.into_audit_log();
1873                    let details =
1874                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1875                            cluster_id: cluster_id.to_string(),
1876                            cluster_name: cluster.name.clone(),
1877                            replica_id: Some(replica_id.to_string()),
1878                            replica_name: replica.name.clone(),
1879                            reason,
1880                            scheduling_policies,
1881                        });
1882                    CatalogState::add_to_audit_log(
1883                        &state.system_configuration,
1884                        oracle_write_ts,
1885                        session,
1886                        tx,
1887                        audit_events,
1888                        EventType::Drop,
1889                        ObjectType::ClusterReplica,
1890                        details,
1891                    )?;
1892                }
1893
1894                // Drop any clusters.
1895                tx.remove_clusters(&delta.clusters)?;
1896
1897                for cluster_id in delta.clusters {
1898                    let cluster = state.get_cluster(cluster_id);
1899
1900                    CatalogState::add_to_audit_log(
1901                        &state.system_configuration,
1902                        oracle_write_ts,
1903                        session,
1904                        tx,
1905                        audit_events,
1906                        EventType::Drop,
1907                        ObjectType::Cluster,
1908                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1909                            id: cluster.id.to_string(),
1910                            name: cluster.name.clone(),
1911                        }),
1912                    )?;
1913                }
1914            }
            // Grant `role_id` to `member_id`: reject reserved/ungrantable roles and
            // membership cycles, then persist the new membership edge and audit-log it.
            Op::GrantRole {
                role_id,
                member_id,
                grantor_id,
            } => {
                state.ensure_not_reserved_role(&member_id)?;
                state.ensure_grantable_role(&role_id)?;
                // If `member_id` is already reachable from `role_id`'s (transitive)
                // membership, adding this edge would create a cycle.
                if state.collect_role_membership(&role_id).contains(&member_id) {
                    let group_role = state.get_role(&role_id);
                    let member_role = state.get_role(&member_id);
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::CircularRoleMembership {
                            role_name: group_role.name().to_string(),
                            member_name: member_role.name().to_string(),
                        },
                    )));
                }
                // Record the membership (with its grantor) on the member role.
                let mut member_role = state.get_role(&member_id).clone();
                member_role.membership.map.insert(role_id, grantor_id);
                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Grant,
                    ObjectType::Role,
                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
                        role_id: role_id.to_string(),
                        member_id: member_id.to_string(),
                        grantor_id: grantor_id.to_string(),
                        // System-initiated grants (no session) are attributed to mz_system.
                        executed_by: session
                            .map(|session| session.authenticated_role_id())
                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
                            .to_string(),
                    }),
                )?;
            }
            // Revoke `role_id` from `member_id`: remove the membership edge from the
            // member role, persist it, and audit-log the revocation.
            Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            } => {
                state.ensure_not_reserved_role(&member_id)?;
                state.ensure_grantable_role(&role_id)?;
                let mut member_role = state.get_role(&member_id).clone();
                member_role.membership.map.remove(&role_id);
                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Revoke,
                    ObjectType::Role,
                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
                        role_id: role_id.to_string(),
                        member_id: member_id.to_string(),
                        grantor_id: grantor_id.to_string(),
                        // System-initiated revokes (no session) are attributed to mz_system.
                        executed_by: session
                            .map(|session| session.authenticated_role_id())
                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
                            .to_string(),
                    }),
                )?;
            }
            // Grant or revoke a single privilege on a target object (or on the SYSTEM
            // pseudo-object), persist the updated privileges, and audit-log the change.
            Op::UpdatePrivilege {
                target_id,
                privilege,
                variant,
            } => {
                // Shared grant/revoke mutation applied to whichever object's
                // privilege map is selected below.
                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
                    UpdatePrivilegeVariant::Grant => {
                        privileges.grant(privilege);
                    }
                    UpdatePrivilegeVariant::Revoke => {
                        privileges.revoke(&privilege);
                    }
                };
                match &target_id {
                    SystemObjectId::Object(object_id) => match object_id {
                        ObjectId::Cluster(id) => {
                            let mut cluster = state.get_cluster(*id).clone();
                            update_privilege_fn(&mut cluster.privileges);
                            tx.update_cluster(*id, cluster.into())?;
                        }
                        ObjectId::Database(id) => {
                            let mut database = state.get_database(id).clone();
                            update_privilege_fn(&mut database.privileges);
                            tx.update_database(*id, database.into())?;
                        }
                        ObjectId::NetworkPolicy(id) => {
                            let mut policy = state.get_network_policy(id).clone();
                            update_privilege_fn(&mut policy.privileges);
                            tx.update_network_policy(*id, policy.into())?;
                        }
                        ObjectId::Schema((database_spec, schema_spec)) => {
                            let schema_id = schema_spec.clone().into();
                            let mut schema = state
                                .get_schema(
                                    database_spec,
                                    schema_spec,
                                    session
                                        .map(|session| session.conn_id())
                                        .unwrap_or(&SYSTEM_CONN_ID),
                                )
                                .clone();
                            update_privilege_fn(&mut schema.privileges);
                            tx.update_schema(schema_id, schema.into())?;
                        }
                        ObjectId::Item(id) => {
                            let entry = state.get_entry(id);
                            let mut new_entry = entry.clone();
                            update_privilege_fn(&mut new_entry.privileges);
                            // Durable items are updated in the transaction; temporary
                            // items are swapped in memory via retraction + addition.
                            if !new_entry.item().is_temporary() {
                                tx.update_item(*id, new_entry.into())?;
                            } else {
                                temporary_item_updates
                                    .push((entry.clone().into(), StateDiff::Retraction));
                                temporary_item_updates
                                    .push((new_entry.into(), StateDiff::Addition));
                            }
                        }
                        // Roles and cluster replicas don't carry privileges.
                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
                    },
                    SystemObjectId::System => {
                        // System privileges are stored per (grantee, grantor) pair;
                        // compute the resulting ACL mode (None removes the entry).
                        let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
                        update_privilege_fn(&mut system_privileges);
                        let new_privilege =
                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
                        tx.set_system_privilege(
                            privilege.grantee,
                            privilege.grantor,
                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
                        )?;
                    }
                }
                let object_type = state.get_system_object_type(&target_id);
                let object_id_str = match &target_id {
                    SystemObjectId::System => "SYSTEM".to_string(),
                    SystemObjectId::Object(id) => id.to_string(),
                };
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    // The event type (Grant/Revoke) is derived from the variant.
                    variant.into(),
                    system_object_type_to_audit_object_type(&object_type),
                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                        object_id: object_id_str,
                        grantee_id: privilege.grantee.to_string(),
                        grantor_id: privilege.grantor.to_string(),
                        privileges: privilege.acl_mode.to_string(),
                    }),
                )?;
            }
            // Grant or revoke a default privilege (applied to future objects of the
            // given type), persist the resulting ACL mode, and audit-log the change.
            Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant,
            } => {
                // Apply the change to a copy of the in-memory default privileges,
                // then read back the resulting ACL mode for this grantee.
                let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
                match variant {
                    UpdatePrivilegeVariant::Grant => default_privileges
                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
                    UpdatePrivilegeVariant::Revoke => {
                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
                    }
                }
                let new_acl_mode = default_privileges
                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
                tx.set_default_privilege(
                    privilege_object.role_id,
                    privilege_object.database_id,
                    privilege_object.schema_id,
                    privilege_object.object_type,
                    privilege_acl_item.grantee,
                    new_acl_mode.cloned(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    // The event type (Grant/Revoke) is derived from the variant.
                    variant.into(),
                    object_type_to_audit_object_type(privilege_object.object_type),
                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
                        role_id: privilege_object.role_id.to_string(),
                        database_id: privilege_object.database_id.map(|id| id.to_string()),
                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
                        grantee_id: privilege_acl_item.grantee.to_string(),
                        privileges: privilege_acl_item.acl_mode.to_string(),
                    }),
                )?;
            }
2117            Op::RenameCluster {
2118                id,
2119                name,
2120                to_name,
2121                check_reserved_names,
2122            } => {
2123                if id.is_system() {
2124                    return Err(AdapterError::Catalog(Error::new(
2125                        ErrorKind::ReadOnlyCluster(name.clone()),
2126                    )));
2127                }
2128                if check_reserved_names && is_reserved_name(&to_name) {
2129                    return Err(AdapterError::Catalog(Error::new(
2130                        ErrorKind::ReservedClusterName(to_name),
2131                    )));
2132                }
2133                tx.rename_cluster(id, &name, &to_name)?;
2134                CatalogState::add_to_audit_log(
2135                    &state.system_configuration,
2136                    oracle_write_ts,
2137                    session,
2138                    tx,
2139                    audit_events,
2140                    EventType::Alter,
2141                    ObjectType::Cluster,
2142                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2143                        id: id.to_string(),
2144                        old_name: name.clone(),
2145                        new_name: to_name.clone(),
2146                    }),
2147                )?;
2148                info!("rename cluster {name} to {to_name}");
2149            }
2150            Op::RenameClusterReplica {
2151                cluster_id,
2152                replica_id,
2153                name,
2154                to_name,
2155            } => {
2156                if is_reserved_name(&to_name) {
2157                    return Err(AdapterError::Catalog(Error::new(
2158                        ErrorKind::ReservedReplicaName(to_name),
2159                    )));
2160                }
2161                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2162                CatalogState::add_to_audit_log(
2163                    &state.system_configuration,
2164                    oracle_write_ts,
2165                    session,
2166                    tx,
2167                    audit_events,
2168                    EventType::Alter,
2169                    ObjectType::ClusterReplica,
2170                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2171                        cluster_id: cluster_id.to_string(),
2172                        replica_id: replica_id.to_string(),
2173                        old_name: name.replica.as_str().to_string(),
2174                        new_name: to_name.clone(),
2175                    }),
2176                )?;
2177                info!("rename cluster replica {name} to {to_name}");
2178            }
2179            Op::RenameItem {
2180                id,
2181                to_name,
2182                current_full_name,
2183            } => {
2184                let mut updates = Vec::new();
2185
2186                let entry = state.get_entry(&id);
2187                if let CatalogItem::Type(_) = entry.item() {
2188                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2189                        current_full_name.to_string(),
2190                    ))));
2191                }
2192
2193                if entry.id().is_system() {
2194                    let name = state
2195                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2196                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2197                        name.to_string(),
2198                    ))));
2199                }
2200
2201                let mut to_full_name = current_full_name.clone();
2202                to_full_name.item.clone_from(&to_name);
2203
2204                let mut to_qualified_name = entry.name().clone();
2205                to_qualified_name.item.clone_from(&to_name);
2206
2207                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2208                    id: id.to_string(),
2209                    old_name: Self::full_name_detail(&current_full_name),
2210                    new_name: Self::full_name_detail(&to_full_name),
2211                });
2212                if Self::should_audit_log_item(entry.item()) {
2213                    CatalogState::add_to_audit_log(
2214                        &state.system_configuration,
2215                        oracle_write_ts,
2216                        session,
2217                        tx,
2218                        audit_events,
2219                        EventType::Alter,
2220                        catalog_type_to_audit_object_type(entry.item().typ()),
2221                        details,
2222                    )?;
2223                }
2224
2225                // Rename item itself.
2226                let mut new_entry = entry.clone();
2227                new_entry.name.item.clone_from(&to_name);
2228                new_entry.item = entry
2229                    .item()
2230                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2231                    .map_err(|e| {
2232                        Error::new(ErrorKind::from(AmbiguousRename {
2233                            depender: state
2234                                .resolve_full_name(entry.name(), entry.conn_id())
2235                                .to_string(),
2236                            dependee: state
2237                                .resolve_full_name(entry.name(), entry.conn_id())
2238                                .to_string(),
2239                            message: e,
2240                        }))
2241                    })?;
2242
2243                for id in entry.referenced_by() {
2244                    let dependent_item = state.get_entry(id);
2245                    let mut to_entry = dependent_item.clone();
2246                    to_entry.item = dependent_item
2247                        .item()
2248                        .rename_item_refs(
2249                            current_full_name.clone(),
2250                            to_full_name.item.clone(),
2251                            false,
2252                        )
2253                        .map_err(|e| {
2254                            Error::new(ErrorKind::from(AmbiguousRename {
2255                                depender: state
2256                                    .resolve_full_name(
2257                                        dependent_item.name(),
2258                                        dependent_item.conn_id(),
2259                                    )
2260                                    .to_string(),
2261                                dependee: state
2262                                    .resolve_full_name(entry.name(), entry.conn_id())
2263                                    .to_string(),
2264                                message: e,
2265                            }))
2266                        })?;
2267
2268                    if !to_entry.item().is_temporary() {
2269                        tx.update_item(*id, to_entry.into())?;
2270                    } else {
2271                        temporary_item_updates
2272                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2273                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2274                    }
2275                    updates.push(*id);
2276                }
2277                if !new_entry.item().is_temporary() {
2278                    tx.update_item(id, new_entry.into())?;
2279                } else {
2280                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2281                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2282                }
2283
2284                updates.push(id);
2285                for id in updates {
2286                    Self::log_update(state, &id);
2287                }
2288            }
2289            Op::RenameSchema {
2290                database_spec,
2291                schema_spec,
2292                new_name,
2293                check_reserved_names,
2294            } => {
2295                if check_reserved_names && is_reserved_name(&new_name) {
2296                    return Err(AdapterError::Catalog(Error::new(
2297                        ErrorKind::ReservedSchemaName(new_name),
2298                    )));
2299                }
2300
2301                let conn_id = session
2302                    .map(|session| session.conn_id())
2303                    .unwrap_or(&SYSTEM_CONN_ID);
2304
2305                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2306                let cur_name = schema.name().schema.clone();
2307
2308                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2309                    return Err(AdapterError::Catalog(Error::new(
2310                        ErrorKind::AmbientSchemaRename(cur_name),
2311                    )));
2312                };
2313                let database = state.get_database(&database_id);
2314                let database_name = &database.name;
2315
2316                let mut updates = Vec::new();
2317                let mut items_to_update = BTreeMap::new();
2318
2319                let mut update_item = |id| {
2320                    if items_to_update.contains_key(id) {
2321                        return Ok(());
2322                    }
2323
2324                    let entry = state.get_entry(id);
2325
2326                    // Update our item.
2327                    let mut new_entry = entry.clone();
2328                    new_entry.item = entry
2329                        .item
2330                        .rename_schema_refs(database_name, &cur_name, &new_name)
2331                        .map_err(|(s, _i)| {
2332                            Error::new(ErrorKind::from(AmbiguousRename {
2333                                depender: state
2334                                    .resolve_full_name(entry.name(), entry.conn_id())
2335                                    .to_string(),
2336                                dependee: format!("{database_name}.{cur_name}"),
2337                                message: format!("ambiguous reference to schema named {s}"),
2338                            }))
2339                        })?;
2340
2341                    // Queue updates for Catalog storage and Builtin Tables.
2342                    if !new_entry.item().is_temporary() {
2343                        items_to_update.insert(*id, new_entry.into());
2344                    } else {
2345                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2346                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2347                    }
2348                    updates.push(id);
2349
2350                    Ok::<_, AdapterError>(())
2351                };
2352
2353                // Update all of the items in the schema.
2354                for (_name, item_id) in &schema.items {
2355                    // Update the item itself.
2356                    update_item(item_id)?;
2357
2358                    // Update everything that depends on this item.
2359                    for id in state.get_entry(item_id).referenced_by() {
2360                        update_item(id)?;
2361                    }
2362                }
2363                // Note: When updating the transaction it's very important that we update the
2364                // items as a whole group, otherwise we exhibit quadratic behavior.
2365                tx.update_items(items_to_update)?;
2366
2367                // Renaming temporary schemas is not supported.
2368                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2369                    let schema_name = schema.name().schema.clone();
2370                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2371                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2372                    )));
2373                };
2374
2375                // Add an entry to the audit log.
2376                let database_name = database_spec
2377                    .id()
2378                    .map(|id| state.get_database(&id).name.clone());
2379                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2380                    id: schema_id.to_string(),
2381                    old_name: schema.name().schema.clone(),
2382                    new_name: new_name.clone(),
2383                    database_name,
2384                });
2385                CatalogState::add_to_audit_log(
2386                    &state.system_configuration,
2387                    oracle_write_ts,
2388                    session,
2389                    tx,
2390                    audit_events,
2391                    EventType::Alter,
2392                    mz_audit_log::ObjectType::Schema,
2393                    details,
2394                )?;
2395
2396                // Update the schema itself.
2397                let mut new_schema = schema.clone();
2398                new_schema.name.schema.clone_from(&new_name);
2399                tx.update_schema(schema_id, new_schema.into())?;
2400
2401                for id in updates {
2402                    Self::log_update(state, id);
2403                }
2404            }
2405            Op::UpdateOwner { id, new_owner } => {
2406                let conn_id = session
2407                    .map(|session| session.conn_id())
2408                    .unwrap_or(&SYSTEM_CONN_ID);
2409                let old_owner = state
2410                    .get_owner_id(&id, conn_id)
2411                    .expect("cannot update the owner of an object without an owner");
2412                match &id {
2413                    ObjectId::Cluster(id) => {
2414                        let mut cluster = state.get_cluster(*id).clone();
2415                        if id.is_system() {
2416                            return Err(AdapterError::Catalog(Error::new(
2417                                ErrorKind::ReadOnlyCluster(cluster.name),
2418                            )));
2419                        }
2420                        Self::update_privilege_owners(
2421                            &mut cluster.privileges,
2422                            cluster.owner_id,
2423                            new_owner,
2424                        );
2425                        cluster.owner_id = new_owner;
2426                        tx.update_cluster(*id, cluster.into())?;
2427                    }
2428                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2429                        let cluster = state.get_cluster(*cluster_id);
2430                        let mut replica = cluster
2431                            .replica(*replica_id)
2432                            .expect("catalog out of sync")
2433                            .clone();
2434                        if replica_id.is_system() {
2435                            return Err(AdapterError::Catalog(Error::new(
2436                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2437                            )));
2438                        }
2439                        replica.owner_id = new_owner;
2440                        tx.update_cluster_replica(*replica_id, replica.into())?;
2441                    }
2442                    ObjectId::Database(id) => {
2443                        let mut database = state.get_database(id).clone();
2444                        if id.is_system() {
2445                            return Err(AdapterError::Catalog(Error::new(
2446                                ErrorKind::ReadOnlyDatabase(database.name),
2447                            )));
2448                        }
2449                        Self::update_privilege_owners(
2450                            &mut database.privileges,
2451                            database.owner_id,
2452                            new_owner,
2453                        );
2454                        database.owner_id = new_owner;
2455                        tx.update_database(*id, database.clone().into())?;
2456                    }
2457                    ObjectId::Schema((database_spec, schema_spec)) => {
2458                        let schema_id: SchemaId = schema_spec.clone().into();
2459                        let mut schema = state
2460                            .get_schema(database_spec, schema_spec, conn_id)
2461                            .clone();
2462                        if schema_id.is_system() {
2463                            let name = schema.name();
2464                            let full_name = state.resolve_full_schema_name(name);
2465                            return Err(AdapterError::Catalog(Error::new(
2466                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2467                            )));
2468                        }
2469                        Self::update_privilege_owners(
2470                            &mut schema.privileges,
2471                            schema.owner_id,
2472                            new_owner,
2473                        );
2474                        schema.owner_id = new_owner;
2475                        tx.update_schema(schema_id, schema.into())?;
2476                    }
2477                    ObjectId::Item(id) => {
2478                        let entry = state.get_entry(id);
2479                        let mut new_entry = entry.clone();
2480                        if id.is_system() {
2481                            let full_name = state.resolve_full_name(
2482                                new_entry.name(),
2483                                session.map(|session| session.conn_id()),
2484                            );
2485                            return Err(AdapterError::Catalog(Error::new(
2486                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2487                            )));
2488                        }
2489                        Self::update_privilege_owners(
2490                            &mut new_entry.privileges,
2491                            new_entry.owner_id,
2492                            new_owner,
2493                        );
2494                        new_entry.owner_id = new_owner;
2495                        if !new_entry.item().is_temporary() {
2496                            tx.update_item(*id, new_entry.into())?;
2497                        } else {
2498                            temporary_item_updates
2499                                .push((entry.clone().into(), StateDiff::Retraction));
2500                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2501                        }
2502                    }
2503                    ObjectId::NetworkPolicy(id) => {
2504                        let mut policy = state.get_network_policy(id).clone();
2505                        if id.is_system() {
2506                            return Err(AdapterError::Catalog(Error::new(
2507                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2508                            )));
2509                        }
2510                        Self::update_privilege_owners(
2511                            &mut policy.privileges,
2512                            policy.owner_id,
2513                            new_owner,
2514                        );
2515                        policy.owner_id = new_owner;
2516                        tx.update_network_policy(*id, policy.into())?;
2517                    }
2518                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2519                }
2520                let object_type = state.get_object_type(&id);
2521                CatalogState::add_to_audit_log(
2522                    &state.system_configuration,
2523                    oracle_write_ts,
2524                    session,
2525                    tx,
2526                    audit_events,
2527                    EventType::Alter,
2528                    object_type_to_audit_object_type(object_type),
2529                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2530                        object_id: id.to_string(),
2531                        old_owner_id: old_owner.to_string(),
2532                        new_owner_id: new_owner.to_string(),
2533                    }),
2534                )?;
2535            }
2536            Op::UpdateClusterConfig { id, name, config } => {
2537                let mut cluster = state.get_cluster(id).clone();
2538                cluster.config = config;
2539                tx.update_cluster(id, cluster.into())?;
2540                info!("update cluster {}", name);
2541
2542                CatalogState::add_to_audit_log(
2543                    &state.system_configuration,
2544                    oracle_write_ts,
2545                    session,
2546                    tx,
2547                    audit_events,
2548                    EventType::Alter,
2549                    ObjectType::Cluster,
2550                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2551                        id: id.to_string(),
2552                        name,
2553                    }),
2554                )?;
2555            }
2556            Op::UpdateClusterReplicaConfig {
2557                replica_id,
2558                cluster_id,
2559                config,
2560            } => {
2561                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2562                info!("update replica {}", replica.name);
2563                tx.update_cluster_replica(
2564                    replica_id,
2565                    mz_catalog::durable::ClusterReplica {
2566                        cluster_id,
2567                        replica_id,
2568                        name: replica.name.clone(),
2569                        config: config.clone().into(),
2570                        owner_id: replica.owner_id,
2571                    },
2572                )?;
2573            }
2574            Op::UpdateItem { id, name, to_item } => {
2575                let mut entry = state.get_entry(&id).clone();
2576                entry.name = name.clone();
2577                entry.item = to_item.clone();
2578                tx.update_item(id, entry.into())?;
2579
2580                if Self::should_audit_log_item(&to_item) {
2581                    let mut full_name = Self::full_name_detail(
2582                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2583                    );
2584                    full_name.item = name.item;
2585
2586                    CatalogState::add_to_audit_log(
2587                        &state.system_configuration,
2588                        oracle_write_ts,
2589                        session,
2590                        tx,
2591                        audit_events,
2592                        EventType::Alter,
2593                        catalog_type_to_audit_object_type(to_item.typ()),
2594                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2595                            id: id.to_string(),
2596                            name: full_name,
2597                        }),
2598                    )?;
2599                }
2600
2601                Self::log_update(state, &id);
2602            }
2603            Op::UpdateSystemConfiguration { name, value } => {
2604                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2605                tx.upsert_system_config(&name, parsed_value.clone())?;
2606                // This mirrors some "system vars" into the catalog storage
2607                // "config" collection so that we can toggle the flag with
2608                // Launch Darkly, but use it in boot before Launch Darkly is
2609                // available.
2610                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2611                    let with_0dt_deployment_max_wait =
2612                        Duration::parse(VarInput::Flat(&parsed_value))
2613                            .expect("parsing succeeded above");
2614                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2615                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2616                    let with_0dt_deployment_ddl_check_interval =
2617                        Duration::parse(VarInput::Flat(&parsed_value))
2618                            .expect("parsing succeeded above");
2619                    tx.set_0dt_deployment_ddl_check_interval(
2620                        with_0dt_deployment_ddl_check_interval,
2621                    )?;
2622                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2623                    let panic_after_timeout =
2624                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2625                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2626                }
2627
2628                CatalogState::add_to_audit_log(
2629                    &state.system_configuration,
2630                    oracle_write_ts,
2631                    session,
2632                    tx,
2633                    audit_events,
2634                    EventType::Alter,
2635                    ObjectType::System,
2636                    EventDetails::SetV1(mz_audit_log::SetV1 {
2637                        name,
2638                        value: Some(value.borrow().to_vec().join(", ")),
2639                    }),
2640                )?;
2641            }
2642            Op::ResetSystemConfiguration { name } => {
2643                tx.remove_system_config(&name);
2644                // This mirrors some "system vars" into the catalog storage
2645                // "config" collection so that we can toggle the flag with
2646                // Launch Darkly, but use it in boot before Launch Darkly is
2647                // available.
2648                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2649                    tx.reset_0dt_deployment_max_wait()?;
2650                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2651                    tx.reset_0dt_deployment_ddl_check_interval()?;
2652                }
2653
2654                CatalogState::add_to_audit_log(
2655                    &state.system_configuration,
2656                    oracle_write_ts,
2657                    session,
2658                    tx,
2659                    audit_events,
2660                    EventType::Alter,
2661                    ObjectType::System,
2662                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2663                )?;
2664            }
2665            Op::ResetAllSystemConfiguration => {
2666                tx.clear_system_configs();
2667                tx.reset_0dt_deployment_max_wait()?;
2668                tx.reset_0dt_deployment_ddl_check_interval()?;
2669
2670                CatalogState::add_to_audit_log(
2671                    &state.system_configuration,
2672                    oracle_write_ts,
2673                    session,
2674                    tx,
2675                    audit_events,
2676                    EventType::Alter,
2677                    ObjectType::System,
2678                    EventDetails::ResetAllV1,
2679                )?;
2680            }
2681            Op::WeirdStorageUsageUpdates {
2682                object_id,
2683                size_bytes,
2684                collection_timestamp,
2685            } => {
2686                let id = tx.allocate_storage_usage_ids()?;
2687                let metric =
2688                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2689                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2690                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2691                weird_builtin_table_update = Some(builtin_table_update);
2692            }
2693            Op::InjectAuditEvents { events } => {
2694                for event in events {
2695                    let id = tx.allocate_audit_log_id()?;
2696                    let ev = VersionedEvent::new(
2697                        id,
2698                        event.event_type,
2699                        event.object_type,
2700                        event.details,
2701                        event.user,
2702                        oracle_write_ts.into(),
2703                    );
2704                    audit_events.push(ev.clone());
2705                    tx.insert_audit_log_event(ev);
2706                }
2707            }
2708        };
2709        Ok((weird_builtin_table_update, temporary_item_updates))
2710    }
2711
2712    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2713        let entry = state.get_entry(id);
2714        info!(
2715            "update {} {} ({})",
2716            entry.item_type(),
2717            state.resolve_full_name(entry.name(), entry.conn_id()),
2718            id
2719        );
2720    }
2721
2722    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2723    /// implementation:
2724    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2725    fn update_privilege_owners(
2726        privileges: &mut PrivilegeMap,
2727        old_owner: RoleId,
2728        new_owner: RoleId,
2729    ) {
2730        // TODO(jkosh44) Would be nice not to clone every privilege.
2731        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2732
2733        let mut new_present = false;
2734        for privilege in flat_privileges.iter_mut() {
2735            // Old owner's granted privilege are updated to be granted by the new
2736            // owner.
2737            if privilege.grantor == old_owner {
2738                privilege.grantor = new_owner;
2739            } else if privilege.grantor == new_owner {
2740                new_present = true;
2741            }
2742            // Old owner's privileges is given to the new owner.
2743            if privilege.grantee == old_owner {
2744                privilege.grantee = new_owner;
2745            } else if privilege.grantee == new_owner {
2746                new_present = true;
2747            }
2748        }
2749
2750        // If the old privilege list contained references to the new owner, we may
2751        // have created duplicate entries. Here we try and consolidate them. This
2752        // is inspired by PostgreSQL's algorithm but not identical.
2753        if new_present {
2754            // Group privileges by (grantee, grantor).
2755            let privilege_map: BTreeMap<_, Vec<_>> =
2756                flat_privileges
2757                    .into_iter()
2758                    .fold(BTreeMap::new(), |mut accum, privilege| {
2759                        accum
2760                            .entry((privilege.grantee, privilege.grantor))
2761                            .or_default()
2762                            .push(privilege);
2763                        accum
2764                    });
2765
2766            // Consolidate and update all privileges.
2767            flat_privileges = privilege_map
2768                .into_iter()
2769                .map(|((grantee, grantor), values)|
2770                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2771                    values.into_iter().fold(
2772                        MzAclItem::empty(grantee, grantor),
2773                        |mut accum, mz_aclitem| {
2774                            accum.acl_mode =
2775                                accum.acl_mode.union(mz_aclitem.acl_mode);
2776                            accum
2777                        },
2778                    ))
2779                .collect();
2780        }
2781
2782        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2783    }
2784}
2785
2786/// Prepare the given transaction for replacing a catalog item with a new version.
2787///
2788/// The new version gets a new `CatalogItemId`, which requires rewriting the `create_sql` of all
2789/// dependent objects to refer to that new ID (at a previous version).
2790///
2791/// Note that here is where we break the assumption that the `CatalogItemId` is a stable identifier
2792/// for catalog items. We currently think that there are no use cases that require this assumption,
2793/// but no way to know for sure.
2794fn tx_replace_item(
2795    tx: &mut Transaction<'_>,
2796    state: &CatalogState,
2797    id: CatalogItemId,
2798    new_entry: CatalogEntry,
2799) -> Result<(), AdapterError> {
2800    let new_id = new_entry.id;
2801
2802    // Rewrite dependent objects to point to the new ID.
2803    for use_id in new_entry.referenced_by() {
2804        // The dependent might be dropped in the same tx, so check.
2805        if tx.get_item(use_id).is_none() {
2806            continue;
2807        }
2808
2809        let mut dependent = state.get_entry(use_id).clone();
2810        dependent.item = dependent.item.replace_item_refs(id, new_id);
2811        tx.update_item(*use_id, dependent.into())?;
2812    }
2813
2814    // Move comments to the new ID.
2815    let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2816    let new_comment_id = new_entry.comment_object_id();
2817    if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2818        tx.drop_comments(&[old_comment_id].into())?;
2819        for (sub, comment) in comments {
2820            tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2821        }
2822    }
2823
2824    let mz_catalog::durable::Item {
2825        id: _,
2826        oid,
2827        global_id,
2828        schema_id,
2829        name,
2830        create_sql,
2831        owner_id,
2832        privileges,
2833        extra_versions,
2834    } = new_entry.into();
2835
2836    tx.remove_item(id)?;
2837    tx.insert_item(
2838        new_id,
2839        oid,
2840        global_id,
2841        schema_id,
2842        &name,
2843        create_sql,
2844        owner_id,
2845        privileges,
2846        extra_versions,
2847    )?;
2848
2849    Ok(())
2850}
2851
2852/// Generate audit events for a replacement apply operation.
2853fn apply_replacement_audit_events(
2854    state: &CatalogState,
2855    target: &CatalogEntry,
2856    replacement: &CatalogEntry,
2857) -> Vec<(EventType, EventDetails)> {
2858    let mut events = Vec::new();
2859
2860    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2861    let target_id_name = IdFullNameV1 {
2862        id: target.id().to_string(),
2863        name: Catalog::full_name_detail(&target_name),
2864    };
2865    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2866    let replacement_id_name = IdFullNameV1 {
2867        id: replacement.id().to_string(),
2868        name: Catalog::full_name_detail(&replacement_name),
2869    };
2870
2871    if Catalog::should_audit_log_item(&replacement.item) {
2872        events.push((
2873            EventType::Drop,
2874            EventDetails::IdFullNameV1(replacement_id_name.clone()),
2875        ));
2876    }
2877
2878    if Catalog::should_audit_log_item(&target.item) {
2879        events.push((
2880            EventType::Alter,
2881            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2882                target: target_id_name.clone(),
2883                replacement: replacement_id_name,
2884            }),
2885        ));
2886
2887        if let Some(old_cluster_id) = target.cluster_id()
2888            && let Some(new_cluster_id) = replacement.cluster_id()
2889            && old_cluster_id != new_cluster_id
2890        {
2891            // When the replacement is applied, the target takes on the ID of the replacement, so
2892            // we should use that ID for subsequent events.
2893            events.push((
2894                EventType::Alter,
2895                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2896                    id: replacement.id().to_string(),
2897                    name: target_id_name.name,
2898                    old_cluster_id: old_cluster_id.to_string(),
2899                    new_cluster_id: new_cluster_id.to_string(),
2900                }),
2901            ));
2902        }
2903    }
2904
2905    events
2906}
2907
2908/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2909///
/// Note: Previously we used to emit a single `Op::DropObject` for every object
2911/// we needed to drop. But removing a batch of objects from a durable Catalog
2912/// Transaction is O(n) where `n` is the number of objects that exist in the
2913/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
2914/// `DROP ... CASCADE` statement.
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment objects to remove; one is recorded for every dropped object.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, keyed by schema and mapped to their containing database.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Replicas to drop, mapped to their cluster and the reason for the drop.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Catalog items to drop (tables, views, sources, etc.).
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
2926
2927impl ObjectsToDrop {
2928    pub fn generate(
2929        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2930        state: &CatalogState,
2931        session: Option<&ConnMeta>,
2932    ) -> Result<Self, AdapterError> {
2933        let mut delta = ObjectsToDrop::default();
2934
2935        for drop_object_info in drop_object_infos {
2936            delta.add_item(drop_object_info, state, session)?;
2937        }
2938
2939        Ok(delta)
2940    }
2941
2942    fn add_item(
2943        &mut self,
2944        drop_object_info: DropObjectInfo,
2945        state: &CatalogState,
2946        session: Option<&ConnMeta>,
2947    ) -> Result<(), AdapterError> {
2948        self.comments
2949            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2950
2951        match drop_object_info {
2952            DropObjectInfo::Database(database_id) => {
2953                let database = &state.database_by_id[&database_id];
2954                if database_id.is_system() {
2955                    return Err(AdapterError::Catalog(Error::new(
2956                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2957                    )));
2958                }
2959
2960                self.databases.insert(database_id);
2961            }
2962            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2963                let schema = state.get_schema(
2964                    &database_spec,
2965                    &schema_spec,
2966                    session
2967                        .map(|session| session.conn_id())
2968                        .unwrap_or(&SYSTEM_CONN_ID),
2969                );
2970                let schema_id: SchemaId = schema_spec.into();
2971                if schema_id.is_system() {
2972                    let name = schema.name();
2973                    let full_name = state.resolve_full_schema_name(name);
2974                    return Err(AdapterError::Catalog(Error::new(
2975                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2976                    )));
2977                }
2978
2979                self.schemas.insert(schema_spec, database_spec);
2980            }
2981            DropObjectInfo::Role(role_id) => {
2982                let name = state.get_role(&role_id).name().to_string();
2983                if role_id.is_system() || role_id.is_predefined() {
2984                    return Err(AdapterError::Catalog(Error::new(
2985                        ErrorKind::ReservedRoleName(name.clone()),
2986                    )));
2987                }
2988                state.ensure_not_reserved_role(&role_id)?;
2989
2990                self.roles.insert(role_id);
2991            }
2992            DropObjectInfo::Cluster(cluster_id) => {
2993                let cluster = state.get_cluster(cluster_id);
2994                let name = &cluster.name;
2995                if cluster_id.is_system() {
2996                    return Err(AdapterError::Catalog(Error::new(
2997                        ErrorKind::ReadOnlyCluster(name.clone()),
2998                    )));
2999                }
3000
3001                self.clusters.insert(cluster_id);
3002            }
3003            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3004                let cluster = state.get_cluster(cluster_id);
3005                let replica = cluster.replica(replica_id).expect("Must exist");
3006
3007                self.replicas
3008                    .insert(replica.replica_id, (cluster.id, reason));
3009
3010                // Implicitly drop materialized views that target this replica.
3011                // When the target replica is gone, no replica advances the
3012                // persist shard's upper frontier, causing reads to hang.
3013                for (_id, entry) in state.get_entries() {
3014                    if let CatalogItem::MaterializedView(mv) = entry.item() {
3015                        if mv.target_replica == Some(replica_id)
3016                            && !self.items.contains(&entry.id())
3017                        {
3018                            tracing::warn!(
3019                                "implicitly dropping materialized view {} because target replica was dropped",
3020                                entry.name().item,
3021                            );
3022                            self.items.push(entry.id());
3023                        }
3024                    }
3025                }
3026            }
3027            DropObjectInfo::Item(item_id) => {
3028                let entry = state.get_entry(&item_id);
3029                if item_id.is_system() {
3030                    let name = entry.name();
3031                    let full_name =
3032                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
3033                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3034                        full_name.to_string(),
3035                    ))));
3036                }
3037
3038                self.items.push(item_id);
3039            }
3040            DropObjectInfo::NetworkPolicy(network_policy_id) => {
3041                let policy = state.get_network_policy(&network_policy_id);
3042                let name = &policy.name;
3043                if network_policy_id.is_system() {
3044                    return Err(AdapterError::Catalog(Error::new(
3045                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3046                    )));
3047                }
3048
3049                self.network_policies.insert(network_policy_id);
3050            }
3051        }
3052
3053        Ok(())
3054    }
3055}
3056
#[cfg(test)]
mod tests {
    use mz_catalog::SYSTEM_CONN_ID;
    use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;
    use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::DEFAULT_SCHEMA;
    use mz_sql::catalog::CatalogDatabase;
    use mz_sql::names::{
        ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
    };
    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;

    use crate::catalog::{Catalog, Op};
    use crate::session::DEFAULT_DATABASE_NAME;

    /// Verifies that `Catalog::update_privilege_owners` reassigns the old
    /// owner's privileges (as both grantor and grantee) to the new owner, and
    /// consolidates any duplicate `(grantee, grantor)` entries that the
    /// rewrite produces by unioning their ACL modes.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        // Case 1: the old owner exists as a grantor.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: other_role,
                grantor: old_owner,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: other_role,
                grantor: new_owner,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: other_role,
                grantor: new_owner,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );

        // Case 2: the old owner exists as a grantee.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: old_owner,
                grantor: other_role,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: new_owner,
                grantor: other_role,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: new_owner,
                grantor: other_role,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );

        // Case 3: the old owner exists as both grantee and grantor.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: old_owner,
                grantor: old_owner,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: new_owner,
                grantor: new_owner,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: new_owner,
                grantor: new_owner,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );
    }

    /// Verifies that `transact_incremental_dry_run` processes only new ops
    /// against the accumulated state, not all ops from scratch. Two paths are
    /// compared:
    ///   - Incremental: two separate calls, each with one op
    ///   - All-at-once: one call with both ops
    /// Both must produce equivalent catalog state.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method`
    async fn test_transact_incremental_dry_run_processes_only_new_ops() {
        Catalog::with_debug(|catalog| async move {
            // Resolve default database and schema.
            let database = catalog
                .resolve_database(DEFAULT_DATABASE_NAME)
                .expect("default database");
            let database_name = database.name.clone();
            let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
            let schema = catalog
                .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
                .expect("default schema");
            let schema_name = schema.name.schema.clone();
            let schema_spec = schema.id.clone();

            // Allocate IDs for two tables.
            let (id_t1, global_id_t1) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t1");
            let (id_t2, global_id_t2) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t2");

            let oracle_write_ts = catalog.current_upper().await;

            // Build two CreateItem ops. Each creates an empty table in the
            // default database/schema, owned by the system role.
            let make_table_op = |id, global_id, name: &str| Op::CreateItem {
                id,
                name: QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: database_spec.clone(),
                        schema_spec: schema_spec.clone(),
                    },
                    item: name.to_string(),
                },
                item: CatalogItem::Table(Table {
                    create_sql: Some(format!(
                        "CREATE TABLE {database_name}.{schema_name}.{name} ()"
                    )),
                    desc: VersionedRelationDesc::new(RelationDesc::empty()),
                    collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                    conn_id: None,
                    resolved_ids: ResolvedIds::empty(),
                    custom_logical_compaction_window: None,
                    is_retained_metrics_object: false,
                    data_source: TableDataSource::TableWrites { defaults: vec![] },
                }),
                owner_id: MZ_SYSTEM_ROLE_ID,
            };

            let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
            let op_t2 = make_table_op(id_t2, global_id_t2, "t2");

            let base_state = catalog.state().clone();

            // --- Path A: Incremental (two separate dry-run calls) ---

            // First call: only op_t1, no previous snapshot.
            let (state_after_t1, snapshot_after_t1) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("first dry run");

            // After first dry run: t1 exists, t2 does not.
            assert!(
                state_after_t1.try_get_entry(&id_t1).is_some(),
                "t1 should exist after first dry run"
            );
            assert_eq!(
                state_after_t1
                    .try_get_entry(&id_t1)
                    .expect("t1 entry")
                    .name()
                    .item,
                "t1"
            );
            assert!(
                state_after_t1.try_get_entry(&id_t2).is_none(),
                "t2 should NOT exist after first dry run"
            );

            // Second call: only op_t2, using state/snapshot from first call.
            let (state_incremental, _) = catalog
                .transact_incremental_dry_run(
                    &state_after_t1,
                    vec![op_t2.clone()],
                    None,
                    Some(snapshot_after_t1),
                    oracle_write_ts,
                )
                .await
                .expect("second dry run");

            // After second dry run: both t1 and t2 exist.
            assert!(
                state_incremental.try_get_entry(&id_t1).is_some(),
                "t1 should exist in incremental result"
            );
            assert!(
                state_incremental.try_get_entry(&id_t2).is_some(),
                "t2 should exist in incremental result"
            );

            // --- Path B: All-at-once (single dry-run call with both ops) ---

            let (state_all_at_once, _) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone(), op_t2.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("all-at-once dry run");

            assert!(
                state_all_at_once.try_get_entry(&id_t1).is_some(),
                "t1 should exist in all-at-once result"
            );
            assert!(
                state_all_at_once.try_get_entry(&id_t2).is_some(),
                "t2 should exist in all-at-once result"
            );

            // --- Compare: both paths produce equivalent items ---

            let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
            let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
            assert_eq!(inc_t1.name(), all_t1.name());
            assert_eq!(inc_t1.owner_id, all_t1.owner_id);

            let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
            let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
            assert_eq!(inc_t2.name(), all_t2.name());
            assert_eq!(inc_t2.owner_id, all_t2.owner_id);

            catalog.expire().await;
        })
        .await
    }
}