// mz_adapter/catalog/transact.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to executing catalog transactions.
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35    StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50    AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
51    CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
52    DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
53    RoleVars,
54};
55use mz_sql::names::{
56    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
57    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
58};
59use mz_sql::plan::{NetworkPolicyRule, PlanError};
60use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
61use mz_sql::session::vars::OwnedVarInput;
62use mz_sql::session::vars::{Value as VarValue, VarInput};
63use mz_sql::{DEFAULT_SCHEMA, rbac};
64use mz_sql_parser::ast::{QualifiedReplica, Value};
65use mz_storage_client::storage_collections::StorageCollections;
66use serde::{Deserialize, Serialize};
67use tracing::{info, trace};
68
69use crate::AdapterError;
70use crate::catalog::state::LocalExpressionCache;
71use crate::catalog::{
72    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
73    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
74    is_reserved_role_name, object_type_to_audit_object_type,
75    system_object_type_to_audit_object_type,
76};
77use crate::coord::ConnMeta;
78use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
79use crate::coord::cluster_scheduling::SchedulingDecision;
80use crate::util::ResultExt;
81
/// A manually injected audit event.
///
/// Matches [`mz_audit_log::EventV1`], but without the `id` and `occurred_at` fields -- both are
/// filled in automatically.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InjectedAuditEvent {
    /// The kind of event (e.g. create, drop, alter).
    pub event_type: EventType,
    /// The type of catalog object the event refers to.
    pub object_type: ObjectType,
    /// Structured details describing the event.
    pub details: EventDetails,
    /// The user associated with the event, if any.
    pub user: Option<String>,
}
93
/// A single mutation to apply to the catalog as part of a transaction.
///
/// Ops are executed in order within one durable catalog transaction; see
/// [`Catalog::transact`].
#[derive(Debug, Clone)]
pub enum Op {
    /// Alters the retain history (compaction window) of the item `id`.
    AlterRetainHistory {
        id: CatalogItemId,
        /// The raw SQL value as written, if any.
        value: Option<Value>,
        window: CompactionWindow,
    },
    /// Alters the timestamp interval of the source `id`.
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        /// The raw SQL value as written, if any.
        value: Option<Value>,
        interval: Duration,
    },
    /// Alters the role `id`: attributes, password removal, and session variable
    /// defaults.
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    /// Alters the rules, name, and owner of the network policy `id`.
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Adds a column to the item `id`, minting a new global ID for the new
    /// version.
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        /// The column's type as written in SQL.
        sql: RawDataType,
    },
    /// Applies the pending replacement `replacement_id` to the materialized
    /// view `id`.
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    /// Creates a database named `name` owned by `owner_id`.
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    /// Creates a schema in `database_id` named `schema_name`.
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    /// Creates a role with the given name and attributes.
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    /// Creates a cluster, including its builtin introspection sources.
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    /// Creates a replica of `cluster_id`; `reason` is recorded in the audit log.
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item (table, view, source, ...).
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    /// Creates a network policy with the given rules.
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    /// Sets or clears (`comment: None`) a comment on an object or on one of its
    /// sub-components (e.g. a column, identified by `sub_component`).
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the given objects.
    DropObjects(Vec<DropObjectInfo>),
    /// Grants membership in `role_id` to `member_id`, granted by `grantor_id`.
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    /// Renames cluster `id` from `name` to `to_name`.
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    /// Renames a replica of `cluster_id` to `to_name`.
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    /// Renames item `id` to `to_name` within its current schema.
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    /// Renames a schema to `new_name`.
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    /// Transfers ownership of `id` to `new_owner`.
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    /// Grants or revokes (per `variant`) a privilege on `target_id`.
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    /// Grants or revokes (per `variant`) a default privilege.
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    /// Revokes membership in `role_id` from `member_id`.
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    /// Replaces the configuration of cluster `id`.
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    /// Replaces the configuration of a replica of `cluster_id`.
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    /// Replaces the definition of item `id` with `to_item`.
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    /// Replaces the available upstream references of source `source_id`.
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    /// Sets the system configuration parameter `name` to `value`.
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    /// Resets the system configuration parameter `name` to its default.
    ResetSystemConfiguration {
        name: String,
    },
    /// Resets all system configuration parameters to their defaults.
    ResetAllSystemConfiguration,
    /// Performs updates to the storage usage table, which probably should be a builtin source.
    ///
    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
    /// might not work. If a process crashes after collecting storage usage events
    /// but before updating the builtin table, then another listening catalog
    /// will never know to update the builtin table.
    WeirdStorageUsageUpdates {
        object_id: Option<String>,
        size_bytes: u64,
        collection_timestamp: EpochMillis,
    },
    /// Injects audit events into the catalog.
    ///
    /// This is a nonstandard path used for manually appending audit events at the current time.
    /// Mainly useful for correcting the audit log in the face of bugs that made us forget to emit
    /// audit events.
    InjectAuditEvents {
        events: Vec<InjectedAuditEvent>,
    },
}
268
/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
/// the `Op::DropObjects`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// Cluster ID, replica ID, and the reason the replica is being dropped.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
282
283impl DropObjectInfo {
284    /// Creates a `DropObjectInfo` from an `ObjectId`.
285    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
286    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
287        match id {
288            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
289            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
290                cluster_id,
291                replica_id,
292                ReplicaCreateDropReason::Manual,
293            )),
294            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
295            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
296            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
297            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
298            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
299        }
300    }
301
302    /// Creates an `ObjectId` from a `DropObjectInfo`.
303    /// Loses the `ReplicaCreateDropReason` if there is one!
304    fn to_object_id(&self) -> ObjectId {
305        match &self {
306            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
307            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
308                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
309            }
310            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
311            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
312            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
313            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
314            DropObjectInfo::NetworkPolicy(network_policy_id) => {
315                ObjectId::NetworkPolicy(network_policy_id.clone())
316            }
317        }
318    }
319}
320
/// The reason for creating or dropping a replica.
///
/// Forwarded into the audit log via [`ReplicaCreateDropReason::into_audit_log`].
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The user initiated the replica create or drop, e.g., by
    /// - creating/dropping a cluster,
    /// - ALTERing various options on a managed cluster,
    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
    Manual,
    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
    /// materialized view is needing a refresh on a SCHEDULE ON REFRESH cluster.
    ClusterScheduling(Vec<SchedulingDecision>),
}
333
334impl ReplicaCreateDropReason {
335    pub fn into_audit_log(
336        self,
337    ) -> (
338        CreateOrDropClusterReplicaReasonV1,
339        Option<SchedulingDecisionsWithReasonsV2>,
340    ) {
341        let (reason, scheduling_policies) = match self {
342            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
343            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
344                CreateOrDropClusterReplicaReasonV1::Schedule,
345                Some(scheduling_decisions),
346            ),
347        };
348        (
349            reason,
350            scheduling_policies
351                .as_ref()
352                .map(SchedulingDecision::reasons_to_audit_log_reasons),
353        )
354    }
355}
356
/// The outcome of a successful catalog transaction (see [`Catalog::transact`]).
pub struct TransactionResult {
    /// Updates to apply to the builtin tables as a result of the transaction.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates from which we will derive catalog implications.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit log events emitted while executing the transaction.
    pub audit_events: Vec<VersionedEvent>,
}
363
/// Controls whether `transact_inner` is allowed to trigger storage
/// prepare-state side effects.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// Prepare storage state and return a commit-ready transaction state.
    ///
    /// This mode is used by durable catalog transactions.
    Commit,
    /// Execute a dry run against a durable transaction that will not be
    /// committed.
    ///
    /// This mode still validates and applies updates to an in-memory
    /// `CatalogState`, but it must not call `prepare_state` because dry runs
    /// must not trigger controller side effects.
    DryRun,
}
378
379impl Catalog {
380    fn should_audit_log_item(item: &CatalogItem) -> bool {
381        !item.is_temporary()
382    }
383
384    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
385    /// within a connection id.
386    fn temporary_ids(
387        &self,
388        ops: &[Op],
389        temporary_drops: BTreeSet<(&ConnectionId, String)>,
390    ) -> Result<BTreeSet<CatalogItemId>, Error> {
391        let mut creating = BTreeSet::new();
392        let mut temporary_ids = BTreeSet::new();
393        for op in ops.iter() {
394            if let Op::CreateItem {
395                id,
396                name,
397                item,
398                owner_id: _,
399            } = op
400            {
401                if let Some(conn_id) = item.conn_id() {
402                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
403                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
404                        || creating.contains(&(conn_id, &name.item))
405                    {
406                        return Err(
407                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
408                        );
409                    } else {
410                        creating.insert((conn_id, &name.item));
411                        temporary_ids.insert(id.clone());
412                    }
413                }
414            }
415        }
416        Ok(temporary_ids)
417    }
418
    /// Executes the catalog transaction described by `ops`: durably commits the
    /// changes and applies them to the in-memory catalog state, returning the
    /// resulting builtin table updates, parsed catalog updates, and audit events.
    ///
    /// Terminates the process if starting or committing the durable transaction
    /// fails, since the catalog would otherwise be in an indeterminate state.
    #[instrument(name = "catalog::transact")]
    pub async fn transact(
        &mut self,
        // n.b. this is an option to prevent us from needing to build out a
        // dummy impl of `StorageController` for tests.
        storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        ops: Vec<Op>,
    ) -> Result<TransactionResult, AdapterError> {
        trace!("transact: {:?}", ops);
        // Test-only failure injection point.
        fail::fail_point!("catalog_transact", |arg| {
            Err(AdapterError::Unstructured(anyhow::anyhow!(
                "failpoint: {arg:?}"
            )))
        });

        // Collect the item IDs dropped by these ops, so that re-creating a
        // temporary name that is simultaneously dropped is not a collision.
        let drop_ids: BTreeSet<CatalogItemId> = ops
            .iter()
            .filter_map(|op| match op {
                Op::DropObjects(drop_object_infos) => {
                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
                    let item_ids = ids.filter_map(|id| match id {
                        ObjectId::Item(id) => Some(id),
                        _ => None,
                    });
                    Some(item_ids)
                }
                _ => None,
            })
            .flatten()
            .collect();
        // Of the dropped items, keep the temporary ones as (connection, name).
        let temporary_drops = drop_ids
            .iter()
            .filter_map(|id| {
                let entry = self.get_entry(id);
                match entry.item().conn_id() {
                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
                    None => None,
                }
            })
            .collect();

        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = storage
            .transaction()
            .await
            .unwrap_or_terminate("starting catalog transaction");

        let new_state = Self::transact_inner(
            TransactInnerMode::Commit,
            storage_collections,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            &self.state,
        )
        .await?;

        // The user closure was successful, apply the updates. Terminate the
        // process if this fails, because we have to restart envd due to
        // indeterminate catalog state, which we only reconcile during catalog
        // init.
        tx.commit(oracle_write_ts)
            .await
            .unwrap_or_terminate("catalog storage transaction commit must succeed");

        // Dropping here keeps the mutable borrow on self, preventing us accidentally
        // mutating anything until after f is executed.
        drop(storage);
        // `transact_inner` returns `Some` only when the ops changed the state.
        if let Some(new_state) = new_state {
            self.transient_revision += 1;
            self.state = new_state;
        }

        Ok(TransactionResult {
            builtin_table_updates,
            catalog_updates,
            audit_events,
        })
    }
509
    /// Performs an incremental dry-run catalog transaction: processes only the
    /// NEW ops against an accumulated `CatalogState` from previous dry runs.
    /// This avoids the O(N^2) replay cost of replaying all accumulated ops.
    ///
    /// The durable transaction is intentionally never committed and no storage
    /// controller prepare-state side effects are run.
    ///
    /// If `prev_snapshot` is `Some`, the transaction is initialized from that
    /// snapshot (which represents the tx state after the previous dry run),
    /// ensuring it starts in sync with `base_state`. If `None` (first
    /// statement), a fresh transaction is loaded from durable storage.
    ///
    /// Returns the new accumulated state and a snapshot of the transaction's
    /// state for use in subsequent incremental dry runs.
    pub async fn transact_incremental_dry_run(
        &self,
        base_state: &CatalogState,
        ops: Vec<Op>,
        session: Option<&ConnMeta>,
        prev_snapshot: Option<Snapshot>,
        oracle_write_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogState, Snapshot), AdapterError> {
        // For DDL transactions, items are not temporary (CREATE TABLE FROM SOURCE, etc.)
        // but we still need to check for collisions.
        let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;

        let mut builtin_table_updates = vec![];
        let mut catalog_updates = vec![];
        let mut audit_events = vec![];
        let mut storage = self.storage().await;
        let mut tx = if let Some(snapshot) = prev_snapshot {
            // Restore transaction from saved snapshot so it starts in sync
            // with the accumulated CatalogState from previous dry runs.
            storage
                .transaction_from_snapshot(snapshot)
                .unwrap_or_terminate("starting catalog transaction from snapshot")
        } else {
            // First statement: fresh transaction from durable storage, which
            // is in sync with the real catalog state.
            storage
                .transaction()
                .await
                .unwrap_or_terminate("starting catalog transaction")
        };

        // Process only the new ops against the accumulated state in dry-run mode.
        // Dry-run mode passes no storage collections, so no controller side
        // effects can occur.
        let new_state = Self::transact_inner(
            TransactInnerMode::DryRun,
            None,
            oracle_write_ts,
            session,
            ops,
            temporary_ids,
            &mut builtin_table_updates,
            &mut catalog_updates,
            &mut audit_events,
            &mut tx,
            base_state,
        )
        .await?;

        // Save the transaction's current state as a snapshot for the next
        // incremental dry run.
        let new_snapshot = tx.current_snapshot();

        // Transaction is NOT committed — drop it.
        drop(storage);

        // transact_inner returns Some(state) when ops produced changes.
        let state = new_state.unwrap_or_else(|| base_state.clone());
        Ok((state, new_snapshot))
    }
582
583    /// Extracts optimized expressions from `Op::CreateItem` operations for views
584    /// and materialized views. These can be used to populate a `LocalExpressionCache`
585    /// to avoid re-optimization during `apply_updates`.
586    fn extract_expressions_from_ops(
587        ops: &[Op],
588        optimizer_features: &OptimizerFeatures,
589    ) -> BTreeMap<GlobalId, LocalExpressions> {
590        let mut exprs = BTreeMap::new();
591
592        for op in ops {
593            if let Op::CreateItem { item, .. } = op {
594                match item {
595                    CatalogItem::View(view) => {
596                        exprs.insert(
597                            view.global_id,
598                            LocalExpressions {
599                                local_mir: (*view.locally_optimized_expr).clone(),
600                                optimizer_features: optimizer_features.clone(),
601                            },
602                        );
603                    }
604                    CatalogItem::MaterializedView(mv) => {
605                        exprs.insert(
606                            mv.global_id_writes(),
607                            LocalExpressions {
608                                local_mir: (*mv.locally_optimized_expr).clone(),
609                                optimizer_features: optimizer_features.clone(),
610                            },
611                        );
612                    }
613                    CatalogItem::Table(_)
614                    | CatalogItem::Source(_)
615                    | CatalogItem::Log(_)
616                    | CatalogItem::Sink(_)
617                    | CatalogItem::Index(_)
618                    | CatalogItem::Type(_)
619                    | CatalogItem::Func(_)
620                    | CatalogItem::Secret(_)
621                    | CatalogItem::Connection(_) => {}
622                }
623            }
624        }
625
626        exprs
627    }
628
629    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
630    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
631    ///
632    /// `mode` controls whether storage prepare-state side effects are allowed.
633    /// In `DryRun` mode, this method may update the in-memory state returned to
634    /// the caller, but it must not trigger controller side effects.
635    ///
636    #[instrument(name = "catalog::transact_inner")]
637    async fn transact_inner(
638        mode: TransactInnerMode,
639        storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
640        oracle_write_ts: mz_repr::Timestamp,
641        session: Option<&ConnMeta>,
642        ops: Vec<Op>,
643        temporary_ids: BTreeSet<CatalogItemId>,
644        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
645        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
646        audit_events: &mut Vec<VersionedEvent>,
647        tx: &mut Transaction<'_>,
648        state: &CatalogState,
649    ) -> Result<Option<CatalogState>, AdapterError> {
650        // We come up with new catalog state, builtin state updates, and parsed
651        // catalog updates (for deriving catalog implications) in two phases:
652        //
653        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
654        //    one-by-one. This will give us the full list of updates to apply to
655        //    the catalog, which will allow us to apply it in one batch, which
656        //    in turn will allow the apply machinery to consolidate the updates.
657        // 2. We do one final apply call with all updates, which gives us the
658        //    final builtin table updates and parsed catalog updates.
659        //
660        // The reason is that the loop that is working off ops first does a
661        // transact_op to derive the state updates for that op, and then calls
662        // apply_updates on the catalog state. And successive ops might expect
663        // the catalog state to reflect the modified state _after_ applying
664        // previous ops.
665        //
666        // We want to, however, have one final apply_state that takes all the
667        // accumulated updates to derive the required controller updates and the
668        // builtin table updates.
669        //
670        // We won't win any DDL throughput benchmarks, but so far that's not
671        // what we're optimizing for and there would probably be other
672        // bottlenecks before we hit this one as a bottleneck.
673        //
674        // We could work around this by refactoring how the interplay of
675        // transact_op and apply_updates works, but that's a larger undertaking.
676        let mut preliminary_state = Cow::Borrowed(state);
677
678        // The final state that we will return, if modified.
679        let mut state = Cow::Borrowed(state);
680
681        if ops.is_empty() {
682            return Ok(None);
683        }
684
685        // Extract optimized expressions from CreateItem ops to avoid re-optimization
686        // during apply_updates. We extract before the loop since `ops` is moved there.
687        let optimizer_features = OptimizerFeatures::from(state.system_config());
688        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
689
690        let mut storage_collections_to_create = BTreeSet::new();
691        let mut storage_collections_to_drop = BTreeSet::new();
692        let mut storage_collections_to_register = BTreeMap::new();
693
694        let mut updates = Vec::new();
695
696        for op in ops {
697            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
698                oracle_write_ts,
699                session,
700                op,
701                &temporary_ids,
702                audit_events,
703                tx,
704                &*preliminary_state,
705                &mut storage_collections_to_create,
706                &mut storage_collections_to_drop,
707                &mut storage_collections_to_register,
708            )
709            .await?;
710
711            // Certain builtin tables are not derived from the durable catalog state, so they need
712            // to be updated ad-hoc based on the current transaction. This is weird and will not
713            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
714            // this instance crashes after committing the durable catalog but before applying the
715            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
716            // world. Currently, this works fine and there are no correctness issues because
717            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
718            // state of all builtin tables to the correct values.
719            if let Some(builtin_table_update) = weird_builtin_table_update {
720                builtin_table_updates.push(builtin_table_update);
721            }
722
723            // Temporary items are not stored in the durable catalog, so they need to be handled
724            // separately for updating state and builtin tables.
725            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
726            // in a multi-subscriber catalog world.
727            let upper = tx.upper();
728            let temporary_item_updates =
729                temporary_item_updates
730                    .into_iter()
731                    .map(|(item, diff)| StateUpdate {
732                        kind: StateUpdateKind::TemporaryItem(item),
733                        ts: upper,
734                        diff,
735                    });
736
737            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
738            op_updates.extend(temporary_item_updates);
739            if !op_updates.is_empty() {
740                // Clone the cache so each apply_updates call has access to cached expressions.
741                // The cache uses `remove` semantics, so we need a fresh clone for each call.
742                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
743                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
744                    .to_mut()
745                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
746                    .await;
747            }
748            updates.append(&mut op_updates);
749        }
750
751        if !updates.is_empty() {
752            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
753            let (op_builtin_table_updates, op_catalog_updates) = state
754                .to_mut()
755                .apply_updates(updates.clone(), &mut local_expr_cache)
756                .await;
757            let op_builtin_table_updates = state
758                .to_mut()
759                .resolve_builtin_table_updates(op_builtin_table_updates);
760            builtin_table_updates.extend(op_builtin_table_updates);
761            parsed_catalog_updates.extend(op_catalog_updates);
762        }
763
764        match mode {
765            TransactInnerMode::Commit => {
766                // `storage_collections` can be `None` in tests.
767                if let Some(c) = storage_collections {
768                    c.prepare_state(
769                        tx,
770                        storage_collections_to_create,
771                        storage_collections_to_drop,
772                        storage_collections_to_register,
773                    )
774                    .await?;
775                }
776            }
777            TransactInnerMode::DryRun => {
778                debug_assert!(
779                    storage_collections.is_none(),
780                    "dry-run mode must not prepare storage state"
781                );
782            }
783        }
784
785        let updates = tx.get_and_commit_op_updates();
786        if !updates.is_empty() {
787            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
788            let (op_builtin_table_updates, op_catalog_updates) = state
789                .to_mut()
790                .apply_updates(updates.clone(), &mut local_expr_cache)
791                .await;
792            let op_builtin_table_updates = state
793                .to_mut()
794                .resolve_builtin_table_updates(op_builtin_table_updates);
795            builtin_table_updates.extend(op_builtin_table_updates);
796            parsed_catalog_updates.extend(op_catalog_updates);
797        }
798
799        match state {
800            Cow::Owned(state) => Ok(Some(state)),
801            Cow::Borrowed(_) => Ok(None),
802        }
803    }
804
805    /// Performs the transaction operation described by `op`. This function prepares the changes in
806    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
807    /// changes.
808    ///
    /// Optionally returns a builtin table update for any builtin table updates that cannot be
    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
    /// scenarios that ideally won't exist in the future.
812    #[instrument]
813    async fn transact_op(
814        oracle_write_ts: mz_repr::Timestamp,
815        session: Option<&ConnMeta>,
816        op: Op,
817        temporary_ids: &BTreeSet<CatalogItemId>,
818        audit_events: &mut Vec<VersionedEvent>,
819        tx: &mut Transaction<'_>,
820        state: &CatalogState,
821        storage_collections_to_create: &mut BTreeSet<GlobalId>,
822        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
823        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
824    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
825        let mut weird_builtin_table_update = None;
826        let mut temporary_item_updates = Vec::new();
827
828        match op {
829            Op::AlterRetainHistory { id, value, window } => {
830                let entry = state.get_entry(&id);
831                if id.is_system() {
832                    let name = entry.name();
833                    let full_name =
834                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
835                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
836                        full_name.to_string(),
837                    ))));
838                }
839
840                let mut new_entry = entry.clone();
841                let previous = new_entry
842                    .item
843                    .update_retain_history(value.clone(), window)
844                    .map_err(|_| {
845                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
846                            "planner should have rejected invalid alter retain history item type"
847                                .to_string(),
848                        )))
849                    })?;
850
851                if Self::should_audit_log_item(new_entry.item()) {
852                    let details =
853                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
854                            id: id.to_string(),
855                            old_history: previous.map(|previous| previous.to_string()),
856                            new_history: value.map(|v| v.to_string()),
857                        });
858                    CatalogState::add_to_audit_log(
859                        &state.system_configuration,
860                        oracle_write_ts,
861                        session,
862                        tx,
863                        audit_events,
864                        EventType::Alter,
865                        catalog_type_to_audit_object_type(new_entry.item().typ()),
866                        details,
867                    )?;
868                }
869
870                tx.update_item(id, new_entry.into())?;
871
872                Self::log_update(state, &id);
873            }
874            Op::AlterSourceTimestampInterval {
875                id,
876                value,
877                interval,
878            } => {
879                let entry = state.get_entry(&id);
880                if id.is_system() {
881                    let name = entry.name();
882                    let full_name =
883                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
884                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
885                        full_name.to_string(),
886                    ))));
887                }
888
889                let mut new_entry = entry.clone();
890                new_entry
891                    .item
892                    .update_timestamp_interval(value, interval)
893                    .map_err(|_| {
894                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
895                            "planner should have rejected invalid alter timestamp interval item type"
896                                .to_string(),
897                        )))
898                    })?;
899
900                tx.update_item(id, new_entry.into())?;
901
902                Self::log_update(state, &id);
903            }
904            Op::AlterRole {
905                id,
906                name,
907                attributes,
908                nopassword,
909                vars,
910            } => {
911                state.ensure_not_reserved_role(&id)?;
912
913                let mut existing_role = state.get_role(&id).clone();
914                let password = attributes.password.clone();
915                let scram_iterations = attributes
916                    .scram_iterations
917                    .unwrap_or_else(|| state.system_config().scram_iterations());
918                existing_role.attributes = attributes.into();
919                existing_role.vars = vars;
920                let password_action = if nopassword {
921                    PasswordAction::Clear
922                } else if let Some(password) = password {
923                    PasswordAction::Set(PasswordConfig {
924                        password,
925                        scram_iterations,
926                    })
927                } else {
928                    PasswordAction::NoChange
929                };
930                tx.update_role(id, existing_role.into(), password_action)?;
931
932                CatalogState::add_to_audit_log(
933                    &state.system_configuration,
934                    oracle_write_ts,
935                    session,
936                    tx,
937                    audit_events,
938                    EventType::Alter,
939                    ObjectType::Role,
940                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
941                        id: id.to_string(),
942                        name: name.clone(),
943                    }),
944                )?;
945
946                info!("update role {name} ({id})");
947            }
948            Op::AlterNetworkPolicy {
949                id,
950                rules,
951                name,
952                owner_id: _owner_id,
953            } => {
954                let existing_policy = state.get_network_policy(&id).clone();
955                let mut policy: NetworkPolicy = existing_policy.into();
956                policy.rules = rules;
957                if is_reserved_name(&name) {
958                    return Err(AdapterError::Catalog(Error::new(
959                        ErrorKind::ReservedNetworkPolicyName(name),
960                    )));
961                }
962                tx.update_network_policy(id, policy.clone())?;
963
964                CatalogState::add_to_audit_log(
965                    &state.system_configuration,
966                    oracle_write_ts,
967                    session,
968                    tx,
969                    audit_events,
970                    EventType::Alter,
971                    ObjectType::NetworkPolicy,
972                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
973                        id: id.to_string(),
974                        name: name.clone(),
975                    }),
976                )?;
977
978                info!("update network policy {name} ({id})");
979            }
980            Op::AlterAddColumn {
981                id,
982                new_global_id,
983                name,
984                typ,
985                sql,
986            } => {
987                let mut new_entry = state.get_entry(&id).clone();
988                let version = new_entry.item.add_column(name, typ, sql)?;
989                // All versions of a table share the same shard, so it shouldn't matter what
990                // GlobalId we use here.
991                let shard_id = state
992                    .storage_metadata()
993                    .get_collection_shard(new_entry.latest_global_id())?;
994
995                // TODO(alter_table): Support adding columns to sources.
996                let CatalogItem::Table(table) = &mut new_entry.item else {
997                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
998                };
999                table.collections.insert(version, new_global_id);
1000
1001                tx.update_item(id, new_entry.into())?;
1002                storage_collections_to_register.insert(new_global_id, shard_id);
1003            }
1004            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1005                let mut new_entry = state.get_entry(&id).clone();
1006                let replacement = state.get_entry(&replacement_id);
1007
1008                let new_audit_events =
1009                    apply_replacement_audit_events(state, &new_entry, replacement);
1010
1011                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1012                    return Err(AdapterError::internal(
1013                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1014                        "id must refer to a materialized view",
1015                    ));
1016                };
1017                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1018                    return Err(AdapterError::internal(
1019                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1020                        "replacement_id must refer to a materialized view",
1021                    ));
1022                };
1023
1024                mv.apply_replacement(replacement_mv.clone());
1025
1026                tx.remove_item(replacement_id)?;
1027
1028                new_entry.id = replacement_id;
1029                tx_replace_item(tx, state, id, new_entry)?;
1030
1031                let comment_id = CommentObjectId::MaterializedView(replacement_id);
1032                tx.drop_comments(&[comment_id].into())?;
1033
1034                for (event_type, details) in new_audit_events {
1035                    CatalogState::add_to_audit_log(
1036                        &state.system_configuration,
1037                        oracle_write_ts,
1038                        session,
1039                        tx,
1040                        audit_events,
1041                        event_type,
1042                        ObjectType::MaterializedView,
1043                        details,
1044                    )?;
1045                }
1046            }
1047            Op::CreateDatabase { name, owner_id } => {
1048                let database_owner_privileges = vec![rbac::owner_privilege(
1049                    mz_sql::catalog::ObjectType::Database,
1050                    owner_id,
1051                )];
1052                let database_default_privileges = state
1053                    .default_privileges
1054                    .get_applicable_privileges(
1055                        owner_id,
1056                        None,
1057                        None,
1058                        mz_sql::catalog::ObjectType::Database,
1059                    )
1060                    .map(|item| item.mz_acl_item(owner_id));
1061                let database_privileges: Vec<_> = merge_mz_acl_items(
1062                    database_owner_privileges
1063                        .into_iter()
1064                        .chain(database_default_privileges),
1065                )
1066                .collect();
1067
1068                let schema_owner_privileges = vec![rbac::owner_privilege(
1069                    mz_sql::catalog::ObjectType::Schema,
1070                    owner_id,
1071                )];
1072                let schema_default_privileges = state
1073                    .default_privileges
1074                    .get_applicable_privileges(
1075                        owner_id,
1076                        None,
1077                        None,
1078                        mz_sql::catalog::ObjectType::Schema,
1079                    )
1080                    .map(|item| item.mz_acl_item(owner_id))
1081                    // Special default privilege on public schemas.
1082                    .chain(std::iter::once(MzAclItem {
1083                        grantee: RoleId::Public,
1084                        grantor: owner_id,
1085                        acl_mode: AclMode::USAGE,
1086                    }));
1087                let schema_privileges: Vec<_> = merge_mz_acl_items(
1088                    schema_owner_privileges
1089                        .into_iter()
1090                        .chain(schema_default_privileges),
1091                )
1092                .collect();
1093
1094                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1095                let (database_id, _) = tx.insert_user_database(
1096                    &name,
1097                    owner_id,
1098                    database_privileges.clone(),
1099                    &temporary_oids,
1100                )?;
1101                let (schema_id, _) = tx.insert_user_schema(
1102                    database_id,
1103                    DEFAULT_SCHEMA,
1104                    owner_id,
1105                    schema_privileges.clone(),
1106                    &temporary_oids,
1107                )?;
1108                CatalogState::add_to_audit_log(
1109                    &state.system_configuration,
1110                    oracle_write_ts,
1111                    session,
1112                    tx,
1113                    audit_events,
1114                    EventType::Create,
1115                    ObjectType::Database,
1116                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1117                        id: database_id.to_string(),
1118                        name: name.clone(),
1119                    }),
1120                )?;
1121                info!("create database {}", name);
1122
1123                CatalogState::add_to_audit_log(
1124                    &state.system_configuration,
1125                    oracle_write_ts,
1126                    session,
1127                    tx,
1128                    audit_events,
1129                    EventType::Create,
1130                    ObjectType::Schema,
1131                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1132                        id: schema_id.to_string(),
1133                        name: DEFAULT_SCHEMA.to_string(),
1134                        database_name: Some(name),
1135                    }),
1136                )?;
1137            }
1138            Op::CreateSchema {
1139                database_id,
1140                schema_name,
1141                owner_id,
1142            } => {
1143                if is_reserved_name(&schema_name) {
1144                    return Err(AdapterError::Catalog(Error::new(
1145                        ErrorKind::ReservedSchemaName(schema_name),
1146                    )));
1147                }
1148                let database_id = match database_id {
1149                    ResolvedDatabaseSpecifier::Id(id) => id,
1150                    ResolvedDatabaseSpecifier::Ambient => {
1151                        return Err(AdapterError::Catalog(Error::new(
1152                            ErrorKind::ReadOnlySystemSchema(schema_name),
1153                        )));
1154                    }
1155                };
1156                let owner_privileges = vec![rbac::owner_privilege(
1157                    mz_sql::catalog::ObjectType::Schema,
1158                    owner_id,
1159                )];
1160                let default_privileges = state
1161                    .default_privileges
1162                    .get_applicable_privileges(
1163                        owner_id,
1164                        Some(database_id),
1165                        None,
1166                        mz_sql::catalog::ObjectType::Schema,
1167                    )
1168                    .map(|item| item.mz_acl_item(owner_id));
1169                let privileges: Vec<_> =
1170                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1171                        .collect();
1172                let (schema_id, _) = tx.insert_user_schema(
1173                    database_id,
1174                    &schema_name,
1175                    owner_id,
1176                    privileges.clone(),
1177                    &state.get_temporary_oids().collect(),
1178                )?;
1179                CatalogState::add_to_audit_log(
1180                    &state.system_configuration,
1181                    oracle_write_ts,
1182                    session,
1183                    tx,
1184                    audit_events,
1185                    EventType::Create,
1186                    ObjectType::Schema,
1187                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1188                        id: schema_id.to_string(),
1189                        name: schema_name.clone(),
1190                        database_name: Some(state.database_by_id[&database_id].name.clone()),
1191                    }),
1192                )?;
1193            }
1194            Op::CreateRole { name, attributes } => {
1195                if is_reserved_role_name(&name) {
1196                    return Err(AdapterError::Catalog(Error::new(
1197                        ErrorKind::ReservedRoleName(name),
1198                    )));
1199                }
1200                let membership = RoleMembership::new();
1201                let vars = RoleVars::default();
1202                let (id, _) = tx.insert_user_role(
1203                    name.clone(),
1204                    attributes.clone(),
1205                    membership.clone(),
1206                    vars.clone(),
1207                    &state.get_temporary_oids().collect(),
1208                )?;
1209                CatalogState::add_to_audit_log(
1210                    &state.system_configuration,
1211                    oracle_write_ts,
1212                    session,
1213                    tx,
1214                    audit_events,
1215                    EventType::Create,
1216                    ObjectType::Role,
1217                    EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1218                        id: id.to_string(),
1219                        name: name.clone(),
1220                        auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1221                            AutoProvisionSource::Oidc => "oidc".to_string(),
1222                            AutoProvisionSource::Frontegg => "frontegg".to_string(),
1223                            AutoProvisionSource::None => "none".to_string(),
1224                        }),
1225                    }),
1226                )?;
1227                info!("create role {}", name);
1228            }
1229            Op::CreateCluster {
1230                id,
1231                name,
1232                introspection_sources,
1233                owner_id,
1234                config,
1235            } => {
1236                if is_reserved_name(&name) {
1237                    return Err(AdapterError::Catalog(Error::new(
1238                        ErrorKind::ReservedClusterName(name),
1239                    )));
1240                }
1241                let owner_privileges = vec![rbac::owner_privilege(
1242                    mz_sql::catalog::ObjectType::Cluster,
1243                    owner_id,
1244                )];
1245                let default_privileges = state
1246                    .default_privileges
1247                    .get_applicable_privileges(
1248                        owner_id,
1249                        None,
1250                        None,
1251                        mz_sql::catalog::ObjectType::Cluster,
1252                    )
1253                    .map(|item| item.mz_acl_item(owner_id));
1254                let privileges: Vec<_> =
1255                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1256                        .collect();
1257                let introspection_source_ids: Vec<_> = introspection_sources
1258                    .iter()
1259                    .map(|introspection_source| {
1260                        Transaction::allocate_introspection_source_index_id(
1261                            &id,
1262                            introspection_source.variant,
1263                        )
1264                    })
1265                    .collect();
1266
1267                let introspection_sources = introspection_sources
1268                    .into_iter()
1269                    .zip_eq(introspection_source_ids)
1270                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1271                    .collect();
1272
1273                tx.insert_user_cluster(
1274                    id,
1275                    &name,
1276                    introspection_sources,
1277                    owner_id,
1278                    privileges.clone(),
1279                    config.clone().into(),
1280                    &state.get_temporary_oids().collect(),
1281                )?;
1282                CatalogState::add_to_audit_log(
1283                    &state.system_configuration,
1284                    oracle_write_ts,
1285                    session,
1286                    tx,
1287                    audit_events,
1288                    EventType::Create,
1289                    ObjectType::Cluster,
1290                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1291                        id: id.to_string(),
1292                        name: name.clone(),
1293                    }),
1294                )?;
1295                info!("create cluster {}", name);
1296            }
1297            Op::CreateClusterReplica {
1298                cluster_id,
1299                name,
1300                config,
1301                owner_id,
1302                reason,
1303            } => {
1304                if is_reserved_name(&name) {
1305                    return Err(AdapterError::Catalog(Error::new(
1306                        ErrorKind::ReservedReplicaName(name),
1307                    )));
1308                }
1309                let cluster = state.get_cluster(cluster_id);
1310                let id =
1311                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1312                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1313                    size,
1314                    billed_as,
1315                    internal,
1316                    ..
1317                }) = &config.location
1318                {
1319                    let (reason, scheduling_policies) = reason.into_audit_log();
1320                    let details = EventDetails::CreateClusterReplicaV4(
1321                        mz_audit_log::CreateClusterReplicaV4 {
1322                            cluster_id: cluster_id.to_string(),
1323                            cluster_name: cluster.name.clone(),
1324                            replica_id: Some(id.to_string()),
1325                            replica_name: name.clone(),
1326                            logical_size: size.clone(),
1327                            billed_as: billed_as.clone(),
1328                            internal: *internal,
1329                            reason,
1330                            scheduling_policies,
1331                        },
1332                    );
1333                    CatalogState::add_to_audit_log(
1334                        &state.system_configuration,
1335                        oracle_write_ts,
1336                        session,
1337                        tx,
1338                        audit_events,
1339                        EventType::Create,
1340                        ObjectType::ClusterReplica,
1341                        details,
1342                    )?;
1343                }
1344            }
1345            Op::CreateItem {
1346                id,
1347                name,
1348                item,
1349                owner_id,
1350            } => {
1351                state.check_unstable_dependencies(&item)?;
1352
1353                match &item {
1354                    CatalogItem::Table(table) => {
1355                        let gids: Vec<_> = table.global_ids().collect();
1356                        assert_eq!(gids.len(), 1);
1357                        storage_collections_to_create.extend(gids);
1358                    }
1359                    CatalogItem::Source(source) => {
1360                        storage_collections_to_create.insert(source.global_id());
1361                    }
1362                    CatalogItem::MaterializedView(mv) => {
1363                        let mv_gid = mv.global_id_writes();
1364                        if let Some(target_id) = mv.replacement_target {
1365                            let target_gid = state.get_entry(&target_id).latest_global_id();
1366                            let shard_id =
1367                                state.storage_metadata().get_collection_shard(target_gid)?;
1368                            storage_collections_to_register.insert(mv_gid, shard_id);
1369                        } else {
1370                            storage_collections_to_create.insert(mv_gid);
1371                        }
1372                    }
1373                    CatalogItem::Sink(sink) => {
1374                        storage_collections_to_create.insert(sink.global_id());
1375                    }
1376                    CatalogItem::Log(_)
1377                    | CatalogItem::View(_)
1378                    | CatalogItem::Index(_)
1379                    | CatalogItem::Type(_)
1380                    | CatalogItem::Func(_)
1381                    | CatalogItem::Secret(_)
1382                    | CatalogItem::Connection(_) => (),
1383                }
1384
1385                let system_user = session.map_or(false, |s| s.user().is_system_user());
1386                if !system_user {
1387                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1388                        let cluster_name = state.clusters_by_id[&id].name.clone();
1389                        return Err(AdapterError::Catalog(Error::new(
1390                            ErrorKind::ReadOnlyCluster(cluster_name),
1391                        )));
1392                    }
1393                }
1394
1395                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1396                let default_privileges = state
1397                    .default_privileges
1398                    .get_applicable_privileges(
1399                        owner_id,
1400                        name.qualifiers.database_spec.id(),
1401                        Some(name.qualifiers.schema_spec.into()),
1402                        item.typ().into(),
1403                    )
1404                    .map(|item| item.mz_acl_item(owner_id));
1405                // mz_support can read all progress sources.
1406                let progress_source_privilege = if item.is_progress_source() {
1407                    Some(MzAclItem {
1408                        grantee: MZ_SUPPORT_ROLE_ID,
1409                        grantor: owner_id,
1410                        acl_mode: AclMode::SELECT,
1411                    })
1412                } else {
1413                    None
1414                };
1415                let privileges: Vec<_> = merge_mz_acl_items(
1416                    owner_privileges
1417                        .into_iter()
1418                        .chain(default_privileges)
1419                        .chain(progress_source_privilege),
1420                )
1421                .collect();
1422
1423                let temporary_oids = state.get_temporary_oids().collect();
1424
1425                if item.is_temporary() {
1426                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1427                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1428                    {
1429                        return Err(AdapterError::Catalog(Error::new(
1430                            ErrorKind::InvalidTemporarySchema,
1431                        )));
1432                    }
1433                    let oid = tx.allocate_oid(&temporary_oids)?;
1434
1435                    let schema_id = name.qualifiers.schema_spec.clone().into();
1436                    let item_type = item.typ();
1437                    let (create_sql, global_id, versions) = item.to_serialized();
1438
1439                    let item = TemporaryItem {
1440                        id,
1441                        oid,
1442                        global_id,
1443                        schema_id,
1444                        name: name.item.clone(),
1445                        create_sql,
1446                        conn_id: item.conn_id().cloned(),
1447                        owner_id,
1448                        privileges: privileges.clone(),
1449                        extra_versions: versions,
1450                    };
1451                    temporary_item_updates.push((item, StateDiff::Addition));
1452
1453                    info!(
1454                        "create temporary {} {} ({})",
1455                        item_type,
1456                        state.resolve_full_name(&name, None),
1457                        id
1458                    );
1459                } else {
1460                    if let Some(temp_id) =
1461                        item.uses()
1462                            .iter()
1463                            .find(|id| match state.try_get_entry(*id) {
1464                                Some(entry) => entry.item().is_temporary(),
1465                                None => temporary_ids.contains(id),
1466                            })
1467                    {
1468                        let temp_item = state.get_entry(temp_id);
1469                        return Err(AdapterError::Catalog(Error::new(
1470                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1471                        )));
1472                    }
1473                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1474                        && !system_user
1475                    {
1476                        let schema_name = state
1477                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1478                            .schema;
1479                        return Err(AdapterError::Catalog(Error::new(
1480                            ErrorKind::ReadOnlySystemSchema(schema_name),
1481                        )));
1482                    }
1483                    let schema_id = name.qualifiers.schema_spec.clone().into();
1484                    let item_type = item.typ();
1485                    let (create_sql, global_id, versions) = item.to_serialized();
1486                    tx.insert_user_item(
1487                        id,
1488                        global_id,
1489                        schema_id,
1490                        &name.item,
1491                        create_sql,
1492                        owner_id,
1493                        privileges.clone(),
1494                        &temporary_oids,
1495                        versions,
1496                    )?;
1497                    info!(
1498                        "create {} {} ({})",
1499                        item_type,
1500                        state.resolve_full_name(&name, None),
1501                        id
1502                    );
1503                }
1504
1505                if Self::should_audit_log_item(&item) {
1506                    let name = Self::full_name_detail(
1507                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1508                    );
1509                    let details = match &item {
1510                        CatalogItem::Source(s) => {
1511                            let cluster_id = match s.data_source {
1512                                // Ingestion exports don't have their own cluster, but
1513                                // run on their ingestion's cluster.
1514                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1515                                    match state.get_entry(&ingestion_id).cluster_id() {
1516                                        Some(cluster_id) => Some(cluster_id.to_string()),
1517                                        None => None,
1518                                    }
1519                                }
1520                                _ => match item.cluster_id() {
1521                                    Some(cluster_id) => Some(cluster_id.to_string()),
1522                                    None => None,
1523                                },
1524                            };
1525
1526                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1527                                id: id.to_string(),
1528                                cluster_id,
1529                                name,
1530                                external_type: s.source_type().to_string(),
1531                            })
1532                        }
1533                        CatalogItem::Sink(s) => {
1534                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1535                                id: id.to_string(),
1536                                cluster_id: Some(s.cluster_id.to_string()),
1537                                name,
1538                                external_type: s.sink_type().to_string(),
1539                            })
1540                        }
1541                        CatalogItem::Index(i) => {
1542                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1543                                id: id.to_string(),
1544                                name,
1545                                cluster_id: i.cluster_id.to_string(),
1546                            })
1547                        }
1548                        CatalogItem::MaterializedView(mv) => {
1549                            EventDetails::CreateMaterializedViewV1(
1550                                mz_audit_log::CreateMaterializedViewV1 {
1551                                    id: id.to_string(),
1552                                    name,
1553                                    cluster_id: mv.cluster_id.to_string(),
1554                                    replacement_target_id: mv
1555                                        .replacement_target
1556                                        .map(|id| id.to_string()),
1557                                },
1558                            )
1559                        }
1560                        CatalogItem::Table(_)
1561                        | CatalogItem::Log(_)
1562                        | CatalogItem::View(_)
1563                        | CatalogItem::Type(_)
1564                        | CatalogItem::Func(_)
1565                        | CatalogItem::Secret(_)
1566                        | CatalogItem::Connection(_) => EventDetails::IdFullNameV1(IdFullNameV1 {
1567                            id: id.to_string(),
1568                            name,
1569                        }),
1570                    };
1571                    CatalogState::add_to_audit_log(
1572                        &state.system_configuration,
1573                        oracle_write_ts,
1574                        session,
1575                        tx,
1576                        audit_events,
1577                        EventType::Create,
1578                        catalog_type_to_audit_object_type(item.typ()),
1579                        details,
1580                    )?;
1581                }
1582            }
            Op::CreateNetworkPolicy {
                rules,
                name,
                owner_id,
            } => {
                // Reject duplicate policy names before touching the durable
                // catalog.
                if state.network_policies_by_name.contains_key(&name) {
                    return Err(AdapterError::PlanError(PlanError::Catalog(
                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
                    )));
                }
                // Names rejected by `is_reserved_name` may not be used for
                // user-created policies.
                if is_reserved_name(&name) {
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::ReservedNetworkPolicyName(name),
                    )));
                }

                // The new policy's privileges are the owner's implicit
                // privileges merged with any applicable default privileges.
                let owner_privileges = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::NetworkPolicy,
                    owner_id,
                )];
                let default_privileges = state
                    .default_privileges
                    .get_applicable_privileges(
                        owner_id,
                        None,
                        None,
                        mz_sql::catalog::ObjectType::NetworkPolicy,
                    )
                    .map(|item| item.mz_acl_item(owner_id));
                let privileges: Vec<_> =
                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
                        .collect();

                // OID allocation for the new policy must avoid OIDs already
                // claimed by temporary (in-memory only) items.
                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
                let id = tx.insert_user_network_policy(
                    name.clone(),
                    rules,
                    privileges,
                    owner_id,
                    &temporary_oids,
                )?;

                // Record the creation in the audit log.
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Create,
                    ObjectType::NetworkPolicy,
                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                        id: id.to_string(),
                        name: name.clone(),
                    }),
                )?;

                info!("created network policy {name} ({id})");
            }
            Op::Comment {
                object_id,
                sub_component,
                comment,
            } => {
                // Persist the comment change in the durable catalog.
                tx.update_comment(object_id, sub_component, comment)?;
                // Only audit-log comments on items that should themselves be
                // audit logged (e.g. skip temporary items).
                let entry = state.get_comment_id_entry(&object_id);
                let should_log = entry
                    .map(|entry| Self::should_audit_log_item(entry.item()))
                    // Things that aren't catalog entries can't be temp, so should be logged.
                    .unwrap_or(true);
                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
                // comments won't be logged for now.
                if let (Some(conn_id), true) =
                    (session.map(|session| session.conn_id()), should_log)
                {
                    CatalogState::add_to_audit_log(
                        &state.system_configuration,
                        oracle_write_ts,
                        session,
                        tx,
                        audit_events,
                        EventType::Comment,
                        comment_id_to_audit_object_type(object_id),
                        EventDetails::IdNameV1(IdNameV1 {
                            // CommentObjectIds don't have a great string representation, but debug will do for now.
                            id: format!("{object_id:?}"),
                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
                        }),
                    )?;
                }
            }
1673            Op::UpdateSourceReferences {
1674                source_id,
1675                references,
1676            } => {
1677                tx.update_source_references(
1678                    source_id,
1679                    references
1680                        .references
1681                        .into_iter()
1682                        .map(|reference| reference.into())
1683                        .collect(),
1684                    references.updated_at,
1685                )?;
1686            }
1687            Op::DropObjects(drop_object_infos) => {
1688                // Generate all of the objects that need to get dropped.
1689                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1690
1691                // Drop any associated comments.
1692                tx.drop_comments(&delta.comments)?;
1693
1694                // Drop any items.
1695                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1696                    delta
1697                        .items
1698                        .iter()
1699                        .map(|id| id)
1700                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1701                tx.remove_items(&durable_items_to_drop)?;
1702                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1703                    let entry = state.get_entry(&id);
1704                    (entry.clone().into(), StateDiff::Retraction)
1705                }));
1706
1707                for item_id in delta.items {
1708                    let entry = state.get_entry(&item_id);
1709
1710                    if entry.item().is_storage_collection() {
1711                        storage_collections_to_drop.extend(entry.global_ids());
1712                    }
1713
1714                    if state.source_references.contains_key(&item_id) {
1715                        tx.remove_source_references(item_id)?;
1716                    }
1717
1718                    if Self::should_audit_log_item(entry.item()) {
1719                        CatalogState::add_to_audit_log(
1720                            &state.system_configuration,
1721                            oracle_write_ts,
1722                            session,
1723                            tx,
1724                            audit_events,
1725                            EventType::Drop,
1726                            catalog_type_to_audit_object_type(entry.item().typ()),
1727                            EventDetails::IdFullNameV1(IdFullNameV1 {
1728                                id: item_id.to_string(),
1729                                name: Self::full_name_detail(&state.resolve_full_name(
1730                                    entry.name(),
1731                                    session.map(|session| session.conn_id()),
1732                                )),
1733                            }),
1734                        )?;
1735                    }
1736                    info!(
1737                        "drop {} {} ({})",
1738                        entry.item_type(),
1739                        state.resolve_full_name(entry.name(), entry.conn_id()),
1740                        item_id
1741                    );
1742                }
1743
1744                // Drop any schemas.
1745                let schemas = delta
1746                    .schemas
1747                    .iter()
1748                    .map(|(schema_spec, database_spec)| {
1749                        (SchemaId::from(schema_spec), *database_spec)
1750                    })
1751                    .collect();
1752                tx.remove_schemas(&schemas)?;
1753
1754                for (schema_spec, database_spec) in delta.schemas {
1755                    let schema = state.get_schema(
1756                        &database_spec,
1757                        &schema_spec,
1758                        session
1759                            .map(|session| session.conn_id())
1760                            .unwrap_or(&SYSTEM_CONN_ID),
1761                    );
1762
1763                    let schema_id = SchemaId::from(schema_spec);
1764                    let database_id = match database_spec {
1765                        ResolvedDatabaseSpecifier::Ambient => None,
1766                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1767                    };
1768
1769                    CatalogState::add_to_audit_log(
1770                        &state.system_configuration,
1771                        oracle_write_ts,
1772                        session,
1773                        tx,
1774                        audit_events,
1775                        EventType::Drop,
1776                        ObjectType::Schema,
1777                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1778                            id: schema_id.to_string(),
1779                            name: schema.name.schema.to_string(),
1780                            database_name: database_id
1781                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1782                        }),
1783                    )?;
1784                }
1785
1786                // Drop any databases.
1787                tx.remove_databases(&delta.databases)?;
1788
1789                for database_id in delta.databases {
1790                    let database = state.get_database(&database_id).clone();
1791
1792                    CatalogState::add_to_audit_log(
1793                        &state.system_configuration,
1794                        oracle_write_ts,
1795                        session,
1796                        tx,
1797                        audit_events,
1798                        EventType::Drop,
1799                        ObjectType::Database,
1800                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1801                            id: database_id.to_string(),
1802                            name: database.name.clone(),
1803                        }),
1804                    )?;
1805                }
1806
1807                // Drop any roles.
1808                tx.remove_user_roles(&delta.roles)?;
1809
1810                for role_id in delta.roles {
1811                    let role = state
1812                        .roles_by_id
1813                        .get(&role_id)
1814                        .expect("catalog out of sync");
1815
1816                    CatalogState::add_to_audit_log(
1817                        &state.system_configuration,
1818                        oracle_write_ts,
1819                        session,
1820                        tx,
1821                        audit_events,
1822                        EventType::Drop,
1823                        ObjectType::Role,
1824                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1825                            id: role.id.to_string(),
1826                            name: role.name.clone(),
1827                        }),
1828                    )?;
1829                    info!("drop role {}", role.name());
1830                }
1831
1832                // Drop any network policies.
1833                tx.remove_network_policies(&delta.network_policies)?;
1834
1835                for network_policy_id in delta.network_policies {
1836                    let policy = state
1837                        .network_policies_by_id
1838                        .get(&network_policy_id)
1839                        .expect("catalog out of sync");
1840
1841                    CatalogState::add_to_audit_log(
1842                        &state.system_configuration,
1843                        oracle_write_ts,
1844                        session,
1845                        tx,
1846                        audit_events,
1847                        EventType::Drop,
1848                        ObjectType::NetworkPolicy,
1849                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1850                            id: policy.id.to_string(),
1851                            name: policy.name.clone(),
1852                        }),
1853                    )?;
1854                    info!("drop network policy {}", policy.name.clone());
1855                }
1856
1857                // Drop any replicas.
1858                let replicas = delta.replicas.keys().copied().collect();
1859                tx.remove_cluster_replicas(&replicas)?;
1860
1861                for (replica_id, (cluster_id, reason)) in delta.replicas {
1862                    let cluster = state.get_cluster(cluster_id);
1863                    let replica = cluster.replica(replica_id).expect("Must exist");
1864
1865                    let (reason, scheduling_policies) = reason.into_audit_log();
1866                    let details =
1867                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1868                            cluster_id: cluster_id.to_string(),
1869                            cluster_name: cluster.name.clone(),
1870                            replica_id: Some(replica_id.to_string()),
1871                            replica_name: replica.name.clone(),
1872                            reason,
1873                            scheduling_policies,
1874                        });
1875                    CatalogState::add_to_audit_log(
1876                        &state.system_configuration,
1877                        oracle_write_ts,
1878                        session,
1879                        tx,
1880                        audit_events,
1881                        EventType::Drop,
1882                        ObjectType::ClusterReplica,
1883                        details,
1884                    )?;
1885                }
1886
1887                // Drop any clusters.
1888                tx.remove_clusters(&delta.clusters)?;
1889
1890                for cluster_id in delta.clusters {
1891                    let cluster = state.get_cluster(cluster_id);
1892
1893                    CatalogState::add_to_audit_log(
1894                        &state.system_configuration,
1895                        oracle_write_ts,
1896                        session,
1897                        tx,
1898                        audit_events,
1899                        EventType::Drop,
1900                        ObjectType::Cluster,
1901                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1902                            id: cluster.id.to_string(),
1903                            name: cluster.name.clone(),
1904                        }),
1905                    )?;
1906                }
1907            }
            Op::GrantRole {
                role_id,
                member_id,
                grantor_id,
            } => {
                // Reserved roles cannot have their membership changed, and
                // only grantable roles may be granted.
                state.ensure_not_reserved_role(&member_id)?;
                state.ensure_grantable_role(&role_id)?;
                // Refuse grants that would make role membership circular: if
                // `member_id` already appears in `role_id`'s transitive
                // membership, granting `role_id` to `member_id` closes a cycle.
                if state.collect_role_membership(&role_id).contains(&member_id) {
                    let group_role = state.get_role(&role_id);
                    let member_role = state.get_role(&member_id);
                    return Err(AdapterError::Catalog(Error::new(
                        ErrorKind::CircularRoleMembership {
                            role_name: group_role.name().to_string(),
                            member_name: member_role.name().to_string(),
                        },
                    )));
                }
                // Record the membership (keyed by the granted role, storing
                // who granted it) and persist the updated role without
                // changing its password.
                let mut member_role = state.get_role(&member_id).clone();
                member_role.membership.map.insert(role_id, grantor_id);
                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Grant,
                    ObjectType::Role,
                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
                        role_id: role_id.to_string(),
                        member_id: member_id.to_string(),
                        grantor_id: grantor_id.to_string(),
                        // Grants issued without a session (system-initiated)
                        // are attributed to the system role.
                        executed_by: session
                            .map(|session| session.authenticated_role_id())
                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
                            .to_string(),
                    }),
                )?;
            }
            Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            } => {
                // Reserved roles cannot have their membership changed, and
                // only grantable roles may be revoked.
                state.ensure_not_reserved_role(&member_id)?;
                state.ensure_grantable_role(&role_id)?;
                // Drop the membership entry (a no-op if it was never granted)
                // and persist the updated role without changing its password.
                let mut member_role = state.get_role(&member_id).clone();
                member_role.membership.map.remove(&role_id);
                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;

                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    EventType::Revoke,
                    ObjectType::Role,
                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
                        role_id: role_id.to_string(),
                        member_id: member_id.to_string(),
                        grantor_id: grantor_id.to_string(),
                        // Revokes issued without a session (system-initiated)
                        // are attributed to the system role.
                        executed_by: session
                            .map(|session| session.authenticated_role_id())
                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
                            .to_string(),
                    }),
                )?;
            }
            Op::UpdatePrivilege {
                target_id,
                privilege,
                variant,
            } => {
                // Applies the grant/revoke to an in-memory privilege map; the
                // mutated copy is then written back to the durable catalog
                // below, per object kind.
                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
                    UpdatePrivilegeVariant::Grant => {
                        privileges.grant(privilege);
                    }
                    UpdatePrivilegeVariant::Revoke => {
                        privileges.revoke(&privilege);
                    }
                };
                match &target_id {
                    SystemObjectId::Object(object_id) => match object_id {
                        ObjectId::Cluster(id) => {
                            let mut cluster = state.get_cluster(*id).clone();
                            update_privilege_fn(&mut cluster.privileges);
                            tx.update_cluster(*id, cluster.into())?;
                        }
                        ObjectId::Database(id) => {
                            let mut database = state.get_database(id).clone();
                            update_privilege_fn(&mut database.privileges);
                            tx.update_database(*id, database.into())?;
                        }
                        ObjectId::NetworkPolicy(id) => {
                            let mut policy = state.get_network_policy(id).clone();
                            update_privilege_fn(&mut policy.privileges);
                            tx.update_network_policy(*id, policy.into())?;
                        }
                        ObjectId::Schema((database_spec, schema_spec)) => {
                            let schema_id = schema_spec.clone().into();
                            // Fall back to the system connection when there is
                            // no session (system-initiated change).
                            let mut schema = state
                                .get_schema(
                                    database_spec,
                                    schema_spec,
                                    session
                                        .map(|session| session.conn_id())
                                        .unwrap_or(&SYSTEM_CONN_ID),
                                )
                                .clone();
                            update_privilege_fn(&mut schema.privileges);
                            tx.update_schema(schema_id, schema.into())?;
                        }
                        ObjectId::Item(id) => {
                            let entry = state.get_entry(id);
                            let mut new_entry = entry.clone();
                            update_privilege_fn(&mut new_entry.privileges);
                            // Temporary items are not durable: represent the
                            // change as a retraction of the old entry followed
                            // by an addition of the updated one.
                            if !new_entry.item().is_temporary() {
                                tx.update_item(*id, new_entry.into())?;
                            } else {
                                temporary_item_updates
                                    .push((entry.clone().into(), StateDiff::Retraction));
                                temporary_item_updates
                                    .push((new_entry.into(), StateDiff::Addition));
                            }
                        }
                        // No durable privilege state is updated for roles or
                        // cluster replicas here.
                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
                    },
                    SystemObjectId::System => {
                        // System-wide privileges are keyed by (grantee,
                        // grantor); persist the resulting ACL mode, or `None`
                        // if no privilege remains after the update.
                        let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
                        update_privilege_fn(&mut system_privileges);
                        let new_privilege =
                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
                        tx.set_system_privilege(
                            privilege.grantee,
                            privilege.grantor,
                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
                        )?;
                    }
                }
                let object_type = state.get_system_object_type(&target_id);
                let object_id_str = match &target_id {
                    SystemObjectId::System => "SYSTEM".to_string(),
                    SystemObjectId::Object(id) => id.to_string(),
                };
                // The event type (Grant/Revoke) is derived from the variant.
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    variant.into(),
                    system_object_type_to_audit_object_type(&object_type),
                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
                        object_id: object_id_str,
                        grantee_id: privilege.grantee.to_string(),
                        grantor_id: privilege.grantor.to_string(),
                        privileges: privilege.acl_mode.to_string(),
                    }),
                )?;
            }
            Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant,
            } => {
                // Mutate a copy of the in-memory default privileges, then
                // persist the resulting ACL mode for this (object, grantee)
                // pair. After a revoke the remaining mode may be `None`.
                let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
                match variant {
                    UpdatePrivilegeVariant::Grant => default_privileges
                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
                    UpdatePrivilegeVariant::Revoke => {
                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
                    }
                }
                let new_acl_mode = default_privileges
                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
                tx.set_default_privilege(
                    privilege_object.role_id,
                    privilege_object.database_id,
                    privilege_object.schema_id,
                    privilege_object.object_type,
                    privilege_acl_item.grantee,
                    new_acl_mode.cloned(),
                )?;
                // The event type (Grant/Revoke) is derived from the variant.
                CatalogState::add_to_audit_log(
                    &state.system_configuration,
                    oracle_write_ts,
                    session,
                    tx,
                    audit_events,
                    variant.into(),
                    object_type_to_audit_object_type(privilege_object.object_type),
                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
                        role_id: privilege_object.role_id.to_string(),
                        database_id: privilege_object.database_id.map(|id| id.to_string()),
                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
                        grantee_id: privilege_acl_item.grantee.to_string(),
                        privileges: privilege_acl_item.acl_mode.to_string(),
                    }),
                )?;
            }
2110            Op::RenameCluster {
2111                id,
2112                name,
2113                to_name,
2114                check_reserved_names,
2115            } => {
2116                if id.is_system() {
2117                    return Err(AdapterError::Catalog(Error::new(
2118                        ErrorKind::ReadOnlyCluster(name.clone()),
2119                    )));
2120                }
2121                if check_reserved_names && is_reserved_name(&to_name) {
2122                    return Err(AdapterError::Catalog(Error::new(
2123                        ErrorKind::ReservedClusterName(to_name),
2124                    )));
2125                }
2126                tx.rename_cluster(id, &name, &to_name)?;
2127                CatalogState::add_to_audit_log(
2128                    &state.system_configuration,
2129                    oracle_write_ts,
2130                    session,
2131                    tx,
2132                    audit_events,
2133                    EventType::Alter,
2134                    ObjectType::Cluster,
2135                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2136                        id: id.to_string(),
2137                        old_name: name.clone(),
2138                        new_name: to_name.clone(),
2139                    }),
2140                )?;
2141                info!("rename cluster {name} to {to_name}");
2142            }
2143            Op::RenameClusterReplica {
2144                cluster_id,
2145                replica_id,
2146                name,
2147                to_name,
2148            } => {
2149                if is_reserved_name(&to_name) {
2150                    return Err(AdapterError::Catalog(Error::new(
2151                        ErrorKind::ReservedReplicaName(to_name),
2152                    )));
2153                }
2154                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2155                CatalogState::add_to_audit_log(
2156                    &state.system_configuration,
2157                    oracle_write_ts,
2158                    session,
2159                    tx,
2160                    audit_events,
2161                    EventType::Alter,
2162                    ObjectType::ClusterReplica,
2163                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2164                        cluster_id: cluster_id.to_string(),
2165                        replica_id: replica_id.to_string(),
2166                        old_name: name.replica.as_str().to_string(),
2167                        new_name: to_name.clone(),
2168                    }),
2169                )?;
2170                info!("rename cluster replica {name} to {to_name}");
2171            }
2172            Op::RenameItem {
2173                id,
2174                to_name,
2175                current_full_name,
2176            } => {
2177                let mut updates = Vec::new();
2178
2179                let entry = state.get_entry(&id);
2180                if let CatalogItem::Type(_) = entry.item() {
2181                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2182                        current_full_name.to_string(),
2183                    ))));
2184                }
2185
2186                if entry.id().is_system() {
2187                    let name = state
2188                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2189                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2190                        name.to_string(),
2191                    ))));
2192                }
2193
2194                let mut to_full_name = current_full_name.clone();
2195                to_full_name.item.clone_from(&to_name);
2196
2197                let mut to_qualified_name = entry.name().clone();
2198                to_qualified_name.item.clone_from(&to_name);
2199
2200                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2201                    id: id.to_string(),
2202                    old_name: Self::full_name_detail(&current_full_name),
2203                    new_name: Self::full_name_detail(&to_full_name),
2204                });
2205                if Self::should_audit_log_item(entry.item()) {
2206                    CatalogState::add_to_audit_log(
2207                        &state.system_configuration,
2208                        oracle_write_ts,
2209                        session,
2210                        tx,
2211                        audit_events,
2212                        EventType::Alter,
2213                        catalog_type_to_audit_object_type(entry.item().typ()),
2214                        details,
2215                    )?;
2216                }
2217
2218                // Rename item itself.
2219                let mut new_entry = entry.clone();
2220                new_entry.name.item.clone_from(&to_name);
2221                new_entry.item = entry
2222                    .item()
2223                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2224                    .map_err(|e| {
2225                        Error::new(ErrorKind::from(AmbiguousRename {
2226                            depender: state
2227                                .resolve_full_name(entry.name(), entry.conn_id())
2228                                .to_string(),
2229                            dependee: state
2230                                .resolve_full_name(entry.name(), entry.conn_id())
2231                                .to_string(),
2232                            message: e,
2233                        }))
2234                    })?;
2235
2236                for id in entry.referenced_by() {
2237                    let dependent_item = state.get_entry(id);
2238                    let mut to_entry = dependent_item.clone();
2239                    to_entry.item = dependent_item
2240                        .item()
2241                        .rename_item_refs(
2242                            current_full_name.clone(),
2243                            to_full_name.item.clone(),
2244                            false,
2245                        )
2246                        .map_err(|e| {
2247                            Error::new(ErrorKind::from(AmbiguousRename {
2248                                depender: state
2249                                    .resolve_full_name(
2250                                        dependent_item.name(),
2251                                        dependent_item.conn_id(),
2252                                    )
2253                                    .to_string(),
2254                                dependee: state
2255                                    .resolve_full_name(entry.name(), entry.conn_id())
2256                                    .to_string(),
2257                                message: e,
2258                            }))
2259                        })?;
2260
2261                    if !to_entry.item().is_temporary() {
2262                        tx.update_item(*id, to_entry.into())?;
2263                    } else {
2264                        temporary_item_updates
2265                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2266                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2267                    }
2268                    updates.push(*id);
2269                }
2270                if !new_entry.item().is_temporary() {
2271                    tx.update_item(id, new_entry.into())?;
2272                } else {
2273                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2274                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2275                }
2276
2277                updates.push(id);
2278                for id in updates {
2279                    Self::log_update(state, &id);
2280                }
2281            }
2282            Op::RenameSchema {
2283                database_spec,
2284                schema_spec,
2285                new_name,
2286                check_reserved_names,
2287            } => {
2288                if check_reserved_names && is_reserved_name(&new_name) {
2289                    return Err(AdapterError::Catalog(Error::new(
2290                        ErrorKind::ReservedSchemaName(new_name),
2291                    )));
2292                }
2293
2294                let conn_id = session
2295                    .map(|session| session.conn_id())
2296                    .unwrap_or(&SYSTEM_CONN_ID);
2297
2298                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2299                let cur_name = schema.name().schema.clone();
2300
2301                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2302                    return Err(AdapterError::Catalog(Error::new(
2303                        ErrorKind::AmbientSchemaRename(cur_name),
2304                    )));
2305                };
2306                let database = state.get_database(&database_id);
2307                let database_name = &database.name;
2308
2309                let mut updates = Vec::new();
2310                let mut items_to_update = BTreeMap::new();
2311
2312                let mut update_item = |id| {
2313                    if items_to_update.contains_key(id) {
2314                        return Ok(());
2315                    }
2316
2317                    let entry = state.get_entry(id);
2318
2319                    // Update our item.
2320                    let mut new_entry = entry.clone();
2321                    new_entry.item = entry
2322                        .item
2323                        .rename_schema_refs(database_name, &cur_name, &new_name)
2324                        .map_err(|(s, _i)| {
2325                            Error::new(ErrorKind::from(AmbiguousRename {
2326                                depender: state
2327                                    .resolve_full_name(entry.name(), entry.conn_id())
2328                                    .to_string(),
2329                                dependee: format!("{database_name}.{cur_name}"),
2330                                message: format!("ambiguous reference to schema named {s}"),
2331                            }))
2332                        })?;
2333
2334                    // Queue updates for Catalog storage and Builtin Tables.
2335                    if !new_entry.item().is_temporary() {
2336                        items_to_update.insert(*id, new_entry.into());
2337                    } else {
2338                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2339                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2340                    }
2341                    updates.push(id);
2342
2343                    Ok::<_, AdapterError>(())
2344                };
2345
2346                // Update all of the items in the schema.
2347                for (_name, item_id) in &schema.items {
2348                    // Update the item itself.
2349                    update_item(item_id)?;
2350
2351                    // Update everything that depends on this item.
2352                    for id in state.get_entry(item_id).referenced_by() {
2353                        update_item(id)?;
2354                    }
2355                }
2356                // Note: When updating the transaction it's very important that we update the
2357                // items as a whole group, otherwise we exhibit quadratic behavior.
2358                tx.update_items(items_to_update)?;
2359
2360                // Renaming temporary schemas is not supported.
2361                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2362                    let schema_name = schema.name().schema.clone();
2363                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2364                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2365                    )));
2366                };
2367
2368                // Add an entry to the audit log.
2369                let database_name = database_spec
2370                    .id()
2371                    .map(|id| state.get_database(&id).name.clone());
2372                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2373                    id: schema_id.to_string(),
2374                    old_name: schema.name().schema.clone(),
2375                    new_name: new_name.clone(),
2376                    database_name,
2377                });
2378                CatalogState::add_to_audit_log(
2379                    &state.system_configuration,
2380                    oracle_write_ts,
2381                    session,
2382                    tx,
2383                    audit_events,
2384                    EventType::Alter,
2385                    mz_audit_log::ObjectType::Schema,
2386                    details,
2387                )?;
2388
2389                // Update the schema itself.
2390                let mut new_schema = schema.clone();
2391                new_schema.name.schema.clone_from(&new_name);
2392                tx.update_schema(schema_id, new_schema.into())?;
2393
2394                for id in updates {
2395                    Self::log_update(state, id);
2396                }
2397            }
2398            Op::UpdateOwner { id, new_owner } => {
2399                let conn_id = session
2400                    .map(|session| session.conn_id())
2401                    .unwrap_or(&SYSTEM_CONN_ID);
2402                let old_owner = state
2403                    .get_owner_id(&id, conn_id)
2404                    .expect("cannot update the owner of an object without an owner");
2405                match &id {
2406                    ObjectId::Cluster(id) => {
2407                        let mut cluster = state.get_cluster(*id).clone();
2408                        if id.is_system() {
2409                            return Err(AdapterError::Catalog(Error::new(
2410                                ErrorKind::ReadOnlyCluster(cluster.name),
2411                            )));
2412                        }
2413                        Self::update_privilege_owners(
2414                            &mut cluster.privileges,
2415                            cluster.owner_id,
2416                            new_owner,
2417                        );
2418                        cluster.owner_id = new_owner;
2419                        tx.update_cluster(*id, cluster.into())?;
2420                    }
2421                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2422                        let cluster = state.get_cluster(*cluster_id);
2423                        let mut replica = cluster
2424                            .replica(*replica_id)
2425                            .expect("catalog out of sync")
2426                            .clone();
2427                        if replica_id.is_system() {
2428                            return Err(AdapterError::Catalog(Error::new(
2429                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2430                            )));
2431                        }
2432                        replica.owner_id = new_owner;
2433                        tx.update_cluster_replica(*replica_id, replica.into())?;
2434                    }
2435                    ObjectId::Database(id) => {
2436                        let mut database = state.get_database(id).clone();
2437                        if id.is_system() {
2438                            return Err(AdapterError::Catalog(Error::new(
2439                                ErrorKind::ReadOnlyDatabase(database.name),
2440                            )));
2441                        }
2442                        Self::update_privilege_owners(
2443                            &mut database.privileges,
2444                            database.owner_id,
2445                            new_owner,
2446                        );
2447                        database.owner_id = new_owner;
2448                        tx.update_database(*id, database.clone().into())?;
2449                    }
2450                    ObjectId::Schema((database_spec, schema_spec)) => {
2451                        let schema_id: SchemaId = schema_spec.clone().into();
2452                        let mut schema = state
2453                            .get_schema(database_spec, schema_spec, conn_id)
2454                            .clone();
2455                        if schema_id.is_system() {
2456                            let name = schema.name();
2457                            let full_name = state.resolve_full_schema_name(name);
2458                            return Err(AdapterError::Catalog(Error::new(
2459                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2460                            )));
2461                        }
2462                        Self::update_privilege_owners(
2463                            &mut schema.privileges,
2464                            schema.owner_id,
2465                            new_owner,
2466                        );
2467                        schema.owner_id = new_owner;
2468                        tx.update_schema(schema_id, schema.into())?;
2469                    }
2470                    ObjectId::Item(id) => {
2471                        let entry = state.get_entry(id);
2472                        let mut new_entry = entry.clone();
2473                        if id.is_system() {
2474                            let full_name = state.resolve_full_name(
2475                                new_entry.name(),
2476                                session.map(|session| session.conn_id()),
2477                            );
2478                            return Err(AdapterError::Catalog(Error::new(
2479                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2480                            )));
2481                        }
2482                        Self::update_privilege_owners(
2483                            &mut new_entry.privileges,
2484                            new_entry.owner_id,
2485                            new_owner,
2486                        );
2487                        new_entry.owner_id = new_owner;
2488                        if !new_entry.item().is_temporary() {
2489                            tx.update_item(*id, new_entry.into())?;
2490                        } else {
2491                            temporary_item_updates
2492                                .push((entry.clone().into(), StateDiff::Retraction));
2493                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2494                        }
2495                    }
2496                    ObjectId::NetworkPolicy(id) => {
2497                        let mut policy = state.get_network_policy(id).clone();
2498                        if id.is_system() {
2499                            return Err(AdapterError::Catalog(Error::new(
2500                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2501                            )));
2502                        }
2503                        Self::update_privilege_owners(
2504                            &mut policy.privileges,
2505                            policy.owner_id,
2506                            new_owner,
2507                        );
2508                        policy.owner_id = new_owner;
2509                        tx.update_network_policy(*id, policy.into())?;
2510                    }
2511                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2512                }
2513                let object_type = state.get_object_type(&id);
2514                CatalogState::add_to_audit_log(
2515                    &state.system_configuration,
2516                    oracle_write_ts,
2517                    session,
2518                    tx,
2519                    audit_events,
2520                    EventType::Alter,
2521                    object_type_to_audit_object_type(object_type),
2522                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2523                        object_id: id.to_string(),
2524                        old_owner_id: old_owner.to_string(),
2525                        new_owner_id: new_owner.to_string(),
2526                    }),
2527                )?;
2528            }
2529            Op::UpdateClusterConfig { id, name, config } => {
2530                let mut cluster = state.get_cluster(id).clone();
2531                cluster.config = config;
2532                tx.update_cluster(id, cluster.into())?;
2533                info!("update cluster {}", name);
2534
2535                CatalogState::add_to_audit_log(
2536                    &state.system_configuration,
2537                    oracle_write_ts,
2538                    session,
2539                    tx,
2540                    audit_events,
2541                    EventType::Alter,
2542                    ObjectType::Cluster,
2543                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2544                        id: id.to_string(),
2545                        name,
2546                    }),
2547                )?;
2548            }
2549            Op::UpdateClusterReplicaConfig {
2550                replica_id,
2551                cluster_id,
2552                config,
2553            } => {
2554                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2555                info!("update replica {}", replica.name);
2556                tx.update_cluster_replica(
2557                    replica_id,
2558                    mz_catalog::durable::ClusterReplica {
2559                        cluster_id,
2560                        replica_id,
2561                        name: replica.name.clone(),
2562                        config: config.clone().into(),
2563                        owner_id: replica.owner_id,
2564                    },
2565                )?;
2566            }
2567            Op::UpdateItem { id, name, to_item } => {
2568                let mut entry = state.get_entry(&id).clone();
2569                entry.name = name.clone();
2570                entry.item = to_item.clone();
2571                tx.update_item(id, entry.into())?;
2572
2573                if Self::should_audit_log_item(&to_item) {
2574                    let mut full_name = Self::full_name_detail(
2575                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2576                    );
2577                    full_name.item = name.item;
2578
2579                    CatalogState::add_to_audit_log(
2580                        &state.system_configuration,
2581                        oracle_write_ts,
2582                        session,
2583                        tx,
2584                        audit_events,
2585                        EventType::Alter,
2586                        catalog_type_to_audit_object_type(to_item.typ()),
2587                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2588                            id: id.to_string(),
2589                            name: full_name,
2590                        }),
2591                    )?;
2592                }
2593
2594                Self::log_update(state, &id);
2595            }
2596            Op::UpdateSystemConfiguration { name, value } => {
2597                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2598                tx.upsert_system_config(&name, parsed_value.clone())?;
2599                // This mirrors some "system vars" into the catalog storage
2600                // "config" collection so that we can toggle the flag with
2601                // Launch Darkly, but use it in boot before Launch Darkly is
2602                // available.
2603                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2604                    let with_0dt_deployment_max_wait =
2605                        Duration::parse(VarInput::Flat(&parsed_value))
2606                            .expect("parsing succeeded above");
2607                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2608                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2609                    let with_0dt_deployment_ddl_check_interval =
2610                        Duration::parse(VarInput::Flat(&parsed_value))
2611                            .expect("parsing succeeded above");
2612                    tx.set_0dt_deployment_ddl_check_interval(
2613                        with_0dt_deployment_ddl_check_interval,
2614                    )?;
2615                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2616                    let panic_after_timeout =
2617                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2618                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2619                }
2620
2621                CatalogState::add_to_audit_log(
2622                    &state.system_configuration,
2623                    oracle_write_ts,
2624                    session,
2625                    tx,
2626                    audit_events,
2627                    EventType::Alter,
2628                    ObjectType::System,
2629                    EventDetails::SetV1(mz_audit_log::SetV1 {
2630                        name,
2631                        value: Some(value.borrow().to_vec().join(", ")),
2632                    }),
2633                )?;
2634            }
2635            Op::ResetSystemConfiguration { name } => {
2636                tx.remove_system_config(&name);
2637                // This mirrors some "system vars" into the catalog storage
2638                // "config" collection so that we can toggle the flag with
2639                // Launch Darkly, but use it in boot before Launch Darkly is
2640                // available.
2641                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2642                    tx.reset_0dt_deployment_max_wait()?;
2643                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2644                    tx.reset_0dt_deployment_ddl_check_interval()?;
2645                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2646                    tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2647                }
2648
2649                CatalogState::add_to_audit_log(
2650                    &state.system_configuration,
2651                    oracle_write_ts,
2652                    session,
2653                    tx,
2654                    audit_events,
2655                    EventType::Alter,
2656                    ObjectType::System,
2657                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2658                )?;
2659            }
2660            Op::ResetAllSystemConfiguration => {
2661                tx.clear_system_configs();
2662                tx.reset_0dt_deployment_max_wait()?;
2663                tx.reset_0dt_deployment_ddl_check_interval()?;
2664                tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2665
2666                CatalogState::add_to_audit_log(
2667                    &state.system_configuration,
2668                    oracle_write_ts,
2669                    session,
2670                    tx,
2671                    audit_events,
2672                    EventType::Alter,
2673                    ObjectType::System,
2674                    EventDetails::ResetAllV1,
2675                )?;
2676            }
2677            Op::WeirdStorageUsageUpdates {
2678                object_id,
2679                size_bytes,
2680                collection_timestamp,
2681            } => {
2682                let id = tx.allocate_storage_usage_ids()?;
2683                let metric =
2684                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2685                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2686                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2687                weird_builtin_table_update = Some(builtin_table_update);
2688            }
2689            Op::InjectAuditEvents { events } => {
2690                for event in events {
2691                    let id = tx.allocate_audit_log_id()?;
2692                    let ev = VersionedEvent::new(
2693                        id,
2694                        event.event_type,
2695                        event.object_type,
2696                        event.details,
2697                        event.user,
2698                        oracle_write_ts.into(),
2699                    );
2700                    audit_events.push(ev.clone());
2701                    tx.insert_audit_log_event(ev);
2702                }
2703            }
2704        };
2705        Ok((weird_builtin_table_update, temporary_item_updates))
2706    }
2707
2708    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2709        let entry = state.get_entry(id);
2710        info!(
2711            "update {} {} ({})",
2712            entry.item_type(),
2713            state.resolve_full_name(entry.name(), entry.conn_id()),
2714            id
2715        );
2716    }
2717
2718    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2719    /// implementation:
2720    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2721    fn update_privilege_owners(
2722        privileges: &mut PrivilegeMap,
2723        old_owner: RoleId,
2724        new_owner: RoleId,
2725    ) {
2726        // TODO(jkosh44) Would be nice not to clone every privilege.
2727        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2728
2729        let mut new_present = false;
2730        for privilege in flat_privileges.iter_mut() {
2731            // Old owner's granted privilege are updated to be granted by the new
2732            // owner.
2733            if privilege.grantor == old_owner {
2734                privilege.grantor = new_owner;
2735            } else if privilege.grantor == new_owner {
2736                new_present = true;
2737            }
2738            // Old owner's privileges is given to the new owner.
2739            if privilege.grantee == old_owner {
2740                privilege.grantee = new_owner;
2741            } else if privilege.grantee == new_owner {
2742                new_present = true;
2743            }
2744        }
2745
2746        // If the old privilege list contained references to the new owner, we may
2747        // have created duplicate entries. Here we try and consolidate them. This
2748        // is inspired by PostgreSQL's algorithm but not identical.
2749        if new_present {
2750            // Group privileges by (grantee, grantor).
2751            let privilege_map: BTreeMap<_, Vec<_>> =
2752                flat_privileges
2753                    .into_iter()
2754                    .fold(BTreeMap::new(), |mut accum, privilege| {
2755                        accum
2756                            .entry((privilege.grantee, privilege.grantor))
2757                            .or_default()
2758                            .push(privilege);
2759                        accum
2760                    });
2761
2762            // Consolidate and update all privileges.
2763            flat_privileges = privilege_map
2764                .into_iter()
2765                .map(|((grantee, grantor), values)|
2766                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2767                    values.into_iter().fold(
2768                        MzAclItem::empty(grantee, grantor),
2769                        |mut accum, mz_aclitem| {
2770                            accum.acl_mode =
2771                                accum.acl_mode.union(mz_aclitem.acl_mode);
2772                            accum
2773                        },
2774                    ))
2775                .collect();
2776        }
2777
2778        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2779    }
2780}
2781
2782/// Prepare the given transaction for replacing a catalog item with a new version.
2783///
2784/// The new version gets a new `CatalogItemId`, which requires rewriting the `create_sql` of all
2785/// dependent objects to refer to that new ID (at a previous version).
2786///
2787/// Note that here is where we break the assumption that the `CatalogItemId` is a stable identifier
2788/// for catalog items. We currently think that there are no use cases that require this assumption,
2789/// but no way to know for sure.
2790fn tx_replace_item(
2791    tx: &mut Transaction<'_>,
2792    state: &CatalogState,
2793    id: CatalogItemId,
2794    new_entry: CatalogEntry,
2795) -> Result<(), AdapterError> {
2796    let new_id = new_entry.id;
2797
2798    // Rewrite dependent objects to point to the new ID.
2799    for use_id in new_entry.referenced_by() {
2800        // The dependent might be dropped in the same tx, so check.
2801        if tx.get_item(use_id).is_none() {
2802            continue;
2803        }
2804
2805        let mut dependent = state.get_entry(use_id).clone();
2806        dependent.item = dependent.item.replace_item_refs(id, new_id);
2807        tx.update_item(*use_id, dependent.into())?;
2808    }
2809
2810    // Move comments to the new ID.
2811    let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2812    let new_comment_id = new_entry.comment_object_id();
2813    if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2814        tx.drop_comments(&[old_comment_id].into())?;
2815        for (sub, comment) in comments {
2816            tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2817        }
2818    }
2819
2820    let mz_catalog::durable::Item {
2821        id: _,
2822        oid,
2823        global_id,
2824        schema_id,
2825        name,
2826        create_sql,
2827        owner_id,
2828        privileges,
2829        extra_versions,
2830    } = new_entry.into();
2831
2832    tx.remove_item(id)?;
2833    tx.insert_item(
2834        new_id,
2835        oid,
2836        global_id,
2837        schema_id,
2838        &name,
2839        create_sql,
2840        owner_id,
2841        privileges,
2842        extra_versions,
2843    )?;
2844
2845    Ok(())
2846}
2847
2848/// Generate audit events for a replacement apply operation.
2849fn apply_replacement_audit_events(
2850    state: &CatalogState,
2851    target: &CatalogEntry,
2852    replacement: &CatalogEntry,
2853) -> Vec<(EventType, EventDetails)> {
2854    let mut events = Vec::new();
2855
2856    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2857    let target_id_name = IdFullNameV1 {
2858        id: target.id().to_string(),
2859        name: Catalog::full_name_detail(&target_name),
2860    };
2861    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2862    let replacement_id_name = IdFullNameV1 {
2863        id: replacement.id().to_string(),
2864        name: Catalog::full_name_detail(&replacement_name),
2865    };
2866
2867    if Catalog::should_audit_log_item(&replacement.item) {
2868        events.push((
2869            EventType::Drop,
2870            EventDetails::IdFullNameV1(replacement_id_name.clone()),
2871        ));
2872    }
2873
2874    if Catalog::should_audit_log_item(&target.item) {
2875        events.push((
2876            EventType::Alter,
2877            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2878                target: target_id_name.clone(),
2879                replacement: replacement_id_name,
2880            }),
2881        ));
2882
2883        if let Some(old_cluster_id) = target.cluster_id()
2884            && let Some(new_cluster_id) = replacement.cluster_id()
2885            && old_cluster_id != new_cluster_id
2886        {
2887            // When the replacement is applied, the target takes on the ID of the replacement, so
2888            // we should use that ID for subsequent events.
2889            events.push((
2890                EventType::Alter,
2891                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2892                    id: replacement.id().to_string(),
2893                    name: target_id_name.name,
2894                    old_cluster_id: old_cluster_id.to_string(),
2895                    new_cluster_id: new_cluster_id.to_string(),
2896                }),
2897            ));
2898        }
2899    }
2900
2901    events
2902}
2903
/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
///
/// Note: Previously we used to emit a single `Op::DropObject` for every object
/// we needed to drop. But removing a batch of objects from a durable Catalog
/// Transaction is O(n) where `n` is the number of objects that exist in the
/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
/// `DROP ... CASCADE` statement.
#[derive(Debug, Default)]
pub(crate) struct ObjectsToDrop {
    /// Comment objects to remove; one is recorded for every dropped object.
    pub comments: BTreeSet<CommentObjectId>,
    /// Databases to drop.
    pub databases: BTreeSet<DatabaseId>,
    /// Schemas to drop, mapped to the database they belong to.
    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
    /// Clusters to drop.
    pub clusters: BTreeSet<ClusterId>,
    /// Cluster replicas to drop, mapped to their cluster and the reason the
    /// replica is being dropped.
    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
    /// Roles to drop.
    pub roles: BTreeSet<RoleId>,
    /// Items to drop, in insertion order; dependents of implicitly-dropped
    /// items are appended after the item that pulled them in.
    pub items: Vec<CatalogItemId>,
    /// Network policies to drop.
    pub network_policies: BTreeSet<NetworkPolicyId>,
}
2922
2923impl ObjectsToDrop {
2924    pub fn generate(
2925        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2926        state: &CatalogState,
2927        session: Option<&ConnMeta>,
2928    ) -> Result<Self, AdapterError> {
2929        let mut delta = ObjectsToDrop::default();
2930
2931        for drop_object_info in drop_object_infos {
2932            delta.add_item(drop_object_info, state, session)?;
2933        }
2934
2935        Ok(delta)
2936    }
2937
2938    fn add_item(
2939        &mut self,
2940        drop_object_info: DropObjectInfo,
2941        state: &CatalogState,
2942        session: Option<&ConnMeta>,
2943    ) -> Result<(), AdapterError> {
2944        self.comments
2945            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2946
2947        match drop_object_info {
2948            DropObjectInfo::Database(database_id) => {
2949                let database = &state.database_by_id[&database_id];
2950                if database_id.is_system() {
2951                    return Err(AdapterError::Catalog(Error::new(
2952                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2953                    )));
2954                }
2955
2956                self.databases.insert(database_id);
2957            }
2958            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2959                let schema = state.get_schema(
2960                    &database_spec,
2961                    &schema_spec,
2962                    session
2963                        .map(|session| session.conn_id())
2964                        .unwrap_or(&SYSTEM_CONN_ID),
2965                );
2966                let schema_id: SchemaId = schema_spec.into();
2967                if schema_id.is_system() {
2968                    let name = schema.name();
2969                    let full_name = state.resolve_full_schema_name(name);
2970                    return Err(AdapterError::Catalog(Error::new(
2971                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2972                    )));
2973                }
2974
2975                self.schemas.insert(schema_spec, database_spec);
2976            }
2977            DropObjectInfo::Role(role_id) => {
2978                let name = state.get_role(&role_id).name().to_string();
2979                if role_id.is_system() || role_id.is_predefined() {
2980                    return Err(AdapterError::Catalog(Error::new(
2981                        ErrorKind::ReservedRoleName(name.clone()),
2982                    )));
2983                }
2984                state.ensure_not_reserved_role(&role_id)?;
2985
2986                self.roles.insert(role_id);
2987            }
2988            DropObjectInfo::Cluster(cluster_id) => {
2989                let cluster = state.get_cluster(cluster_id);
2990                let name = &cluster.name;
2991                if cluster_id.is_system() {
2992                    return Err(AdapterError::Catalog(Error::new(
2993                        ErrorKind::ReadOnlyCluster(name.clone()),
2994                    )));
2995                }
2996
2997                self.clusters.insert(cluster_id);
2998            }
2999            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3000                let cluster = state.get_cluster(cluster_id);
3001                let replica = cluster.replica(replica_id).expect("Must exist");
3002
3003                self.replicas
3004                    .insert(replica.replica_id, (cluster.id, reason));
3005
3006                // Implicitly drop materialized views that target this replica.
3007                // When the target replica is gone, no replica advances the
3008                // persist shard's upper frontier, causing reads to hang.
3009                //
3010                // Also cascade to anything depending on the implicitly-dropped
3011                // MV so we don't leave dangling references in the catalog.
3012                // Plan-driven drops already include these via
3013                // `cluster_replica_dependents`; this branch handles internal
3014                // callers that build `Op::DropObjects` directly without going
3015                // through the plan stage.
3016                let mut seen: BTreeSet<ObjectId> =
3017                    self.items.iter().copied().map(ObjectId::Item).collect();
3018                for item_id in &cluster.bound_objects {
3019                    let entry = state.get_entry(item_id);
3020                    if let CatalogItem::MaterializedView(mv) = entry.item()
3021                        && mv.target_replica == Some(replica_id)
3022                        && !seen.contains(&ObjectId::Item(*item_id))
3023                    {
3024                        info!(
3025                            "implicitly dropping materialized view {} because target replica was dropped",
3026                            entry.name().item,
3027                        );
3028                        for dep in state.item_dependents(*item_id, &mut seen) {
3029                            if let ObjectId::Item(dep_id) = dep {
3030                                self.items.push(dep_id);
3031                            }
3032                        }
3033                    }
3034                }
3035            }
3036            DropObjectInfo::Item(item_id) => {
3037                let entry = state.get_entry(&item_id);
3038                if item_id.is_system() {
3039                    let name = entry.name();
3040                    let full_name =
3041                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
3042                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3043                        full_name.to_string(),
3044                    ))));
3045                }
3046
3047                self.items.push(item_id);
3048            }
3049            DropObjectInfo::NetworkPolicy(network_policy_id) => {
3050                let policy = state.get_network_policy(&network_policy_id);
3051                let name = &policy.name;
3052                if network_policy_id.is_system() {
3053                    return Err(AdapterError::Catalog(Error::new(
3054                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3055                    )));
3056                }
3057
3058                self.network_policies.insert(network_policy_id);
3059            }
3060        }
3061
3062        Ok(())
3063    }
3064}
3065
3066#[cfg(test)]
3067mod tests {
3068    use mz_catalog::SYSTEM_CONN_ID;
3069    use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
3070    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
3071    use mz_repr::role_id::RoleId;
3072    use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
3073    use mz_sql::DEFAULT_SCHEMA;
3074    use mz_sql::catalog::CatalogDatabase;
3075    use mz_sql::names::{
3076        ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
3077    };
3078    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
3079
3080    use crate::catalog::{Catalog, Op};
3081    use crate::session::DEFAULT_DATABASE_NAME;
3082
3083    #[mz_ore::test]
3084    fn test_update_privilege_owners() {
3085        let old_owner = RoleId::User(1);
3086        let new_owner = RoleId::User(2);
3087        let other_role = RoleId::User(3);
3088
3089        // older owner exists as grantor.
3090        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3091            MzAclItem {
3092                grantee: other_role,
3093                grantor: old_owner,
3094                acl_mode: AclMode::UPDATE,
3095            },
3096            MzAclItem {
3097                grantee: other_role,
3098                grantor: new_owner,
3099                acl_mode: AclMode::SELECT,
3100            },
3101        ]);
3102        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3103        assert_eq!(1, privileges.all_values().count());
3104        assert_eq!(
3105            vec![MzAclItem {
3106                grantee: other_role,
3107                grantor: new_owner,
3108                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3109            }],
3110            privileges.all_values_owned().collect::<Vec<_>>()
3111        );
3112
3113        // older owner exists as grantee.
3114        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3115            MzAclItem {
3116                grantee: old_owner,
3117                grantor: other_role,
3118                acl_mode: AclMode::UPDATE,
3119            },
3120            MzAclItem {
3121                grantee: new_owner,
3122                grantor: other_role,
3123                acl_mode: AclMode::SELECT,
3124            },
3125        ]);
3126        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3127        assert_eq!(1, privileges.all_values().count());
3128        assert_eq!(
3129            vec![MzAclItem {
3130                grantee: new_owner,
3131                grantor: other_role,
3132                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3133            }],
3134            privileges.all_values_owned().collect::<Vec<_>>()
3135        );
3136
3137        // older owner exists as grantee and grantor.
3138        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3139            MzAclItem {
3140                grantee: old_owner,
3141                grantor: old_owner,
3142                acl_mode: AclMode::UPDATE,
3143            },
3144            MzAclItem {
3145                grantee: new_owner,
3146                grantor: new_owner,
3147                acl_mode: AclMode::SELECT,
3148            },
3149        ]);
3150        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3151        assert_eq!(1, privileges.all_values().count());
3152        assert_eq!(
3153            vec![MzAclItem {
3154                grantee: new_owner,
3155                grantor: new_owner,
3156                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3157            }],
3158            privileges.all_values_owned().collect::<Vec<_>>()
3159        );
3160    }
3161
    /// Verifies that `transact_incremental_dry_run` processes only new ops
    /// against the accumulated state, not all ops from scratch. Two paths are
    /// compared:
    ///   - Incremental: two separate calls, each with one op
    ///   - All-at-once: one call with both ops
    /// Both must produce equivalent catalog state.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method`
    async fn test_transact_incremental_dry_run_processes_only_new_ops() {
        Catalog::with_debug(|catalog| async move {
            // Resolve default database and schema.
            let database = catalog
                .resolve_database(DEFAULT_DATABASE_NAME)
                .expect("default database");
            let database_name = database.name.clone();
            let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
            let schema = catalog
                .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
                .expect("default schema");
            let schema_name = schema.name.schema.clone();
            let schema_spec = schema.id.clone();

            // Allocate IDs for two tables.
            let (id_t1, global_id_t1) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t1");
            let (id_t2, global_id_t2) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t2");

            // A single write timestamp, reused for every dry-run call below.
            let oracle_write_ts = catalog.current_upper().await;

            // Build two CreateItem ops. Each creates a minimal table: no
            // columns (`RelationDesc::empty()`) and a single root version
            // mapped to the table's global ID.
            let make_table_op = |id, global_id, name: &str| Op::CreateItem {
                id,
                name: QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: database_spec.clone(),
                        schema_spec: schema_spec.clone(),
                    },
                    item: name.to_string(),
                },
                item: CatalogItem::Table(Table {
                    create_sql: Some(format!(
                        "CREATE TABLE {database_name}.{schema_name}.{name} ()"
                    )),
                    desc: VersionedRelationDesc::new(RelationDesc::empty()),
                    collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                    conn_id: None,
                    resolved_ids: ResolvedIds::empty(),
                    custom_logical_compaction_window: None,
                    is_retained_metrics_object: false,
                    data_source: TableDataSource::TableWrites { defaults: vec![] },
                }),
                owner_id: MZ_SYSTEM_ROLE_ID,
            };

            let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
            let op_t2 = make_table_op(id_t2, global_id_t2, "t2");

            // Both paths start from the same base state.
            let base_state = catalog.state().clone();

            // --- Path A: Incremental (two separate dry-run calls) ---

            // First call: only op_t1, no previous snapshot.
            let (state_after_t1, snapshot_after_t1) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("first dry run");

            // After first dry run: t1 exists, t2 does not.
            assert!(
                state_after_t1.try_get_entry(&id_t1).is_some(),
                "t1 should exist after first dry run"
            );
            assert_eq!(
                state_after_t1
                    .try_get_entry(&id_t1)
                    .expect("t1 entry")
                    .name()
                    .item,
                "t1"
            );
            assert!(
                state_after_t1.try_get_entry(&id_t2).is_none(),
                "t2 should NOT exist after first dry run"
            );

            // Second call: only op_t2, using state/snapshot from first call.
            let (state_incremental, _) = catalog
                .transact_incremental_dry_run(
                    &state_after_t1,
                    vec![op_t2.clone()],
                    None,
                    Some(snapshot_after_t1),
                    oracle_write_ts,
                )
                .await
                .expect("second dry run");

            // After second dry run: both t1 and t2 exist.
            assert!(
                state_incremental.try_get_entry(&id_t1).is_some(),
                "t1 should exist in incremental result"
            );
            assert!(
                state_incremental.try_get_entry(&id_t2).is_some(),
                "t2 should exist in incremental result"
            );

            // --- Path B: All-at-once (single dry-run call with both ops) ---

            let (state_all_at_once, _) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone(), op_t2.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("all-at-once dry run");

            assert!(
                state_all_at_once.try_get_entry(&id_t1).is_some(),
                "t1 should exist in all-at-once result"
            );
            assert!(
                state_all_at_once.try_get_entry(&id_t2).is_some(),
                "t2 should exist in all-at-once result"
            );

            // --- Compare: both paths produce equivalent items ---

            let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
            let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
            assert_eq!(inc_t1.name(), all_t1.name());
            assert_eq!(inc_t1.owner_id, all_t1.owner_id);

            let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
            let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
            assert_eq!(inc_t2.name(), all_t2.name());
            assert_eq!(inc_t2.owner_id, all_t2.owner_id);

            catalog.expire().await;
        })
        .await
    }
3318}