// mz_adapter/catalog/transact.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35    StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
51    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
52    RoleAttributesRaw, RoleMembership, RoleVars,
53};
54use mz_sql::names::{
55    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use tracing::{info, trace};
66
67use crate::AdapterError;
68use crate::catalog::state::LocalExpressionCache;
69use crate::catalog::{
70    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
71    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
72    is_reserved_role_name, object_type_to_audit_object_type,
73    system_object_type_to_audit_object_type,
74};
75use crate::coord::ConnMeta;
76use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
77use crate::coord::cluster_scheduling::SchedulingDecision;
78use crate::util::ResultExt;
79
/// A single operation applied to the catalog as part of a catalog transaction
/// (see [`Catalog::transact`]). Each variant carries the data needed to stage
/// the corresponding durable and in-memory catalog change.
#[derive(Debug, Clone)]
pub enum Op {
    /// Changes the retain-history (compaction window) setting of an item.
    AlterRetainHistory {
        id: CatalogItemId,
        // Raw SQL value as written by the user, if any.
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Changes the timestamp interval of a source.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        // Raw SQL value as written by the user, if any.
        value: Option<Value>,
        interval: Duration,
    },
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        // NOTE(review): presumably requests clearing the role's password —
        // confirm against the `AlterRole` handling in `transact_op`.
        nopassword: bool,
        vars: RoleVars,
    },
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds a column to an existing item.
    AlterAddColumn {
        id: CatalogItemId,
        // NOTE(review): appears to be a pre-allocated `GlobalId` for the new
        // version of the collection — confirm in `transact_op`.
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        // The raw SQL data type as written, kept alongside the resolved `typ`.
        sql: RawDataType,
    },
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        // Recorded in the audit log; see `ReplicaCreateDropReason`.
        reason: ReplicaCreateDropReason,
    },
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Sets or removes a comment on an object or one of its sub-components
    /// (e.g. a column); `comment: None` removes the comment.
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the given objects; see [`DropObjectInfo`].
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    /// Performs updates to the storage usage table, which probably should be a builtin source.
    ///
    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
    /// might not work. If a process crashes after collecting storage usage events
    /// but before updating the builtin table, then another listening catalog
    /// will never know to update the builtin table.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
}
246
/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
/// the `Op::DropObjects`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// Cluster id, replica id, and the reason the replica is being dropped
    /// (forwarded to the audit log).
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    /// Database and schema specifiers identifying the schema to drop.
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
260
261impl DropObjectInfo {
262    /// Creates a `DropObjectInfo` from an `ObjectId`.
263    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
264    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
265        match id {
266            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
267            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
268                cluster_id,
269                replica_id,
270                ReplicaCreateDropReason::Manual,
271            )),
272            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
273            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
274            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
275            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
276            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
277        }
278    }
279
280    /// Creates an `ObjectId` from a `DropObjectInfo`.
281    /// Loses the `ReplicaCreateDropReason` if there is one!
282    fn to_object_id(&self) -> ObjectId {
283        match &self {
284            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
285            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
286                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
287            }
288            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
289            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
290            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
291            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
292            DropObjectInfo::NetworkPolicy(network_policy_id) => {
293                ObjectId::NetworkPolicy(network_policy_id.clone())
294            }
295        }
296    }
297}
298
/// The reason for creating or dropping a replica.
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The user initiated the replica create or drop, e.g., by
    /// - creating/dropping a cluster,
    /// - ALTERing various options on a managed cluster,
    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
    Manual,
    /// The automated cluster scheduling initiated the replica create or drop,
    /// e.g., a materialized view needs a refresh on a SCHEDULE ON REFRESH
    /// cluster. Carries the scheduling decisions that led to the action.
    ClusterScheduling(Vec<SchedulingDecision>),
}
311
312impl ReplicaCreateDropReason {
313    pub fn into_audit_log(
314        self,
315    ) -> (
316        CreateOrDropClusterReplicaReasonV1,
317        Option<SchedulingDecisionsWithReasonsV2>,
318    ) {
319        let (reason, scheduling_policies) = match self {
320            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
321            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
322                CreateOrDropClusterReplicaReasonV1::Schedule,
323                Some(scheduling_decisions),
324            ),
325        };
326        (
327            reason,
328            scheduling_policies
329                .as_ref()
330                .map(SchedulingDecision::reasons_to_audit_log_reasons),
331        )
332    }
333}
334
/// The outputs of a successful catalog transaction (see [`Catalog::transact`]).
pub struct TransactionResult {
    /// Updates that must be applied to the builtin tables as a result of this
    /// transaction.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates from which we will derive catalog implications.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit log events recorded while applying the transaction's ops.
    pub audit_events: Vec<VersionedEvent>,
}
341
/// Controls whether `Catalog::transact_inner` runs as a real, commit-ready
/// transaction or as a dry run with no storage side effects.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// Prepare storage state and return a commit-ready transaction state.
    ///
    /// This mode is used by durable catalog transactions.
    Commit,
    /// Execute a dry run against a durable transaction that will not be
    /// committed.
    ///
    /// This mode still validates and applies updates to an in-memory
    /// `CatalogState`, but it must not call `prepare_state` because dry runs
    /// must not trigger controller side effects.
    DryRun,
}
356
357impl Catalog {
358    fn should_audit_log_item(item: &CatalogItem) -> bool {
359        !item.is_temporary()
360    }
361
    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
    /// within a connection id.
    ///
    /// `temporary_drops` holds the (connection, item name) pairs of temporary
    /// items that this same transaction is dropping; a name being dropped may
    /// be re-created without a collision error.
    fn temporary_ids(
        &self,
        ops: &[Op],
        temporary_drops: BTreeSet<(&ConnectionId, String)>,
    ) -> Result<BTreeSet<CatalogItemId>, Error> {
        // Names created earlier in this same batch of ops, used to catch
        // duplicate creations within the transaction itself.
        let mut creating = BTreeSet::new();
        let mut temporary_ids = BTreeSet::new();
        for op in ops.iter() {
            if let Op::CreateItem {
                id,
                name,
                item,
                owner_id: _,
            } = op
            {
                // Only items associated with a connection are temporary.
                if let Some(conn_id) = item.conn_id() {
                    // Collision if the name already exists in the connection's
                    // temp schema (and is not simultaneously being dropped), or
                    // if an earlier op in this batch already creates it.
                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
                        || creating.contains(&(conn_id, &name.item))
                    {
                        return Err(
                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
                        );
                    } else {
                        creating.insert((conn_id, &name.item));
                        temporary_ids.insert(id.clone());
                    }
                }
            }
        }
        Ok(temporary_ids)
    }
396
    /// Executes and durably commits the catalog transaction described by
    /// `ops`, then applies the resulting updates to the in-memory catalog
    /// state.
    ///
    /// Returns the builtin table updates, parsed catalog updates, and audit
    /// events produced by the transaction. If the durable commit fails, the
    /// process is terminated, since the catalog would otherwise be left in an
    /// indeterminate state (see comment in the body).
    #[instrument(name = "catalog::transact")]
    pub async fn transact(
        &mut self,
        // n.b. this is an option to prevent us from needing to build out a
        // dummy impl of `StorageController` for tests.
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
    ) -> Result<TransactionResult, AdapterError> {
        trace!("transact: {:?}", ops);
        fail::fail_point!("catalog_transact", |arg| {
            Err(AdapterError::Unstructured(anyhow::anyhow!(
                "failpoint: {arg:?}"
            )))
        });

        // Item IDs of everything dropped by `Op::DropObjects` in this batch.
        let drop_ids: BTreeSet<CatalogItemId> = ops
            .iter()
            .filter_map(|op| match op {
                Op::DropObjects(drop_object_infos) => {
                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
                    let item_ids = ids.filter_map(|id| match id {
                        ObjectId::Item(id) => Some(id),
                        _ => None,
                    });
                    Some(item_ids)
                }
                _ => None,
            })
            .flatten()
            .collect();
        // Of the dropped items, the temporary ones, keyed by owning connection
        // and item name. Passed to `temporary_ids` so a temp name dropped in
        // this transaction can be re-created by it without a collision.
        let temporary_drops = drop_ids
            .iter()
            .filter_map(|id| {
                let entry = self.get_entry(id);
                match entry.item().conn_id() {
                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
                    None => None,
                }
            })
            .collect();
        // Global IDs backing the dropped items; used below to drop in-memory
        // plans and metainfos after the commit.
        let dropped_global_ids = drop_ids
            .iter()
            .flat_map(|item_id| self.get_global_ids(item_id))
            .collect();

        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = storage
            .transaction()
            .await
            .unwrap_or_terminate("starting catalog transaction");

        let new_state = Self::transact_inner(
            TransactInnerMode::Commit,
            storage_collections,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            &self.state,
        )
        .await?;

        // The user closure was successful, apply the updates. Terminate the
        // process if this fails, because we have to restart envd due to
        // indeterminate catalog state, which we only reconcile during catalog
        // init.
        tx.commit(oracle_write_ts)
            .await
            .unwrap_or_terminate("catalog storage transaction commit must succeed");

        // Dropping here keeps the mutable borrow on self, preventing us accidentally
        // mutating anything until after f is executed.
        drop(storage);
        // `transact_inner` returns `Some` only when the in-memory state changed.
        if let Some(new_state) = new_state {
            self.transient_revision += 1;
            self.state = new_state;
        }

        // Drop in-memory planning metadata.
        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
        if self.state.system_config().enable_mz_notices() {
            // Generate retractions for the Builtin tables.
            self.state().pack_optimizer_notices(
                &mut builtin_table_updates,
                dropped_notices.iter(),
                Diff::MINUS_ONE,
            );
        }

        Ok(TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        })
    }
504
    /// Performs an incremental dry-run catalog transaction: processes only the
    /// NEW ops against an accumulated `CatalogState` from previous dry runs.
    /// This avoids the O(N^2) replay cost of replaying all accumulated ops.
    ///
    /// The durable transaction is intentionally never committed and no storage
    /// controller prepare-state side effects are run.
    ///
    /// If `prev_snapshot` is `Some`, the transaction is initialized from that
    /// snapshot (which represents the tx state after the previous dry run),
    /// ensuring it starts in sync with `base_state`. If `None` (first
    /// statement), a fresh transaction is loaded from durable storage.
    ///
    /// Returns the new accumulated state and a snapshot of the transaction's
    /// state for use in subsequent incremental dry runs.
    ///
    /// Terminates the process if a durable transaction cannot be started.
    pub async fn transact_incremental_dry_run(
        &self,
        base_state: &CatalogState,
        ops: Vec<Op>,
        session: Option<&ConnMeta>,
        prev_snapshot: Option<Snapshot>,
        oracle_write_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogState, Snapshot), AdapterError> {
        // For DDL transactions, items are not temporary (CREATE TABLE FROM SOURCE, etc.)
        // but we still need to check for collisions.
        let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;

        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = if let Some(snapshot) = prev_snapshot {
            // Restore transaction from saved snapshot so it starts in sync
            // with the accumulated CatalogState from previous dry runs.
            storage
                .transaction_from_snapshot(snapshot)
                .unwrap_or_terminate("starting catalog transaction from snapshot")
        } else {
            // First statement: fresh transaction from durable storage, which
            // is in sync with the real catalog state.
            storage
                .transaction()
                .await
                .unwrap_or_terminate("starting catalog transaction")
        };

        // Process only the new ops against the accumulated state in dry-run mode.
        let new_state = Self::transact_inner(
            TransactInnerMode::DryRun,
            None,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            base_state,
        )
        .await?;

        // Save the transaction's current state as a snapshot for the next
        // incremental dry run.
        let new_snapshot = tx.current_snapshot();

        // Transaction is NOT committed — drop it.
        drop(storage);

        // transact_inner returns Some(state) when ops produced changes.
        let state = new_state.unwrap_or_else(|| base_state.clone());
        Ok((state, new_snapshot))
    }
577
578    /// Extracts optimized expressions from `Op::CreateItem` operations for views
579    /// and materialized views. These can be used to populate a `LocalExpressionCache`
580    /// to avoid re-optimization during `apply_updates`.
581    fn extract_expressions_from_ops(
582        ops: &[Op],
583        optimizer_features: &OptimizerFeatures,
584    ) -> BTreeMap<GlobalId, LocalExpressions> {
585        let mut exprs = BTreeMap::new();
586
587        for op in ops {
588            if let Op::CreateItem { item, .. } = op {
589                match item {
590                    CatalogItem::View(view) => {
591                        exprs.insert(
592                            view.global_id,
593                            LocalExpressions {
594                                local_mir: (*view.optimized_expr).clone(),
595                                optimizer_features: optimizer_features.clone(),
596                            },
597                        );
598                    }
599                    CatalogItem::MaterializedView(mv) => {
600                        exprs.insert(
601                            mv.global_id_writes(),
602                            LocalExpressions {
603                                local_mir: (*mv.optimized_expr).clone(),
604                                optimizer_features: optimizer_features.clone(),
605                            },
606                        );
607                    }
608                    _ => {}
609                }
610            }
611        }
612
613        exprs
614    }
615
    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
    ///
    /// `mode` controls whether storage prepare-state side effects are allowed.
    /// In `DryRun` mode, this method may update the in-memory state returned to
    /// the caller, but it must not trigger controller side effects.
    ///
    /// Durable changes are staged in `tx` but not committed here; committing
    /// (or discarding) the transaction is the caller's responsibility.
    #[instrument(name = "catalog::transact_inner")]
    async fn transact_inner(
        mode: TransactInnerMode,
        storage_collections: Option<
            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
        >,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
        temporary_ids: BTreeSet<CatalogItemId>,
        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
        audit_events: &mut Vec<VersionedEvent>,
        tx: &mut Transaction<'_>,
        state: &CatalogState,
    ) -> Result<Option<CatalogState>, AdapterError> {
        // We come up with new catalog state, builtin state updates, and parsed
        // catalog updates (for deriving catalog implications) in two phases:
        //
        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
        //    one-by-one. This will give us the full list of updates to apply to
        //    the catalog, which will allow us to apply it in one batch, which
        //    in turn will allow the apply machinery to consolidate the updates.
        // 2. We do one final apply call with all updates, which gives us the
        //    final builtin table updates and parsed catalog updates.
        //
        // The reason is that the loop that is working off ops first does a
        // transact_op to derive the state updates for that op, and then calls
        // apply_updates on the catalog state. And successive ops might expect
        // the catalog state to reflect the modified state _after_ applying
        // previous ops.
        //
        // We want to, however, have one final apply_state that takes all the
        // accumulated updates to derive the required controller updates and the
        // builtin table updates.
        //
        // We won't win any DDL throughput benchmarks, but so far that's not
        // what we're optimizing for and there would probably be other
        // bottlenecks before we hit this one as a bottleneck.
        //
        // We could work around this by refactoring how the interplay of
        // transact_op and apply_updates works, but that's a larger undertaking.
        let mut preliminary_state = Cow::Borrowed(state);

        // The final state that we will return, if modified.
        let mut state = Cow::Borrowed(state);

        if ops.is_empty() {
            return Ok(None);
        }

        // Extract optimized expressions from CreateItem ops to avoid re-optimization
        // during apply_updates. We extract before the loop since `ops` is moved there.
        let optimizer_features = OptimizerFeatures::from(state.system_config());
        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);

        let mut storage_collections_to_create = BTreeSet::new();
        let mut storage_collections_to_drop = BTreeSet::new();
        let mut storage_collections_to_register = BTreeMap::new();

        // Accumulates the durable state updates across all ops; applied to the
        // final `state` in one batch after the loop.
        let mut updates = Vec::new();

        for op in ops {
            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
                oracle_write_ts,
                session,
                op,
                &temporary_ids,
                audit_events,
                tx,
                &*preliminary_state,
                &mut storage_collections_to_create,
                &mut storage_collections_to_drop,
                &mut storage_collections_to_register,
            )
            .await?;

            // Certain builtin tables are not derived from the durable catalog state, so they need
            // to be updated ad-hoc based on the current transaction. This is weird and will not
            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
            // this instance crashes after committing the durable catalog but before applying the
            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
            // world. Currently, this works fine and there are no correctness issues because
            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
            // state of all builtin tables to the correct values.
            if let Some(builtin_table_update) = weird_builtin_table_update {
                builtin_table_updates.push(builtin_table_update);
            }

            // Temporary items are not stored in the durable catalog, so they need to be handled
            // separately for updating state and builtin tables.
            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
            // in a multi-subscriber catalog world.
            let upper = tx.upper();
            let temporary_item_updates =
                temporary_item_updates
                    .into_iter()
                    .map(|(item, diff)| StateUpdate {
                        kind: StateUpdateKind::TemporaryItem(item),
                        ts: upper,
                        diff,
                    });

            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
            op_updates.extend(temporary_item_updates);
            if !op_updates.is_empty() {
                // Clone the cache so each apply_updates call has access to cached expressions.
                // The cache uses `remove` semantics, so we need a fresh clone for each call.
                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
                // Advance `preliminary_state` so subsequent ops observe the
                // effects of this one; the results are discarded because the
                // final batched apply below produces the real outputs.
                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
                    .to_mut()
                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
                    .await;
            }
            updates.append(&mut op_updates);
        }

        // Phase 2: apply all accumulated updates to the final state in one
        // batch, yielding the consolidated builtin table and catalog updates.
        if !updates.is_empty() {
            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
            let (op_builtin_table_updates, op_catalog_updates) = state
                .to_mut()
                .apply_updates(updates.clone(), &mut local_expr_cache)
                .await;
            let op_builtin_table_updates = state
                .to_mut()
                .resolve_builtin_table_updates(op_builtin_table_updates);
            builtin_table_updates.extend(op_builtin_table_updates);
            parsed_catalog_updates.extend(op_catalog_updates);
        }

        match mode {
            TransactInnerMode::Commit => {
                // `storage_collections` can be `None` in tests.
                if let Some(c) = storage_collections {
                    c.prepare_state(
                        tx,
                        storage_collections_to_create,
                        storage_collections_to_drop,
                        storage_collections_to_register,
                    )
                    .await?;
                }
            }
            TransactInnerMode::DryRun => {
                debug_assert!(
                    storage_collections.is_none(),
                    "dry-run mode must not prepare storage state"
                );
            }
        }

        // Pick up any further updates added to `tx` after the op loop (e.g. by
        // `prepare_state` above) and apply them as well.
        let updates = tx.get_and_commit_op_updates();
        if !updates.is_empty() {
            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
            let (op_builtin_table_updates, op_catalog_updates) = state
                .to_mut()
                .apply_updates(updates.clone(), &mut local_expr_cache)
                .await;
            let op_builtin_table_updates = state
                .to_mut()
                .resolve_builtin_table_updates(op_builtin_table_updates);
            builtin_table_updates.extend(op_builtin_table_updates);
            parsed_catalog_updates.extend(op_catalog_updates);
        }

        // `Cow` tells us whether anything actually mutated the state: `Owned`
        // means some `to_mut()` call cloned it, `Borrowed` means no change.
        match state {
            Cow::Owned(state) => Ok(Some(state)),
            Cow::Borrowed(_) => Ok(None),
        }
    }
793
794    /// Performs the transaction operation described by `op`. This function prepares the changes in
795    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
796    /// changes.
797    ///
798    /// Optionally returns a builtin table update for any builtin table updates that cannot be
799    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
800    /// scenarios and ideally in the future don't exist.
801    #[instrument]
802    async fn transact_op(
803        oracle_write_ts: mz_repr::Timestamp,
804        session: Option<&ConnMeta>,
805        op: Op,
806        temporary_ids: &BTreeSet<CatalogItemId>,
807        audit_events: &mut Vec<VersionedEvent>,
808        tx: &mut Transaction<'_>,
809        state: &CatalogState,
810        storage_collections_to_create: &mut BTreeSet<GlobalId>,
811        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
812        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
813    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
814        let mut weird_builtin_table_update = None;
815        let mut temporary_item_updates = Vec::new();
816
817        match op {
818            Op::AlterRetainHistory { id, value, window } => {
819                let entry = state.get_entry(&id);
820                if id.is_system() {
821                    let name = entry.name();
822                    let full_name =
823                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
824                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
825                        full_name.to_string(),
826                    ))));
827                }
828
829                let mut new_entry = entry.clone();
830                let previous = new_entry
831                    .item
832                    .update_retain_history(value.clone(), window)
833                    .map_err(|_| {
834                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
835                            "planner should have rejected invalid alter retain history item type"
836                                .to_string(),
837                        )))
838                    })?;
839
840                if Self::should_audit_log_item(new_entry.item()) {
841                    let details =
842                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
843                            id: id.to_string(),
844                            old_history: previous.map(|previous| previous.to_string()),
845                            new_history: value.map(|v| v.to_string()),
846                        });
847                    CatalogState::add_to_audit_log(
848                        &state.system_configuration,
849                        oracle_write_ts,
850                        session,
851                        tx,
852                        audit_events,
853                        EventType::Alter,
854                        catalog_type_to_audit_object_type(new_entry.item().typ()),
855                        details,
856                    )?;
857                }
858
859                tx.update_item(id, new_entry.into())?;
860
861                Self::log_update(state, &id);
862            }
863            Op::AlterSourceTimestampInterval {
864                id,
865                value,
866                interval,
867            } => {
868                let entry = state.get_entry(&id);
869                if id.is_system() {
870                    let name = entry.name();
871                    let full_name =
872                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
873                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
874                        full_name.to_string(),
875                    ))));
876                }
877
878                let mut new_entry = entry.clone();
879                new_entry
880                    .item
881                    .update_timestamp_interval(value, interval)
882                    .map_err(|_| {
883                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
884                            "planner should have rejected invalid alter timestamp interval item type"
885                                .to_string(),
886                        )))
887                    })?;
888
889                tx.update_item(id, new_entry.into())?;
890
891                Self::log_update(state, &id);
892            }
893            Op::AlterRole {
894                id,
895                name,
896                attributes,
897                nopassword,
898                vars,
899            } => {
900                state.ensure_not_reserved_role(&id)?;
901
902                let mut existing_role = state.get_role(&id).clone();
903                let password = attributes.password.clone();
904                let scram_iterations = attributes
905                    .scram_iterations
906                    .unwrap_or_else(|| state.system_config().scram_iterations());
907                existing_role.attributes = attributes.into();
908                existing_role.vars = vars;
909                let password_action = if nopassword {
910                    PasswordAction::Clear
911                } else if let Some(password) = password {
912                    PasswordAction::Set(PasswordConfig {
913                        password,
914                        scram_iterations,
915                    })
916                } else {
917                    PasswordAction::NoChange
918                };
919                tx.update_role(id, existing_role.into(), password_action)?;
920
921                CatalogState::add_to_audit_log(
922                    &state.system_configuration,
923                    oracle_write_ts,
924                    session,
925                    tx,
926                    audit_events,
927                    EventType::Alter,
928                    ObjectType::Role,
929                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
930                        id: id.to_string(),
931                        name: name.clone(),
932                    }),
933                )?;
934
935                info!("update role {name} ({id})");
936            }
937            Op::AlterNetworkPolicy {
938                id,
939                rules,
940                name,
941                owner_id: _owner_id,
942            } => {
943                let existing_policy = state.get_network_policy(&id).clone();
944                let mut policy: NetworkPolicy = existing_policy.into();
945                policy.rules = rules;
946                if is_reserved_name(&name) {
947                    return Err(AdapterError::Catalog(Error::new(
948                        ErrorKind::ReservedNetworkPolicyName(name),
949                    )));
950                }
951                tx.update_network_policy(id, policy.clone())?;
952
953                CatalogState::add_to_audit_log(
954                    &state.system_configuration,
955                    oracle_write_ts,
956                    session,
957                    tx,
958                    audit_events,
959                    EventType::Alter,
960                    ObjectType::NetworkPolicy,
961                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
962                        id: id.to_string(),
963                        name: name.clone(),
964                    }),
965                )?;
966
967                info!("update network policy {name} ({id})");
968            }
969            Op::AlterAddColumn {
970                id,
971                new_global_id,
972                name,
973                typ,
974                sql,
975            } => {
976                let mut new_entry = state.get_entry(&id).clone();
977                let version = new_entry.item.add_column(name, typ, sql)?;
978                // All versions of a table share the same shard, so it shouldn't matter what
979                // GlobalId we use here.
980                let shard_id = state
981                    .storage_metadata()
982                    .get_collection_shard(new_entry.latest_global_id())?;
983
984                // TODO(alter_table): Support adding columns to sources.
985                let CatalogItem::Table(table) = &mut new_entry.item else {
986                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
987                };
988                table.collections.insert(version, new_global_id);
989
990                tx.update_item(id, new_entry.into())?;
991                storage_collections_to_register.insert(new_global_id, shard_id);
992            }
993            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
994                let mut new_entry = state.get_entry(&id).clone();
995                let replacement = state.get_entry(&replacement_id);
996
997                let new_audit_events =
998                    apply_replacement_audit_events(state, &new_entry, replacement);
999
1000                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1001                    return Err(AdapterError::internal(
1002                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1003                        "id must refer to a materialized view",
1004                    ));
1005                };
1006                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1007                    return Err(AdapterError::internal(
1008                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1009                        "replacement_id must refer to a materialized view",
1010                    ));
1011                };
1012
1013                mv.apply_replacement(replacement_mv.clone());
1014
1015                tx.remove_item(replacement_id)?;
1016
1017                new_entry.id = replacement_id;
1018                tx_replace_item(tx, state, id, new_entry)?;
1019
1020                let comment_id = CommentObjectId::MaterializedView(replacement_id);
1021                tx.drop_comments(&[comment_id].into())?;
1022
1023                for (event_type, details) in new_audit_events {
1024                    CatalogState::add_to_audit_log(
1025                        &state.system_configuration,
1026                        oracle_write_ts,
1027                        session,
1028                        tx,
1029                        audit_events,
1030                        event_type,
1031                        ObjectType::MaterializedView,
1032                        details,
1033                    )?;
1034                }
1035            }
1036            Op::CreateDatabase { name, owner_id } => {
1037                let database_owner_privileges = vec![rbac::owner_privilege(
1038                    mz_sql::catalog::ObjectType::Database,
1039                    owner_id,
1040                )];
1041                let database_default_privileges = state
1042                    .default_privileges
1043                    .get_applicable_privileges(
1044                        owner_id,
1045                        None,
1046                        None,
1047                        mz_sql::catalog::ObjectType::Database,
1048                    )
1049                    .map(|item| item.mz_acl_item(owner_id));
1050                let database_privileges: Vec<_> = merge_mz_acl_items(
1051                    database_owner_privileges
1052                        .into_iter()
1053                        .chain(database_default_privileges),
1054                )
1055                .collect();
1056
1057                let schema_owner_privileges = vec![rbac::owner_privilege(
1058                    mz_sql::catalog::ObjectType::Schema,
1059                    owner_id,
1060                )];
1061                let schema_default_privileges = state
1062                    .default_privileges
1063                    .get_applicable_privileges(
1064                        owner_id,
1065                        None,
1066                        None,
1067                        mz_sql::catalog::ObjectType::Schema,
1068                    )
1069                    .map(|item| item.mz_acl_item(owner_id))
1070                    // Special default privilege on public schemas.
1071                    .chain(std::iter::once(MzAclItem {
1072                        grantee: RoleId::Public,
1073                        grantor: owner_id,
1074                        acl_mode: AclMode::USAGE,
1075                    }));
1076                let schema_privileges: Vec<_> = merge_mz_acl_items(
1077                    schema_owner_privileges
1078                        .into_iter()
1079                        .chain(schema_default_privileges),
1080                )
1081                .collect();
1082
1083                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1084                let (database_id, _) = tx.insert_user_database(
1085                    &name,
1086                    owner_id,
1087                    database_privileges.clone(),
1088                    &temporary_oids,
1089                )?;
1090                let (schema_id, _) = tx.insert_user_schema(
1091                    database_id,
1092                    DEFAULT_SCHEMA,
1093                    owner_id,
1094                    schema_privileges.clone(),
1095                    &temporary_oids,
1096                )?;
1097                CatalogState::add_to_audit_log(
1098                    &state.system_configuration,
1099                    oracle_write_ts,
1100                    session,
1101                    tx,
1102                    audit_events,
1103                    EventType::Create,
1104                    ObjectType::Database,
1105                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1106                        id: database_id.to_string(),
1107                        name: name.clone(),
1108                    }),
1109                )?;
1110                info!("create database {}", name);
1111
1112                CatalogState::add_to_audit_log(
1113                    &state.system_configuration,
1114                    oracle_write_ts,
1115                    session,
1116                    tx,
1117                    audit_events,
1118                    EventType::Create,
1119                    ObjectType::Schema,
1120                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1121                        id: schema_id.to_string(),
1122                        name: DEFAULT_SCHEMA.to_string(),
1123                        database_name: Some(name),
1124                    }),
1125                )?;
1126            }
1127            Op::CreateSchema {
1128                database_id,
1129                schema_name,
1130                owner_id,
1131            } => {
1132                if is_reserved_name(&schema_name) {
1133                    return Err(AdapterError::Catalog(Error::new(
1134                        ErrorKind::ReservedSchemaName(schema_name),
1135                    )));
1136                }
1137                let database_id = match database_id {
1138                    ResolvedDatabaseSpecifier::Id(id) => id,
1139                    ResolvedDatabaseSpecifier::Ambient => {
1140                        return Err(AdapterError::Catalog(Error::new(
1141                            ErrorKind::ReadOnlySystemSchema(schema_name),
1142                        )));
1143                    }
1144                };
1145                let owner_privileges = vec![rbac::owner_privilege(
1146                    mz_sql::catalog::ObjectType::Schema,
1147                    owner_id,
1148                )];
1149                let default_privileges = state
1150                    .default_privileges
1151                    .get_applicable_privileges(
1152                        owner_id,
1153                        Some(database_id),
1154                        None,
1155                        mz_sql::catalog::ObjectType::Schema,
1156                    )
1157                    .map(|item| item.mz_acl_item(owner_id));
1158                let privileges: Vec<_> =
1159                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1160                        .collect();
1161                let (schema_id, _) = tx.insert_user_schema(
1162                    database_id,
1163                    &schema_name,
1164                    owner_id,
1165                    privileges.clone(),
1166                    &state.get_temporary_oids().collect(),
1167                )?;
1168                CatalogState::add_to_audit_log(
1169                    &state.system_configuration,
1170                    oracle_write_ts,
1171                    session,
1172                    tx,
1173                    audit_events,
1174                    EventType::Create,
1175                    ObjectType::Schema,
1176                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1177                        id: schema_id.to_string(),
1178                        name: schema_name.clone(),
1179                        database_name: Some(state.database_by_id[&database_id].name.clone()),
1180                    }),
1181                )?;
1182            }
1183            Op::CreateRole { name, attributes } => {
1184                if is_reserved_role_name(&name) {
1185                    return Err(AdapterError::Catalog(Error::new(
1186                        ErrorKind::ReservedRoleName(name),
1187                    )));
1188                }
1189                let membership = RoleMembership::new();
1190                let vars = RoleVars::default();
1191                let (id, _) = tx.insert_user_role(
1192                    name.clone(),
1193                    attributes.clone(),
1194                    membership.clone(),
1195                    vars.clone(),
1196                    &state.get_temporary_oids().collect(),
1197                )?;
1198                CatalogState::add_to_audit_log(
1199                    &state.system_configuration,
1200                    oracle_write_ts,
1201                    session,
1202                    tx,
1203                    audit_events,
1204                    EventType::Create,
1205                    ObjectType::Role,
1206                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1207                        id: id.to_string(),
1208                        name: name.clone(),
1209                    }),
1210                )?;
1211                info!("create role {}", name);
1212            }
1213            Op::CreateCluster {
1214                id,
1215                name,
1216                introspection_sources,
1217                owner_id,
1218                config,
1219            } => {
1220                if is_reserved_name(&name) {
1221                    return Err(AdapterError::Catalog(Error::new(
1222                        ErrorKind::ReservedClusterName(name),
1223                    )));
1224                }
1225                let owner_privileges = vec![rbac::owner_privilege(
1226                    mz_sql::catalog::ObjectType::Cluster,
1227                    owner_id,
1228                )];
1229                let default_privileges = state
1230                    .default_privileges
1231                    .get_applicable_privileges(
1232                        owner_id,
1233                        None,
1234                        None,
1235                        mz_sql::catalog::ObjectType::Cluster,
1236                    )
1237                    .map(|item| item.mz_acl_item(owner_id));
1238                let privileges: Vec<_> =
1239                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1240                        .collect();
1241                let introspection_source_ids: Vec<_> = introspection_sources
1242                    .iter()
1243                    .map(|introspection_source| {
1244                        Transaction::allocate_introspection_source_index_id(
1245                            &id,
1246                            introspection_source.variant,
1247                        )
1248                    })
1249                    .collect();
1250
1251                let introspection_sources = introspection_sources
1252                    .into_iter()
1253                    .zip_eq(introspection_source_ids)
1254                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1255                    .collect();
1256
1257                tx.insert_user_cluster(
1258                    id,
1259                    &name,
1260                    introspection_sources,
1261                    owner_id,
1262                    privileges.clone(),
1263                    config.clone().into(),
1264                    &state.get_temporary_oids().collect(),
1265                )?;
1266                CatalogState::add_to_audit_log(
1267                    &state.system_configuration,
1268                    oracle_write_ts,
1269                    session,
1270                    tx,
1271                    audit_events,
1272                    EventType::Create,
1273                    ObjectType::Cluster,
1274                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1275                        id: id.to_string(),
1276                        name: name.clone(),
1277                    }),
1278                )?;
1279                info!("create cluster {}", name);
1280            }
1281            Op::CreateClusterReplica {
1282                cluster_id,
1283                name,
1284                config,
1285                owner_id,
1286                reason,
1287            } => {
1288                if is_reserved_name(&name) {
1289                    return Err(AdapterError::Catalog(Error::new(
1290                        ErrorKind::ReservedReplicaName(name),
1291                    )));
1292                }
1293                let cluster = state.get_cluster(cluster_id);
1294                let id =
1295                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1296                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1297                    size,
1298                    billed_as,
1299                    internal,
1300                    ..
1301                }) = &config.location
1302                {
1303                    let (reason, scheduling_policies) = reason.into_audit_log();
1304                    let details = EventDetails::CreateClusterReplicaV4(
1305                        mz_audit_log::CreateClusterReplicaV4 {
1306                            cluster_id: cluster_id.to_string(),
1307                            cluster_name: cluster.name.clone(),
1308                            replica_id: Some(id.to_string()),
1309                            replica_name: name.clone(),
1310                            logical_size: size.clone(),
1311                            billed_as: billed_as.clone(),
1312                            internal: *internal,
1313                            reason,
1314                            scheduling_policies,
1315                        },
1316                    );
1317                    CatalogState::add_to_audit_log(
1318                        &state.system_configuration,
1319                        oracle_write_ts,
1320                        session,
1321                        tx,
1322                        audit_events,
1323                        EventType::Create,
1324                        ObjectType::ClusterReplica,
1325                        details,
1326                    )?;
1327                }
1328            }
1329            Op::CreateItem {
1330                id,
1331                name,
1332                item,
1333                owner_id,
1334            } => {
1335                state.check_unstable_dependencies(&item)?;
1336
1337                match &item {
1338                    CatalogItem::Table(table) => {
1339                        let gids: Vec<_> = table.global_ids().collect();
1340                        assert_eq!(gids.len(), 1);
1341                        storage_collections_to_create.extend(gids);
1342                    }
1343                    CatalogItem::Source(source) => {
1344                        storage_collections_to_create.insert(source.global_id());
1345                    }
1346                    CatalogItem::MaterializedView(mv) => {
1347                        let mv_gid = mv.global_id_writes();
1348                        if let Some(target_id) = mv.replacement_target {
1349                            let target_gid = state.get_entry(&target_id).latest_global_id();
1350                            let shard_id =
1351                                state.storage_metadata().get_collection_shard(target_gid)?;
1352                            storage_collections_to_register.insert(mv_gid, shard_id);
1353                        } else {
1354                            storage_collections_to_create.insert(mv_gid);
1355                        }
1356                    }
1357                    CatalogItem::ContinualTask(ct) => {
1358                        storage_collections_to_create.insert(ct.global_id());
1359                    }
1360                    CatalogItem::Sink(sink) => {
1361                        storage_collections_to_create.insert(sink.global_id());
1362                    }
1363                    CatalogItem::Log(_)
1364                    | CatalogItem::View(_)
1365                    | CatalogItem::Index(_)
1366                    | CatalogItem::Type(_)
1367                    | CatalogItem::Func(_)
1368                    | CatalogItem::Secret(_)
1369                    | CatalogItem::Connection(_) => (),
1370                }
1371
1372                let system_user = session.map_or(false, |s| s.user().is_system_user());
1373                if !system_user {
1374                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1375                        let cluster_name = state.clusters_by_id[&id].name.clone();
1376                        return Err(AdapterError::Catalog(Error::new(
1377                            ErrorKind::ReadOnlyCluster(cluster_name),
1378                        )));
1379                    }
1380                }
1381
1382                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1383                let default_privileges = state
1384                    .default_privileges
1385                    .get_applicable_privileges(
1386                        owner_id,
1387                        name.qualifiers.database_spec.id(),
1388                        Some(name.qualifiers.schema_spec.into()),
1389                        item.typ().into(),
1390                    )
1391                    .map(|item| item.mz_acl_item(owner_id));
1392                // mz_support can read all progress sources.
1393                let progress_source_privilege = if item.is_progress_source() {
1394                    Some(MzAclItem {
1395                        grantee: MZ_SUPPORT_ROLE_ID,
1396                        grantor: owner_id,
1397                        acl_mode: AclMode::SELECT,
1398                    })
1399                } else {
1400                    None
1401                };
1402                let privileges: Vec<_> = merge_mz_acl_items(
1403                    owner_privileges
1404                        .into_iter()
1405                        .chain(default_privileges)
1406                        .chain(progress_source_privilege),
1407                )
1408                .collect();
1409
1410                let temporary_oids = state.get_temporary_oids().collect();
1411
1412                if item.is_temporary() {
1413                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1414                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1415                    {
1416                        return Err(AdapterError::Catalog(Error::new(
1417                            ErrorKind::InvalidTemporarySchema,
1418                        )));
1419                    }
1420                    let oid = tx.allocate_oid(&temporary_oids)?;
1421
1422                    let schema_id = name.qualifiers.schema_spec.clone().into();
1423                    let item_type = item.typ();
1424                    let (create_sql, global_id, versions) = item.to_serialized();
1425
1426                    let item = TemporaryItem {
1427                        id,
1428                        oid,
1429                        global_id,
1430                        schema_id,
1431                        name: name.item.clone(),
1432                        create_sql,
1433                        conn_id: item.conn_id().cloned(),
1434                        owner_id,
1435                        privileges: privileges.clone(),
1436                        extra_versions: versions,
1437                    };
1438                    temporary_item_updates.push((item, StateDiff::Addition));
1439
1440                    info!(
1441                        "create temporary {} {} ({})",
1442                        item_type,
1443                        state.resolve_full_name(&name, None),
1444                        id
1445                    );
1446                } else {
1447                    if let Some(temp_id) =
1448                        item.uses()
1449                            .iter()
1450                            .find(|id| match state.try_get_entry(*id) {
1451                                Some(entry) => entry.item().is_temporary(),
1452                                None => temporary_ids.contains(id),
1453                            })
1454                    {
1455                        let temp_item = state.get_entry(temp_id);
1456                        return Err(AdapterError::Catalog(Error::new(
1457                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1458                        )));
1459                    }
1460                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1461                        && !system_user
1462                    {
1463                        let schema_name = state
1464                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1465                            .schema;
1466                        return Err(AdapterError::Catalog(Error::new(
1467                            ErrorKind::ReadOnlySystemSchema(schema_name),
1468                        )));
1469                    }
1470                    let schema_id = name.qualifiers.schema_spec.clone().into();
1471                    let item_type = item.typ();
1472                    let (create_sql, global_id, versions) = item.to_serialized();
1473                    tx.insert_user_item(
1474                        id,
1475                        global_id,
1476                        schema_id,
1477                        &name.item,
1478                        create_sql,
1479                        owner_id,
1480                        privileges.clone(),
1481                        &temporary_oids,
1482                        versions,
1483                    )?;
1484                    info!(
1485                        "create {} {} ({})",
1486                        item_type,
1487                        state.resolve_full_name(&name, None),
1488                        id
1489                    );
1490                }
1491
                // Record the creation in the audit log. `should_audit_log_item`
                // filters out items that are excluded from logging (e.g.
                // temporary items — see the note in the `Op::Comment` arm).
                if Self::should_audit_log_item(&item) {
                    // Resolve the item's fully-qualified name for the event.
                    let name = Self::full_name_detail(
                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
                    );
                    // Cluster-backed item types additionally record which
                    // cluster they run on in the event details.
                    let details = match &item {
                        CatalogItem::Source(s) => {
                            let cluster_id = match s.data_source {
                                // Ingestion exports don't have their own cluster, but
                                // run on their ingestion's cluster.
                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                                    match state.get_entry(&ingestion_id).cluster_id() {
                                        Some(cluster_id) => Some(cluster_id.to_string()),
                                        None => None,
                                    }
                                }
                                _ => match item.cluster_id() {
                                    Some(cluster_id) => Some(cluster_id.to_string()),
                                    None => None,
                                },
                            };

                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
                                id: id.to_string(),
                                cluster_id,
                                name,
                                external_type: s.source_type().to_string(),
                            })
                        }
                        CatalogItem::Sink(s) => {
                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
                                id: id.to_string(),
                                cluster_id: Some(s.cluster_id.to_string()),
                                name,
                                external_type: s.sink_type().to_string(),
                            })
                        }
                        CatalogItem::Index(i) => {
                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
                                id: id.to_string(),
                                name,
                                cluster_id: i.cluster_id.to_string(),
                            })
                        }
                        CatalogItem::MaterializedView(mv) => {
                            EventDetails::CreateMaterializedViewV1(
                                mz_audit_log::CreateMaterializedViewV1 {
                                    id: id.to_string(),
                                    name,
                                    cluster_id: mv.cluster_id.to_string(),
                                    replacement_target_id: mv
                                        .replacement_target
                                        .map(|id| id.to_string()),
                                },
                            )
                        }
                        // All other item types only record the id and name.
                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
                            id: id.to_string(),
                            name,
                        }),
                    };
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Create,
                        catalog_type_to_audit_object_type(item.typ()),
                        details,
                    )?;
                }
1563            }
            Op::CreateNetworkPolicy {
                rules,
                name,
                owner_id,
            } => {
                // Network policy names must be unique...
                if state.network_policies_by_name.contains_key(&name) {
                    return Err(AdapterError::PlanError(PlanError::Catalog(
                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
                    )));
                }
                // ...and must not be reserved.
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedNetworkPolicyName(name),
                    )));
                }

                // The initial privilege set is the owner's implicit privileges
                // merged with any applicable default privileges for network
                // policies.
                let owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::NetworkPolicy,
                    owner_id,
                )];
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::NetworkPolicy,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let privileges: Vec<_> =
                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
                        .collect();

                // NOTE(review): the set of OIDs used by temporary objects is
                // handed to the durable layer, presumably so the newly
                // allocated OID cannot collide with them — confirm in
                // `Transaction::insert_user_network_policy`.
                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
                let id = tx.insert_user_network_policy(
                    name.clone(),
                    rules,
                    privileges,
                    owner_id,
                    &temporary_oids,
                )?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::NetworkPolicy,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;

                info!("created network policy {name} ({id})");
            }
            Op::Comment {
                object_id,
                sub_component,
                comment,
            } => {
                // Durably record the comment change for `sub_component` of
                // `object_id`.
                tx.update_comment(object_id, sub_component, comment)?;
                // Audit-log the comment unless it targets an item that is
                // excluded from audit logging.
                let entry = state.get_comment_id_entry(&object_id);
                let should_log = entry
                    .map(|entry| Self::should_audit_log_item(entry.item()))
                    // Things that aren't catalog entries can't be temp, so should be logged.
                    .unwrap_or(true);
                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
                // comments won't be logged for now.
                if let (Some(conn_id), true) =
                    (session.map(|session| session.conn_id()), should_log)
                {
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Comment,
                        comment_id_to_audit_object_type(object_id),
                        EventDetails::IdNameV1(IdNameV1 {
                            // CommentObjectIds don't have a great string representation, but debug will do for now.
                            id: format!("{object_id:?}"),
                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
                        }),
                    )?;
                }
            }
1654            Op::UpdateSourceReferences {
1655                source_id,
1656                references,
1657            } => {
1658                tx.update_source_references(
1659                    source_id,
1660                    references
1661                        .references
1662                        .into_iter()
1663                        .map(|reference| reference.into())
1664                        .collect(),
1665                    references.updated_at,
1666                )?;
1667            }
1668            Op::DropObjects(drop_object_infos) => {
1669                // Generate all of the objects that need to get dropped.
1670                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1671
1672                // Drop any associated comments.
1673                tx.drop_comments(&delta.comments)?;
1674
1675                // Drop any items.
1676                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1677                    delta
1678                        .items
1679                        .iter()
1680                        .map(|id| id)
1681                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1682                tx.remove_items(&durable_items_to_drop)?;
1683                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1684                    let entry = state.get_entry(&id);
1685                    (entry.clone().into(), StateDiff::Retraction)
1686                }));
1687
1688                for item_id in delta.items {
1689                    let entry = state.get_entry(&item_id);
1690
1691                    // Drop associated storage collections, unless the dropped item is a
1692                    // replacement, in which case the replacement target owns the storage
1693                    // collection.
1694                    if entry.item().is_storage_collection() && entry.replacement_target().is_none()
1695                    {
1696                        storage_collections_to_drop.extend(entry.global_ids());
1697                    }
1698
1699                    if state.source_references.contains_key(&item_id) {
1700                        tx.remove_source_references(item_id)?;
1701                    }
1702
1703                    if Self::should_audit_log_item(entry.item()) {
1704                        CatalogState::add_to_audit_log(
1705                            &state.system_configuration,
1706                            oracle_write_ts,
1707                            session,
1708                            tx,
1709                            audit_events,
1710                            EventType::Drop,
1711                            catalog_type_to_audit_object_type(entry.item().typ()),
1712                            EventDetails::IdFullNameV1(IdFullNameV1 {
1713                                id: item_id.to_string(),
1714                                name: Self::full_name_detail(&state.resolve_full_name(
1715                                    entry.name(),
1716                                    session.map(|session| session.conn_id()),
1717                                )),
1718                            }),
1719                        )?;
1720                    }
1721                    info!(
1722                        "drop {} {} ({})",
1723                        entry.item_type(),
1724                        state.resolve_full_name(entry.name(), entry.conn_id()),
1725                        item_id
1726                    );
1727                }
1728
                // Drop any schemas.
                let schemas = delta
                    .schemas
                    .iter()
                    .map(|(schema_spec, database_spec)| {
                        (SchemaId::from(schema_spec), *database_spec)
                    })
                    .collect();
                tx.remove_schemas(&schemas)?;

                for (schema_spec, database_spec) in delta.schemas {
                    let schema = state.get_schema(
                        &database_spec,
                        &schema_spec,
                        session
                            .map(|session| session.conn_id())
                            .unwrap_or(&SYSTEM_CONN_ID),
                    );

                    let schema_id = SchemaId::from(schema_spec);
                    // Ambient schemas have no parent database to record in the
                    // audit event.
                    let database_id = match database_spec {
                        ResolvedDatabaseSpecifier::Ambient => None,
                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
                    };

                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Drop,
                        ObjectType::Schema,
                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
                            id: schema_id.to_string(),
                            name: schema.name.schema.to_string(),
                            database_name: database_id
                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
                        }),
                    )?;
                }

                // Drop any databases.
                tx.remove_databases(&delta.databases)?;

                for database_id in delta.databases {
                    let database = state.get_database(&database_id).clone();

                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Drop,
                        ObjectType::Database,
                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                            id: database_id.to_string(),
                            name: database.name.clone(),
                        }),
                    )?;
                }
1791
1792                // Drop any roles.
1793                tx.remove_user_roles(&delta.roles)?;
1794
1795                for role_id in delta.roles {
1796                    let role = state
1797                        .roles_by_id
1798                        .get(&role_id)
1799                        .expect("catalog out of sync");
1800
1801                    CatalogState::add_to_audit_log(
1802                        &state.system_configuration,
1803                        oracle_write_ts,
1804                        session,
1805                        tx,
1806                        audit_events,
1807                        EventType::Drop,
1808                        ObjectType::Role,
1809                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1810                            id: role.id.to_string(),
1811                            name: role.name.clone(),
1812                        }),
1813                    )?;
1814                    info!("drop role {}", role.name());
1815                }
1816
1817                // Drop any network policies.
1818                tx.remove_network_policies(&delta.network_policies)?;
1819
1820                for network_policy_id in delta.network_policies {
1821                    let policy = state
1822                        .network_policies_by_id
1823                        .get(&network_policy_id)
1824                        .expect("catalog out of sync");
1825
1826                    CatalogState::add_to_audit_log(
1827                        &state.system_configuration,
1828                        oracle_write_ts,
1829                        session,
1830                        tx,
1831                        audit_events,
1832                        EventType::Drop,
1833                        ObjectType::NetworkPolicy,
1834                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1835                            id: policy.id.to_string(),
1836                            name: policy.name.clone(),
1837                        }),
1838                    )?;
1839                    info!("drop network policy {}", policy.name.clone());
1840                }
1841
                // Drop any replicas.
                let replicas = delta.replicas.keys().copied().collect();
                tx.remove_cluster_replicas(&replicas)?;

                for (replica_id, (cluster_id, reason)) in delta.replicas {
                    let cluster = state.get_cluster(cluster_id);
                    let replica = cluster.replica(replica_id).expect("Must exist");

                    // Convert the drop reason into its audit-log form: a
                    // reason plus any scheduling decisions that motivated it.
                    let (reason, scheduling_policies) = reason.into_audit_log();
                    let details =
                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
                            cluster_id: cluster_id.to_string(),
                            cluster_name: cluster.name.clone(),
                            replica_id: Some(replica_id.to_string()),
                            replica_name: replica.name.clone(),
                            reason,
                            scheduling_policies,
                        });
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Drop,
                        ObjectType::ClusterReplica,
                        details,
                    )?;
                }

                // Drop any clusters.
                tx.remove_clusters(&delta.clusters)?;

                for cluster_id in delta.clusters {
                    let cluster = state.get_cluster(cluster_id);

                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Drop,
                        ObjectType::Cluster,
                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                            id: cluster.id.to_string(),
                            name: cluster.name.clone(),
                        }),
                    )?;
                }
            }
            Op::GrantRole {
                role_id,
                member_id,
                grantor_id,
            } => {
                // The member must not be a reserved role, and `role_id` must
                // be a grantable role.
                state.ensure_not_reserved_role(&member_id)?;
                state.ensure_grantable_role(&role_id)?;
                // Reject the grant if `member_id` already appears in
                // `role_id`'s transitive membership — adding this edge would
                // make role membership circular.
                if state.collect_role_membership(&role_id).contains(&member_id) {
                    let group_role = state.get_role(&role_id);
                    let member_role = state.get_role(&member_id);
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::CircularRoleMembership {
                            role_name: group_role.name().to_string(),
                            member_name: member_role.name().to_string(),
                        },
                    )));
                }
                // Record the membership (keyed by role, valued by grantor) and
                // persist the updated member role.
                let mut member_role = state.get_role(&member_id).clone();
                member_role.membership.map.insert(role_id, grantor_id);
                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Grant,
                    ObjectType::Role,
                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
                        role_id: role_id.to_string(),
                        member_id: member_id.to_string(),
                        grantor_id: grantor_id.to_string(),
                        // Grants without a session are attributed to the
                        // system role.
                        executed_by: session
                            .map(|session| session.authenticated_role_id())
                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
                            .to_string(),
                    }),
                )?;
            }
            Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            } => {
                // The member must not be a reserved role, and `role_id` must
                // be a grantable role.
                state.ensure_not_reserved_role(&member_id)?;
                state.ensure_grantable_role(&role_id)?;
                // Drop the membership edge and persist the updated member
                // role. Removing an absent membership is a no-op.
                let mut member_role = state.get_role(&member_id).clone();
                member_role.membership.map.remove(&role_id);
                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Revoke,
                    ObjectType::Role,
                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
                        role_id: role_id.to_string(),
                        member_id: member_id.to_string(),
                        grantor_id: grantor_id.to_string(),
                        // Revokes without a session are attributed to the
                        // system role.
                        executed_by: session
                            .map(|session| session.authenticated_role_id())
                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
                            .to_string(),
                    }),
                )?;
            }
            Op::UpdatePrivilege {
                target_id,
                privilege,
                variant,
            } => {
                // Applies the grant or revoke to a privilege map in place.
                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
                    UpdatePrivilegeVariant::Grant => {
                        privileges.grant(privilege);
                    }
                    UpdatePrivilegeVariant::Revoke => {
                        privileges.revoke(&privilege);
                    }
                };
                match &target_id {
                    // Object-level privileges: clone the in-memory object,
                    // update its privilege map, and write it back durably.
                    SystemObjectId::Object(object_id) => match object_id {
                        ObjectId::Cluster(id) => {
                            let mut cluster = state.get_cluster(*id).clone();
                            update_privilege_fn(&mut cluster.privileges);
                            tx.update_cluster(*id, cluster.into())?;
                        }
                        ObjectId::Database(id) => {
                            let mut database = state.get_database(id).clone();
                            update_privilege_fn(&mut database.privileges);
                            tx.update_database(*id, database.into())?;
                        }
                        ObjectId::NetworkPolicy(id) => {
                            let mut policy = state.get_network_policy(id).clone();
                            update_privilege_fn(&mut policy.privileges);
                            tx.update_network_policy(*id, policy.into())?;
                        }
                        ObjectId::Schema((database_spec, schema_spec)) => {
                            let schema_id = schema_spec.clone().into();
                            let mut schema = state
                                .get_schema(
                                    database_spec,
                                    schema_spec,
                                    session
                                        .map(|session| session.conn_id())
                                        .unwrap_or(&SYSTEM_CONN_ID),
                                )
                                .clone();
                            update_privilege_fn(&mut schema.privileges);
                            tx.update_schema(schema_id, schema.into())?;
                        }
                        ObjectId::Item(id) => {
                            let entry = state.get_entry(id);
                            let mut new_entry = entry.clone();
                            update_privilege_fn(&mut new_entry.privileges);
                            // Temporary items are not durable: they are updated
                            // in memory by retracting the old entry and adding
                            // the new one.
                            if !new_entry.item().is_temporary() {
                                tx.update_item(*id, new_entry.into())?;
                            } else {
                                temporary_item_updates
                                    .push((entry.clone().into(), StateDiff::Retraction));
                                temporary_item_updates
                                    .push((new_entry.into(), StateDiff::Addition));
                            }
                        }
                        // No durable privilege state is written for roles or
                        // cluster replicas.
                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
                    },
                    // System-wide privileges: recompute the ACL entry for this
                    // (grantee, grantor) pair and persist it.
                    SystemObjectId::System => {
                        let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
                        update_privilege_fn(&mut system_privileges);
                        let new_privilege =
                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
                        // NOTE(review): passing `None` presumably clears the
                        // (grantee, grantor) entry — confirm in `Transaction`.
                        tx.set_system_privilege(
                            privilege.grantee,
                            privilege.grantor,
                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
                        )?;
                    }
                }
                let object_type = state.get_system_object_type(&target_id);
                let object_id_str = match &target_id {
                    SystemObjectId::System => "SYSTEM".to_string(),
                    SystemObjectId::Object(id) => id.to_string(),
                };
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    // The audit event type mirrors the grant/revoke variant.
                    variant.into(),
                    system_object_type_to_audit_object_type(&object_type),
                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                        object_id: object_id_str,
                        grantee_id: privilege.grantee.to_string(),
                        grantor_id: privilege.grantor.to_string(),
                        privileges: privilege.acl_mode.to_string(),
                    }),
                )?;
            }
            Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant,
            } => {
                // Apply the grant/revoke to a copy of the in-memory default
                // privileges to compute the resulting ACL mode for this
                // (object, grantee) pair.
                let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
                match variant {
                    UpdatePrivilegeVariant::Grant => default_privileges
                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
                    UpdatePrivilegeVariant::Revoke => {
                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
                    }
                }
                let new_acl_mode = default_privileges
                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
                // Persist the recomputed ACL mode durably.
                tx.set_default_privilege(
                    privilege_object.role_id,
                    privilege_object.database_id,
                    privilege_object.schema_id,
                    privilege_object.object_type,
                    privilege_acl_item.grantee,
                    new_acl_mode.cloned(),
                )?;
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    // The audit event type mirrors the grant/revoke variant.
                    variant.into(),
                    object_type_to_audit_object_type(privilege_object.object_type),
                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
                        role_id: privilege_object.role_id.to_string(),
                        database_id: privilege_object.database_id.map(|id| id.to_string()),
                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
                        grantee_id: privilege_acl_item.grantee.to_string(),
                        privileges: privilege_acl_item.acl_mode.to_string(),
                    }),
                )?;
            }
2095            Op::RenameCluster {
2096                id,
2097                name,
2098                to_name,
2099                check_reserved_names,
2100            } => {
2101                if id.is_system() {
2102                    return Err(AdapterError::Catalog(Error::new(
2103                        ErrorKind::ReadOnlyCluster(name.clone()),
2104                    )));
2105                }
2106                if check_reserved_names && is_reserved_name(&to_name) {
2107                    return Err(AdapterError::Catalog(Error::new(
2108                        ErrorKind::ReservedClusterName(to_name),
2109                    )));
2110                }
2111                tx.rename_cluster(id, &name, &to_name)?;
2112                CatalogState::add_to_audit_log(
2113                    &state.system_configuration,
2114                    oracle_write_ts,
2115                    session,
2116                    tx,
2117                    audit_events,
2118                    EventType::Alter,
2119                    ObjectType::Cluster,
2120                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2121                        id: id.to_string(),
2122                        old_name: name.clone(),
2123                        new_name: to_name.clone(),
2124                    }),
2125                )?;
2126                info!("rename cluster {name} to {to_name}");
2127            }
2128            Op::RenameClusterReplica {
2129                cluster_id,
2130                replica_id,
2131                name,
2132                to_name,
2133            } => {
2134                if is_reserved_name(&to_name) {
2135                    return Err(AdapterError::Catalog(Error::new(
2136                        ErrorKind::ReservedReplicaName(to_name),
2137                    )));
2138                }
2139                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2140                CatalogState::add_to_audit_log(
2141                    &state.system_configuration,
2142                    oracle_write_ts,
2143                    session,
2144                    tx,
2145                    audit_events,
2146                    EventType::Alter,
2147                    ObjectType::ClusterReplica,
2148                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2149                        cluster_id: cluster_id.to_string(),
2150                        replica_id: replica_id.to_string(),
2151                        old_name: name.replica.as_str().to_string(),
2152                        new_name: to_name.clone(),
2153                    }),
2154                )?;
2155                info!("rename cluster replica {name} to {to_name}");
2156            }
2157            Op::RenameItem {
2158                id,
2159                to_name,
2160                current_full_name,
2161            } => {
2162                let mut updates = Vec::new();
2163
2164                let entry = state.get_entry(&id);
2165                if let CatalogItem::Type(_) = entry.item() {
2166                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2167                        current_full_name.to_string(),
2168                    ))));
2169                }
2170
2171                if entry.id().is_system() {
2172                    let name = state
2173                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2174                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2175                        name.to_string(),
2176                    ))));
2177                }
2178
2179                let mut to_full_name = current_full_name.clone();
2180                to_full_name.item.clone_from(&to_name);
2181
2182                let mut to_qualified_name = entry.name().clone();
2183                to_qualified_name.item.clone_from(&to_name);
2184
2185                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2186                    id: id.to_string(),
2187                    old_name: Self::full_name_detail(&current_full_name),
2188                    new_name: Self::full_name_detail(&to_full_name),
2189                });
2190                if Self::should_audit_log_item(entry.item()) {
2191                    CatalogState::add_to_audit_log(
2192                        &state.system_configuration,
2193                        oracle_write_ts,
2194                        session,
2195                        tx,
2196                        audit_events,
2197                        EventType::Alter,
2198                        catalog_type_to_audit_object_type(entry.item().typ()),
2199                        details,
2200                    )?;
2201                }
2202
2203                // Rename item itself.
2204                let mut new_entry = entry.clone();
2205                new_entry.name.item.clone_from(&to_name);
2206                new_entry.item = entry
2207                    .item()
2208                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2209                    .map_err(|e| {
2210                        Error::new(ErrorKind::from(AmbiguousRename {
2211                            depender: state
2212                                .resolve_full_name(entry.name(), entry.conn_id())
2213                                .to_string(),
2214                            dependee: state
2215                                .resolve_full_name(entry.name(), entry.conn_id())
2216                                .to_string(),
2217                            message: e,
2218                        }))
2219                    })?;
2220
2221                for id in entry.referenced_by() {
2222                    let dependent_item = state.get_entry(id);
2223                    let mut to_entry = dependent_item.clone();
2224                    to_entry.item = dependent_item
2225                        .item()
2226                        .rename_item_refs(
2227                            current_full_name.clone(),
2228                            to_full_name.item.clone(),
2229                            false,
2230                        )
2231                        .map_err(|e| {
2232                            Error::new(ErrorKind::from(AmbiguousRename {
2233                                depender: state
2234                                    .resolve_full_name(
2235                                        dependent_item.name(),
2236                                        dependent_item.conn_id(),
2237                                    )
2238                                    .to_string(),
2239                                dependee: state
2240                                    .resolve_full_name(entry.name(), entry.conn_id())
2241                                    .to_string(),
2242                                message: e,
2243                            }))
2244                        })?;
2245
2246                    if !to_entry.item().is_temporary() {
2247                        tx.update_item(*id, to_entry.into())?;
2248                    } else {
2249                        temporary_item_updates
2250                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2251                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2252                    }
2253                    updates.push(*id);
2254                }
2255                if !new_entry.item().is_temporary() {
2256                    tx.update_item(id, new_entry.into())?;
2257                } else {
2258                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2259                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2260                }
2261
2262                updates.push(id);
2263                for id in updates {
2264                    Self::log_update(state, &id);
2265                }
2266            }
2267            Op::RenameSchema {
2268                database_spec,
2269                schema_spec,
2270                new_name,
2271                check_reserved_names,
2272            } => {
2273                if check_reserved_names && is_reserved_name(&new_name) {
2274                    return Err(AdapterError::Catalog(Error::new(
2275                        ErrorKind::ReservedSchemaName(new_name),
2276                    )));
2277                }
2278
2279                let conn_id = session
2280                    .map(|session| session.conn_id())
2281                    .unwrap_or(&SYSTEM_CONN_ID);
2282
2283                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2284                let cur_name = schema.name().schema.clone();
2285
2286                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2287                    return Err(AdapterError::Catalog(Error::new(
2288                        ErrorKind::AmbientSchemaRename(cur_name),
2289                    )));
2290                };
2291                let database = state.get_database(&database_id);
2292                let database_name = &database.name;
2293
2294                let mut updates = Vec::new();
2295                let mut items_to_update = BTreeMap::new();
2296
2297                let mut update_item = |id| {
2298                    if items_to_update.contains_key(id) {
2299                        return Ok(());
2300                    }
2301
2302                    let entry = state.get_entry(id);
2303
2304                    // Update our item.
2305                    let mut new_entry = entry.clone();
2306                    new_entry.item = entry
2307                        .item
2308                        .rename_schema_refs(database_name, &cur_name, &new_name)
2309                        .map_err(|(s, _i)| {
2310                            Error::new(ErrorKind::from(AmbiguousRename {
2311                                depender: state
2312                                    .resolve_full_name(entry.name(), entry.conn_id())
2313                                    .to_string(),
2314                                dependee: format!("{database_name}.{cur_name}"),
2315                                message: format!("ambiguous reference to schema named {s}"),
2316                            }))
2317                        })?;
2318
2319                    // Queue updates for Catalog storage and Builtin Tables.
2320                    if !new_entry.item().is_temporary() {
2321                        items_to_update.insert(*id, new_entry.into());
2322                    } else {
2323                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2324                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2325                    }
2326                    updates.push(id);
2327
2328                    Ok::<_, AdapterError>(())
2329                };
2330
2331                // Update all of the items in the schema.
2332                for (_name, item_id) in &schema.items {
2333                    // Update the item itself.
2334                    update_item(item_id)?;
2335
2336                    // Update everything that depends on this item.
2337                    for id in state.get_entry(item_id).referenced_by() {
2338                        update_item(id)?;
2339                    }
2340                }
2341                // Note: When updating the transaction it's very important that we update the
2342                // items as a whole group, otherwise we exhibit quadratic behavior.
2343                tx.update_items(items_to_update)?;
2344
2345                // Renaming temporary schemas is not supported.
2346                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2347                    let schema_name = schema.name().schema.clone();
2348                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2349                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2350                    )));
2351                };
2352
2353                // Add an entry to the audit log.
2354                let database_name = database_spec
2355                    .id()
2356                    .map(|id| state.get_database(&id).name.clone());
2357                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2358                    id: schema_id.to_string(),
2359                    old_name: schema.name().schema.clone(),
2360                    new_name: new_name.clone(),
2361                    database_name,
2362                });
2363                CatalogState::add_to_audit_log(
2364                    &state.system_configuration,
2365                    oracle_write_ts,
2366                    session,
2367                    tx,
2368                    audit_events,
2369                    EventType::Alter,
2370                    mz_audit_log::ObjectType::Schema,
2371                    details,
2372                )?;
2373
2374                // Update the schema itself.
2375                let mut new_schema = schema.clone();
2376                new_schema.name.schema.clone_from(&new_name);
2377                tx.update_schema(schema_id, new_schema.into())?;
2378
2379                for id in updates {
2380                    Self::log_update(state, id);
2381                }
2382            }
2383            Op::UpdateOwner { id, new_owner } => {
2384                let conn_id = session
2385                    .map(|session| session.conn_id())
2386                    .unwrap_or(&SYSTEM_CONN_ID);
2387                let old_owner = state
2388                    .get_owner_id(&id, conn_id)
2389                    .expect("cannot update the owner of an object without an owner");
2390                match &id {
2391                    ObjectId::Cluster(id) => {
2392                        let mut cluster = state.get_cluster(*id).clone();
2393                        if id.is_system() {
2394                            return Err(AdapterError::Catalog(Error::new(
2395                                ErrorKind::ReadOnlyCluster(cluster.name),
2396                            )));
2397                        }
2398                        Self::update_privilege_owners(
2399                            &mut cluster.privileges,
2400                            cluster.owner_id,
2401                            new_owner,
2402                        );
2403                        cluster.owner_id = new_owner;
2404                        tx.update_cluster(*id, cluster.into())?;
2405                    }
2406                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2407                        let cluster = state.get_cluster(*cluster_id);
2408                        let mut replica = cluster
2409                            .replica(*replica_id)
2410                            .expect("catalog out of sync")
2411                            .clone();
2412                        if replica_id.is_system() {
2413                            return Err(AdapterError::Catalog(Error::new(
2414                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2415                            )));
2416                        }
2417                        replica.owner_id = new_owner;
2418                        tx.update_cluster_replica(*replica_id, replica.into())?;
2419                    }
2420                    ObjectId::Database(id) => {
2421                        let mut database = state.get_database(id).clone();
2422                        if id.is_system() {
2423                            return Err(AdapterError::Catalog(Error::new(
2424                                ErrorKind::ReadOnlyDatabase(database.name),
2425                            )));
2426                        }
2427                        Self::update_privilege_owners(
2428                            &mut database.privileges,
2429                            database.owner_id,
2430                            new_owner,
2431                        );
2432                        database.owner_id = new_owner;
2433                        tx.update_database(*id, database.clone().into())?;
2434                    }
2435                    ObjectId::Schema((database_spec, schema_spec)) => {
2436                        let schema_id: SchemaId = schema_spec.clone().into();
2437                        let mut schema = state
2438                            .get_schema(database_spec, schema_spec, conn_id)
2439                            .clone();
2440                        if schema_id.is_system() {
2441                            let name = schema.name();
2442                            let full_name = state.resolve_full_schema_name(name);
2443                            return Err(AdapterError::Catalog(Error::new(
2444                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2445                            )));
2446                        }
2447                        Self::update_privilege_owners(
2448                            &mut schema.privileges,
2449                            schema.owner_id,
2450                            new_owner,
2451                        );
2452                        schema.owner_id = new_owner;
2453                        tx.update_schema(schema_id, schema.into())?;
2454                    }
2455                    ObjectId::Item(id) => {
2456                        let entry = state.get_entry(id);
2457                        let mut new_entry = entry.clone();
2458                        if id.is_system() {
2459                            let full_name = state.resolve_full_name(
2460                                new_entry.name(),
2461                                session.map(|session| session.conn_id()),
2462                            );
2463                            return Err(AdapterError::Catalog(Error::new(
2464                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2465                            )));
2466                        }
2467                        Self::update_privilege_owners(
2468                            &mut new_entry.privileges,
2469                            new_entry.owner_id,
2470                            new_owner,
2471                        );
2472                        new_entry.owner_id = new_owner;
2473                        if !new_entry.item().is_temporary() {
2474                            tx.update_item(*id, new_entry.into())?;
2475                        } else {
2476                            temporary_item_updates
2477                                .push((entry.clone().into(), StateDiff::Retraction));
2478                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2479                        }
2480                    }
2481                    ObjectId::NetworkPolicy(id) => {
2482                        let mut policy = state.get_network_policy(id).clone();
2483                        if id.is_system() {
2484                            return Err(AdapterError::Catalog(Error::new(
2485                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2486                            )));
2487                        }
2488                        Self::update_privilege_owners(
2489                            &mut policy.privileges,
2490                            policy.owner_id,
2491                            new_owner,
2492                        );
2493                        policy.owner_id = new_owner;
2494                        tx.update_network_policy(*id, policy.into())?;
2495                    }
2496                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2497                }
2498                let object_type = state.get_object_type(&id);
2499                CatalogState::add_to_audit_log(
2500                    &state.system_configuration,
2501                    oracle_write_ts,
2502                    session,
2503                    tx,
2504                    audit_events,
2505                    EventType::Alter,
2506                    object_type_to_audit_object_type(object_type),
2507                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2508                        object_id: id.to_string(),
2509                        old_owner_id: old_owner.to_string(),
2510                        new_owner_id: new_owner.to_string(),
2511                    }),
2512                )?;
2513            }
2514            Op::UpdateClusterConfig { id, name, config } => {
2515                let mut cluster = state.get_cluster(id).clone();
2516                cluster.config = config;
2517                tx.update_cluster(id, cluster.into())?;
2518                info!("update cluster {}", name);
2519
2520                CatalogState::add_to_audit_log(
2521                    &state.system_configuration,
2522                    oracle_write_ts,
2523                    session,
2524                    tx,
2525                    audit_events,
2526                    EventType::Alter,
2527                    ObjectType::Cluster,
2528                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2529                        id: id.to_string(),
2530                        name,
2531                    }),
2532                )?;
2533            }
2534            Op::UpdateClusterReplicaConfig {
2535                replica_id,
2536                cluster_id,
2537                config,
2538            } => {
2539                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2540                info!("update replica {}", replica.name);
2541                tx.update_cluster_replica(
2542                    replica_id,
2543                    mz_catalog::durable::ClusterReplica {
2544                        cluster_id,
2545                        replica_id,
2546                        name: replica.name.clone(),
2547                        config: config.clone().into(),
2548                        owner_id: replica.owner_id,
2549                    },
2550                )?;
2551            }
2552            Op::UpdateItem { id, name, to_item } => {
2553                let mut entry = state.get_entry(&id).clone();
2554                entry.name = name.clone();
2555                entry.item = to_item.clone();
2556                tx.update_item(id, entry.into())?;
2557
2558                if Self::should_audit_log_item(&to_item) {
2559                    let mut full_name = Self::full_name_detail(
2560                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2561                    );
2562                    full_name.item = name.item;
2563
2564                    CatalogState::add_to_audit_log(
2565                        &state.system_configuration,
2566                        oracle_write_ts,
2567                        session,
2568                        tx,
2569                        audit_events,
2570                        EventType::Alter,
2571                        catalog_type_to_audit_object_type(to_item.typ()),
2572                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2573                            id: id.to_string(),
2574                            name: full_name,
2575                        }),
2576                    )?;
2577                }
2578
2579                Self::log_update(state, &id);
2580            }
2581            Op::UpdateSystemConfiguration { name, value } => {
2582                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2583                tx.upsert_system_config(&name, parsed_value.clone())?;
2584                // This mirrors some "system vars" into the catalog storage
2585                // "config" collection so that we can toggle the flag with
2586                // Launch Darkly, but use it in boot before Launch Darkly is
2587                // available.
2588                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2589                    let with_0dt_deployment_max_wait =
2590                        Duration::parse(VarInput::Flat(&parsed_value))
2591                            .expect("parsing succeeded above");
2592                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2593                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2594                    let with_0dt_deployment_ddl_check_interval =
2595                        Duration::parse(VarInput::Flat(&parsed_value))
2596                            .expect("parsing succeeded above");
2597                    tx.set_0dt_deployment_ddl_check_interval(
2598                        with_0dt_deployment_ddl_check_interval,
2599                    )?;
2600                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2601                    let panic_after_timeout =
2602                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2603                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2604                }
2605
2606                CatalogState::add_to_audit_log(
2607                    &state.system_configuration,
2608                    oracle_write_ts,
2609                    session,
2610                    tx,
2611                    audit_events,
2612                    EventType::Alter,
2613                    ObjectType::System,
2614                    EventDetails::SetV1(mz_audit_log::SetV1 {
2615                        name,
2616                        value: Some(value.borrow().to_vec().join(", ")),
2617                    }),
2618                )?;
2619            }
2620            Op::ResetSystemConfiguration { name } => {
2621                tx.remove_system_config(&name);
2622                // This mirrors some "system vars" into the catalog storage
2623                // "config" collection so that we can toggle the flag with
2624                // Launch Darkly, but use it in boot before Launch Darkly is
2625                // available.
2626                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2627                    tx.reset_0dt_deployment_max_wait()?;
2628                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2629                    tx.reset_0dt_deployment_ddl_check_interval()?;
2630                }
2631
2632                CatalogState::add_to_audit_log(
2633                    &state.system_configuration,
2634                    oracle_write_ts,
2635                    session,
2636                    tx,
2637                    audit_events,
2638                    EventType::Alter,
2639                    ObjectType::System,
2640                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2641                )?;
2642            }
2643            Op::ResetAllSystemConfiguration => {
2644                tx.clear_system_configs();
2645                tx.reset_0dt_deployment_max_wait()?;
2646                tx.reset_0dt_deployment_ddl_check_interval()?;
2647
2648                CatalogState::add_to_audit_log(
2649                    &state.system_configuration,
2650                    oracle_write_ts,
2651                    session,
2652                    tx,
2653                    audit_events,
2654                    EventType::Alter,
2655                    ObjectType::System,
2656                    EventDetails::ResetAllV1,
2657                )?;
2658            }
2659            Op::WeirdStorageUsageUpdates {
2660                object_id,
2661                size_bytes,
2662                collection_timestamp,
2663            } => {
2664                let id = tx.allocate_storage_usage_ids()?;
2665                let metric =
2666                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2667                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2668                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2669                weird_builtin_table_update = Some(builtin_table_update);
2670            }
2671        };
2672        Ok((weird_builtin_table_update, temporary_item_updates))
2673    }
2674
2675    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2676        let entry = state.get_entry(id);
2677        info!(
2678            "update {} {} ({})",
2679            entry.item_type(),
2680            state.resolve_full_name(entry.name(), entry.conn_id()),
2681            id
2682        );
2683    }
2684
2685    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2686    /// implementation:
2687    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2688    fn update_privilege_owners(
2689        privileges: &mut PrivilegeMap,
2690        old_owner: RoleId,
2691        new_owner: RoleId,
2692    ) {
2693        // TODO(jkosh44) Would be nice not to clone every privilege.
2694        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2695
2696        let mut new_present = false;
2697        for privilege in flat_privileges.iter_mut() {
2698            // Old owner's granted privilege are updated to be granted by the new
2699            // owner.
2700            if privilege.grantor == old_owner {
2701                privilege.grantor = new_owner;
2702            } else if privilege.grantor == new_owner {
2703                new_present = true;
2704            }
2705            // Old owner's privileges is given to the new owner.
2706            if privilege.grantee == old_owner {
2707                privilege.grantee = new_owner;
2708            } else if privilege.grantee == new_owner {
2709                new_present = true;
2710            }
2711        }
2712
2713        // If the old privilege list contained references to the new owner, we may
2714        // have created duplicate entries. Here we try and consolidate them. This
2715        // is inspired by PostgreSQL's algorithm but not identical.
2716        if new_present {
2717            // Group privileges by (grantee, grantor).
2718            let privilege_map: BTreeMap<_, Vec<_>> =
2719                flat_privileges
2720                    .into_iter()
2721                    .fold(BTreeMap::new(), |mut accum, privilege| {
2722                        accum
2723                            .entry((privilege.grantee, privilege.grantor))
2724                            .or_default()
2725                            .push(privilege);
2726                        accum
2727                    });
2728
2729            // Consolidate and update all privileges.
2730            flat_privileges = privilege_map
2731                .into_iter()
2732                .map(|((grantee, grantor), values)|
2733                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2734                    values.into_iter().fold(
2735                        MzAclItem::empty(grantee, grantor),
2736                        |mut accum, mz_aclitem| {
2737                            accum.acl_mode =
2738                                accum.acl_mode.union(mz_aclitem.acl_mode);
2739                            accum
2740                        },
2741                    ))
2742                .collect();
2743        }
2744
2745        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2746    }
2747}
2748
2749/// Prepare the given transaction for replacing a catalog item with a new version.
2750///
2751/// The new version gets a new `CatalogItemId`, which requires rewriting the `create_sql` of all
2752/// dependent objects to refer to that new ID (at a previous version).
2753///
2754/// Note that here is where we break the assumption that the `CatalogItemId` is a stable identifier
2755/// for catalog items. We currently think that there are no use cases that require this assumption,
2756/// but no way to know for sure.
2757fn tx_replace_item(
2758    tx: &mut Transaction<'_>,
2759    state: &CatalogState,
2760    id: CatalogItemId,
2761    new_entry: CatalogEntry,
2762) -> Result<(), AdapterError> {
2763    let new_id = new_entry.id;
2764
2765    // Rewrite dependent objects to point to the new ID.
2766    for use_id in new_entry.referenced_by() {
2767        // The dependent might be dropped in the same tx, so check.
2768        if tx.get_item(use_id).is_none() {
2769            continue;
2770        }
2771
2772        let mut dependent = state.get_entry(use_id).clone();
2773        dependent.item = dependent.item.replace_item_refs(id, new_id);
2774        tx.update_item(*use_id, dependent.into())?;
2775    }
2776
2777    // Move comments to the new ID.
2778    let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2779    let new_comment_id = new_entry.comment_object_id();
2780    if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2781        tx.drop_comments(&[old_comment_id].into())?;
2782        for (sub, comment) in comments {
2783            tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2784        }
2785    }
2786
2787    let mz_catalog::durable::Item {
2788        id: _,
2789        oid,
2790        global_id,
2791        schema_id,
2792        name,
2793        create_sql,
2794        owner_id,
2795        privileges,
2796        extra_versions,
2797    } = new_entry.into();
2798
2799    tx.remove_item(id)?;
2800    tx.insert_item(
2801        new_id,
2802        oid,
2803        global_id,
2804        schema_id,
2805        &name,
2806        create_sql,
2807        owner_id,
2808        privileges,
2809        extra_versions,
2810    )?;
2811
2812    Ok(())
2813}
2814
2815/// Generate audit events for a replacement apply operation.
2816fn apply_replacement_audit_events(
2817    state: &CatalogState,
2818    target: &CatalogEntry,
2819    replacement: &CatalogEntry,
2820) -> Vec<(EventType, EventDetails)> {
2821    let mut events = Vec::new();
2822
2823    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2824    let target_id_name = IdFullNameV1 {
2825        id: target.id().to_string(),
2826        name: Catalog::full_name_detail(&target_name),
2827    };
2828    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2829    let replacement_id_name = IdFullNameV1 {
2830        id: replacement.id().to_string(),
2831        name: Catalog::full_name_detail(&replacement_name),
2832    };
2833
2834    if Catalog::should_audit_log_item(&replacement.item) {
2835        events.push((
2836            EventType::Drop,
2837            EventDetails::IdFullNameV1(replacement_id_name.clone()),
2838        ));
2839    }
2840
2841    if Catalog::should_audit_log_item(&target.item) {
2842        events.push((
2843            EventType::Alter,
2844            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2845                target: target_id_name.clone(),
2846                replacement: replacement_id_name,
2847            }),
2848        ));
2849
2850        if let Some(old_cluster_id) = target.cluster_id()
2851            && let Some(new_cluster_id) = replacement.cluster_id()
2852            && old_cluster_id != new_cluster_id
2853        {
2854            // When the replacement is applied, the target takes on the ID of the replacement, so
2855            // we should use that ID for subsequent events.
2856            events.push((
2857                EventType::Alter,
2858                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2859                    id: replacement.id().to_string(),
2860                    name: target_id_name.name,
2861                    old_cluster_id: old_cluster_id.to_string(),
2862                    new_cluster_id: new_cluster_id.to_string(),
2863                }),
2864            ));
2865        }
2866    }
2867
2868    events
2869}
2870
/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
///
/// Note: Previously we used to emit a single `Op::DropObject` for every object
/// we needed to drop. But removing a batch of objects from a durable Catalog
/// Transaction is O(n) where `n` is the number of objects that exist in the
/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
/// `DROP ... CASCADE` statement.
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment records to remove; one is collected for every dropped object.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, each mapped to the database it lives in.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Replicas to drop, each mapped to its owning cluster and the reason for the drop.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Items (tables, views, sources, etc.) to drop.
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
2889
2890impl ObjectsToDrop {
2891    pub fn generate(
2892        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2893        state: &CatalogState,
2894        session: Option<&ConnMeta>,
2895    ) -> Result<Self, AdapterError> {
2896        let mut delta = ObjectsToDrop::default();
2897
2898        for drop_object_info in drop_object_infos {
2899            delta.add_item(drop_object_info, state, session)?;
2900        }
2901
2902        Ok(delta)
2903    }
2904
2905    fn add_item(
2906        &mut self,
2907        drop_object_info: DropObjectInfo,
2908        state: &CatalogState,
2909        session: Option<&ConnMeta>,
2910    ) -> Result<(), AdapterError> {
2911        self.comments
2912            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2913
2914        match drop_object_info {
2915            DropObjectInfo::Database(database_id) => {
2916                let database = &state.database_by_id[&database_id];
2917                if database_id.is_system() {
2918                    return Err(AdapterError::Catalog(Error::new(
2919                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2920                    )));
2921                }
2922
2923                self.databases.insert(database_id);
2924            }
2925            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2926                let schema = state.get_schema(
2927                    &database_spec,
2928                    &schema_spec,
2929                    session
2930                        .map(|session| session.conn_id())
2931                        .unwrap_or(&SYSTEM_CONN_ID),
2932                );
2933                let schema_id: SchemaId = schema_spec.into();
2934                if schema_id.is_system() {
2935                    let name = schema.name();
2936                    let full_name = state.resolve_full_schema_name(name);
2937                    return Err(AdapterError::Catalog(Error::new(
2938                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2939                    )));
2940                }
2941
2942                self.schemas.insert(schema_spec, database_spec);
2943            }
2944            DropObjectInfo::Role(role_id) => {
2945                let name = state.get_role(&role_id).name().to_string();
2946                if role_id.is_system() || role_id.is_predefined() {
2947                    return Err(AdapterError::Catalog(Error::new(
2948                        ErrorKind::ReservedRoleName(name.clone()),
2949                    )));
2950                }
2951                state.ensure_not_reserved_role(&role_id)?;
2952
2953                self.roles.insert(role_id);
2954            }
2955            DropObjectInfo::Cluster(cluster_id) => {
2956                let cluster = state.get_cluster(cluster_id);
2957                let name = &cluster.name;
2958                if cluster_id.is_system() {
2959                    return Err(AdapterError::Catalog(Error::new(
2960                        ErrorKind::ReadOnlyCluster(name.clone()),
2961                    )));
2962                }
2963
2964                self.clusters.insert(cluster_id);
2965            }
2966            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2967                let cluster = state.get_cluster(cluster_id);
2968                let replica = cluster.replica(replica_id).expect("Must exist");
2969
2970                self.replicas
2971                    .insert(replica.replica_id, (cluster.id, reason));
2972
2973                // Implicitly drop materialized views that target this replica.
2974                // When the target replica is gone, no replica advances the
2975                // persist shard's upper frontier, causing reads to hang.
2976                for (_id, entry) in state.get_entries() {
2977                    if let CatalogItem::MaterializedView(mv) = entry.item() {
2978                        if mv.target_replica == Some(replica_id)
2979                            && !self.items.contains(&entry.id())
2980                        {
2981                            tracing::warn!(
2982                                "implicitly dropping materialized view {} because target replica was dropped",
2983                                entry.name().item,
2984                            );
2985                            self.items.push(entry.id());
2986                        }
2987                    }
2988                }
2989            }
2990            DropObjectInfo::Item(item_id) => {
2991                let entry = state.get_entry(&item_id);
2992                if item_id.is_system() {
2993                    let name = entry.name();
2994                    let full_name =
2995                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
2996                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2997                        full_name.to_string(),
2998                    ))));
2999                }
3000
3001                self.items.push(item_id);
3002            }
3003            DropObjectInfo::NetworkPolicy(network_policy_id) => {
3004                let policy = state.get_network_policy(&network_policy_id);
3005                let name = &policy.name;
3006                if network_policy_id.is_system() {
3007                    return Err(AdapterError::Catalog(Error::new(
3008                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3009                    )));
3010                }
3011
3012                self.network_policies.insert(network_policy_id);
3013            }
3014        }
3015
3016        Ok(())
3017    }
3018}
3019
#[cfg(test)]
mod tests {
    use mz_catalog::SYSTEM_CONN_ID;
    use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;
    use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::DEFAULT_SCHEMA;
    use mz_sql::catalog::CatalogDatabase;
    use mz_sql::names::{
        ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
    };
    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;

    use crate::catalog::{Catalog, Op};
    use crate::session::DEFAULT_DATABASE_NAME;

    /// Verifies that `Catalog::update_privilege_owners` rewrites `old_owner` to
    /// `new_owner` in both the grantor and grantee positions, and merges the
    /// `acl_mode`s of any entries that collide after the rewrite.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        // Three distinct roles: ownership moves from `old_owner` to `new_owner`;
        // `other_role` stands in for an unrelated grantee/grantor.
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        // older owner exists as grantor.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: other_role,
                grantor: old_owner,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: other_role,
                grantor: new_owner,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        // The two items collapse into one with the union of both acl_modes.
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: other_role,
                grantor: new_owner,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );

        // older owner exists as grantee.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: old_owner,
                grantor: other_role,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: new_owner,
                grantor: other_role,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: new_owner,
                grantor: other_role,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );

        // older owner exists as grantee and grantor.
        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
            MzAclItem {
                grantee: old_owner,
                grantor: old_owner,
                acl_mode: AclMode::UPDATE,
            },
            MzAclItem {
                grantee: new_owner,
                grantor: new_owner,
                acl_mode: AclMode::SELECT,
            },
        ]);
        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
        assert_eq!(1, privileges.all_values().count());
        assert_eq!(
            vec![MzAclItem {
                grantee: new_owner,
                grantor: new_owner,
                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
            }],
            privileges.all_values_owned().collect::<Vec<_>>()
        );
    }

    /// Verifies that `transact_incremental_dry_run` processes only new ops
    /// against the accumulated state, not all ops from scratch. Two paths are
    /// compared:
    ///   - Incremental: two separate calls, each with one op
    ///   - All-at-once: one call with both ops
    /// Both must produce equivalent catalog state.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method`
    async fn test_transact_incremental_dry_run_processes_only_new_ops() {
        Catalog::with_debug(|catalog| async move {
            // Resolve default database and schema.
            let database = catalog
                .resolve_database(DEFAULT_DATABASE_NAME)
                .expect("default database");
            let database_name = database.name.clone();
            let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
            let schema = catalog
                .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
                .expect("default schema");
            let schema_name = schema.name.schema.clone();
            let schema_spec = schema.id.clone();

            // Allocate IDs for two tables.
            let (id_t1, global_id_t1) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t1");
            let (id_t2, global_id_t2) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t2");

            // Both dry-run paths use the same write timestamp so their results
            // are directly comparable.
            let oracle_write_ts = catalog.current_upper().await;

            // Build two CreateItem ops.
            // Helper producing an `Op::CreateItem` for an empty table with the
            // given IDs and name, owned by the system role.
            let make_table_op = |id, global_id, name: &str| Op::CreateItem {
                id,
                name: QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: database_spec.clone(),
                        schema_spec: schema_spec.clone(),
                    },
                    item: name.to_string(),
                },
                item: CatalogItem::Table(Table {
                    create_sql: Some(format!(
                        "CREATE TABLE {database_name}.{schema_name}.{name} ()"
                    )),
                    desc: VersionedRelationDesc::new(RelationDesc::empty()),
                    collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                    conn_id: None,
                    resolved_ids: ResolvedIds::empty(),
                    custom_logical_compaction_window: None,
                    is_retained_metrics_object: false,
                    data_source: TableDataSource::TableWrites { defaults: vec![] },
                }),
                owner_id: MZ_SYSTEM_ROLE_ID,
            };

            let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
            let op_t2 = make_table_op(id_t2, global_id_t2, "t2");

            let base_state = catalog.state().clone();

            // --- Path A: Incremental (two separate dry-run calls) ---

            // First call: only op_t1, no previous snapshot.
            let (state_after_t1, snapshot_after_t1) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("first dry run");

            // After first dry run: t1 exists, t2 does not.
            assert!(
                state_after_t1.try_get_entry(&id_t1).is_some(),
                "t1 should exist after first dry run"
            );
            assert_eq!(
                state_after_t1
                    .try_get_entry(&id_t1)
                    .expect("t1 entry")
                    .name()
                    .item,
                "t1"
            );
            assert!(
                state_after_t1.try_get_entry(&id_t2).is_none(),
                "t2 should NOT exist after first dry run"
            );

            // Second call: only op_t2, using state/snapshot from first call.
            let (state_incremental, _) = catalog
                .transact_incremental_dry_run(
                    &state_after_t1,
                    vec![op_t2.clone()],
                    None,
                    Some(snapshot_after_t1),
                    oracle_write_ts,
                )
                .await
                .expect("second dry run");

            // After second dry run: both t1 and t2 exist.
            assert!(
                state_incremental.try_get_entry(&id_t1).is_some(),
                "t1 should exist in incremental result"
            );
            assert!(
                state_incremental.try_get_entry(&id_t2).is_some(),
                "t2 should exist in incremental result"
            );

            // --- Path B: All-at-once (single dry-run call with both ops) ---

            let (state_all_at_once, _) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone(), op_t2.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("all-at-once dry run");

            assert!(
                state_all_at_once.try_get_entry(&id_t1).is_some(),
                "t1 should exist in all-at-once result"
            );
            assert!(
                state_all_at_once.try_get_entry(&id_t2).is_some(),
                "t2 should exist in all-at-once result"
            );

            // --- Compare: both paths produce equivalent items ---

            let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
            let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
            assert_eq!(inc_t1.name(), all_t1.name());
            assert_eq!(inc_t1.owner_id, all_t1.owner_id);

            let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
            let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
            assert_eq!(inc_t2.name(), all_t2.name());
            assert_eq!(inc_t2.owner_id, all_t2.owner_id);

            // Tear down the debug catalog.
            catalog.expire().await;
        })
        .await
    }
}