Skip to main content

mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35    StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_ore::now::EpochMillis;
42use mz_persist_types::ShardId;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
44use mz_repr::network_policy_id::NetworkPolicyId;
45use mz_repr::optimize::OptimizerFeatures;
46use mz_repr::role_id::RoleId;
47use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
48use mz_sql::ast::RawDataType;
49use mz_sql::catalog::{
50    AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
51    CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
52    DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
53    RoleVars,
54};
55use mz_sql::names::{
56    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
57    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
58};
59use mz_sql::plan::{NetworkPolicyRule, PlanError};
60use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
61use mz_sql::session::vars::OwnedVarInput;
62use mz_sql::session::vars::{Value as VarValue, VarInput};
63use mz_sql::{DEFAULT_SCHEMA, rbac};
64use mz_sql_parser::ast::{QualifiedReplica, Value};
65use mz_storage_client::storage_collections::StorageCollections;
66use serde::{Deserialize, Serialize};
67use tracing::{info, trace};
68
69use crate::AdapterError;
70use crate::catalog::state::LocalExpressionCache;
71use crate::catalog::{
72    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
73    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
74    is_reserved_role_name, object_type_to_audit_object_type,
75    system_object_type_to_audit_object_type,
76};
77use crate::coord::ConnMeta;
78use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
79use crate::coord::cluster_scheduling::SchedulingDecision;
80use crate::util::ResultExt;
81
82/// A manually injected audit event.
83///
84/// Matches [`mz_audit_log::EventV1`], but without the `id` and `occurred_at` fields -- both are
85/// filled in automatically.
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct InjectedAuditEvent {
88    pub event_type: EventType,
89    pub object_type: ObjectType,
90    pub details: EventDetails,
91    pub user: Option<String>,
92}
93
94#[derive(Debug, Clone)]
95pub enum Op {
96    AlterRetainHistory {
97        id: CatalogItemId,
98        value: Option<Value>,
99        window: CompactionWindow,
100    },
101    AlterSourceTimestampInterval {
102        id: CatalogItemId,
103        value: Option<Value>,
104        interval: Duration,
105    },
106    AlterRole {
107        id: RoleId,
108        name: String,
109        attributes: RoleAttributesRaw,
110        nopassword: bool,
111        vars: RoleVars,
112    },
113    AlterNetworkPolicy {
114        id: NetworkPolicyId,
115        rules: Vec<NetworkPolicyRule>,
116        name: String,
117        owner_id: RoleId,
118    },
119    AlterAddColumn {
120        id: CatalogItemId,
121        new_global_id: GlobalId,
122        name: ColumnName,
123        typ: SqlColumnType,
124        sql: RawDataType,
125    },
126    AlterMaterializedViewApplyReplacement {
127        id: CatalogItemId,
128        replacement_id: CatalogItemId,
129    },
130    CreateDatabase {
131        name: String,
132        owner_id: RoleId,
133    },
134    CreateSchema {
135        database_id: ResolvedDatabaseSpecifier,
136        schema_name: String,
137        owner_id: RoleId,
138    },
139    CreateRole {
140        name: String,
141        attributes: RoleAttributesRaw,
142    },
143    CreateCluster {
144        id: ClusterId,
145        name: String,
146        introspection_sources: Vec<&'static BuiltinLog>,
147        owner_id: RoleId,
148        config: ClusterConfig,
149    },
150    CreateClusterReplica {
151        cluster_id: ClusterId,
152        name: String,
153        config: ReplicaConfig,
154        owner_id: RoleId,
155        reason: ReplicaCreateDropReason,
156    },
157    CreateItem {
158        id: CatalogItemId,
159        name: QualifiedItemName,
160        item: CatalogItem,
161        owner_id: RoleId,
162    },
163    CreateNetworkPolicy {
164        rules: Vec<NetworkPolicyRule>,
165        name: String,
166        owner_id: RoleId,
167    },
168    Comment {
169        object_id: CommentObjectId,
170        sub_component: Option<usize>,
171        comment: Option<String>,
172    },
173    DropObjects(Vec<DropObjectInfo>),
174    GrantRole {
175        role_id: RoleId,
176        member_id: RoleId,
177        grantor_id: RoleId,
178    },
179    RenameCluster {
180        id: ClusterId,
181        name: String,
182        to_name: String,
183        check_reserved_names: bool,
184    },
185    RenameClusterReplica {
186        cluster_id: ClusterId,
187        replica_id: ReplicaId,
188        name: QualifiedReplica,
189        to_name: String,
190    },
191    RenameItem {
192        id: CatalogItemId,
193        current_full_name: FullItemName,
194        to_name: String,
195    },
196    RenameSchema {
197        database_spec: ResolvedDatabaseSpecifier,
198        schema_spec: SchemaSpecifier,
199        new_name: String,
200        check_reserved_names: bool,
201    },
202    UpdateOwner {
203        id: ObjectId,
204        new_owner: RoleId,
205    },
206    UpdatePrivilege {
207        target_id: SystemObjectId,
208        privilege: MzAclItem,
209        variant: UpdatePrivilegeVariant,
210    },
211    UpdateDefaultPrivilege {
212        privilege_object: DefaultPrivilegeObject,
213        privilege_acl_item: DefaultPrivilegeAclItem,
214        variant: UpdatePrivilegeVariant,
215    },
216    RevokeRole {
217        role_id: RoleId,
218        member_id: RoleId,
219        grantor_id: RoleId,
220    },
221    UpdateClusterConfig {
222        id: ClusterId,
223        name: String,
224        config: ClusterConfig,
225    },
226    UpdateClusterReplicaConfig {
227        cluster_id: ClusterId,
228        replica_id: ReplicaId,
229        config: ReplicaConfig,
230    },
231    UpdateItem {
232        id: CatalogItemId,
233        name: QualifiedItemName,
234        to_item: CatalogItem,
235    },
236    UpdateSourceReferences {
237        source_id: CatalogItemId,
238        references: SourceReferences,
239    },
240    UpdateSystemConfiguration {
241        name: String,
242        value: OwnedVarInput,
243    },
244    ResetSystemConfiguration {
245        name: String,
246    },
247    ResetAllSystemConfiguration,
248    /// Performs updates to the storage usage table, which probably should be a builtin source.
249    ///
250    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
251    /// might not work. If a process crashes after collecting storage usage events
252    /// but before updating the builtin table, then another listening catalog
253    /// will never know to update the builtin table.
254    WeirdStorageUsageUpdates {
255        object_id: Option<String>,
256        size_bytes: u64,
257        collection_timestamp: EpochMillis,
258    },
259    /// Injects audit events into the catalog.
260    ///
261    /// This is a nonstandard path used for manually appending audit events at the current time.
262    /// Mainly useful for correcting the audit log in the face of bugs that made us forget to emit
263    /// audit events.
264    InjectAuditEvents {
265        events: Vec<InjectedAuditEvent>,
266    },
267}
268
269/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
270/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
271/// the `Op::DropObjects`.
272#[derive(Debug, Clone)]
273pub enum DropObjectInfo {
274    Cluster(ClusterId),
275    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
276    Database(DatabaseId),
277    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
278    Role(RoleId),
279    Item(CatalogItemId),
280    NetworkPolicy(NetworkPolicyId),
281}
282
283impl DropObjectInfo {
284    /// Creates a `DropObjectInfo` from an `ObjectId`.
285    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
286    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
287        match id {
288            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
289            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
290                cluster_id,
291                replica_id,
292                ReplicaCreateDropReason::Manual,
293            )),
294            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
295            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
296            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
297            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
298            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
299        }
300    }
301
302    /// Creates an `ObjectId` from a `DropObjectInfo`.
303    /// Loses the `ReplicaCreateDropReason` if there is one!
304    fn to_object_id(&self) -> ObjectId {
305        match &self {
306            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
307            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
308                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
309            }
310            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
311            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
312            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
313            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
314            DropObjectInfo::NetworkPolicy(network_policy_id) => {
315                ObjectId::NetworkPolicy(network_policy_id.clone())
316            }
317        }
318    }
319}
320
321/// The reason for creating or dropping a replica.
322#[derive(Debug, Clone)]
323pub enum ReplicaCreateDropReason {
324    /// The user initiated the replica create or drop, e.g., by
325    /// - creating/dropping a cluster,
326    /// - ALTERing various options on a managed cluster,
327    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
328    Manual,
329    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
330    /// materialized view is needing a refresh on a SCHEDULE ON REFRESH cluster.
331    ClusterScheduling(Vec<SchedulingDecision>),
332}
333
334impl ReplicaCreateDropReason {
335    pub fn into_audit_log(
336        self,
337    ) -> (
338        CreateOrDropClusterReplicaReasonV1,
339        Option<SchedulingDecisionsWithReasonsV2>,
340    ) {
341        let (reason, scheduling_policies) = match self {
342            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
343            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
344                CreateOrDropClusterReplicaReasonV1::Schedule,
345                Some(scheduling_decisions),
346            ),
347        };
348        (
349            reason,
350            scheduling_policies
351                .as_ref()
352                .map(SchedulingDecision::reasons_to_audit_log_reasons),
353        )
354    }
355}
356
357pub struct TransactionResult {
358    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
359    /// Parsed catalog updates from which we will derive catalog implications.
360    pub catalog_updates: Vec<ParsedStateUpdate>,
361    pub audit_events: Vec<VersionedEvent>,
362}
363
364#[derive(Debug, Clone, Copy)]
365enum TransactInnerMode {
366    /// Prepare storage state and return a commit-ready transaction state.
367    ///
368    /// This mode is used by durable catalog transactions.
369    Commit,
370    /// Execute a dry run against a durable transaction that will not be
371    /// committed.
372    ///
373    /// This mode still validates and applies updates to an in-memory
374    /// `CatalogState`, but it must not call `prepare_state` because dry runs
375    /// must not trigger controller side effects.
376    DryRun,
377}
378
379impl Catalog {
380    fn should_audit_log_item(item: &CatalogItem) -> bool {
381        !item.is_temporary()
382    }
383
384    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
385    /// within a connection id.
386    fn temporary_ids(
387        &self,
388        ops: &[Op],
389        temporary_drops: BTreeSet<(&ConnectionId, String)>,
390    ) -> Result<BTreeSet<CatalogItemId>, Error> {
391        let mut creating = BTreeSet::new();
392        let mut temporary_ids = BTreeSet::new();
393        for op in ops.iter() {
394            if let Op::CreateItem {
395                id,
396                name,
397                item,
398                owner_id: _,
399            } = op
400            {
401                if let Some(conn_id) = item.conn_id() {
402                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
403                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
404                        || creating.contains(&(conn_id, &name.item))
405                    {
406                        return Err(
407                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
408                        );
409                    } else {
410                        creating.insert((conn_id, &name.item));
411                        temporary_ids.insert(id.clone());
412                    }
413                }
414            }
415        }
416        Ok(temporary_ids)
417    }
418
419    #[instrument(name = "catalog::transact")]
420    pub async fn transact(
421        &mut self,
422        // n.b. this is an option to prevent us from needing to build out a
423        // dummy impl of `StorageController` for tests.
424        storage_collections: Option<
425            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
426        >,
427        oracle_write_ts: mz_repr::Timestamp,
428        session: Option<&ConnMeta>,
429        ops: Vec<Op>,
430    ) -> Result<TransactionResult, AdapterError> {
431        trace!("transact: {:?}", ops);
432        fail::fail_point!("catalog_transact", |arg| {
433            Err(AdapterError::Unstructured(anyhow::anyhow!(
434                "failpoint: {arg:?}"
435            )))
436        });
437
438        let drop_ids: BTreeSet<CatalogItemId> = ops
439            .iter()
440            .filter_map(|op| match op {
441                Op::DropObjects(drop_object_infos) => {
442                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
443                    let item_ids = ids.filter_map(|id| match id {
444                        ObjectId::Item(id) => Some(id),
445                        _ => None,
446                    });
447                    Some(item_ids)
448                }
449                _ => None,
450            })
451            .flatten()
452            .collect();
453        let temporary_drops = drop_ids
454            .iter()
455            .filter_map(|id| {
456                let entry = self.get_entry(id);
457                match entry.item().conn_id() {
458                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
459                    None => None,
460                }
461            })
462            .collect();
463        let dropped_global_ids = drop_ids
464            .iter()
465            .flat_map(|item_id| self.get_global_ids(item_id))
466            .collect();
467
468        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
469        let mut builtin_table_updates = vec![];
470        let mut catalog_updates = vec![];
471        let mut audit_events = vec![];
472        let mut storage = self.storage().await;
473        let mut tx = storage
474            .transaction()
475            .await
476            .unwrap_or_terminate("starting catalog transaction");
477
478        let new_state = Self::transact_inner(
479            TransactInnerMode::Commit,
480            storage_collections,
481            oracle_write_ts,
482            session,
483            ops,
484            temporary_ids,
485            &mut builtin_table_updates,
486            &mut catalog_updates,
487            &mut audit_events,
488            &mut tx,
489            &self.state,
490        )
491        .await?;
492
493        // The user closure was successful, apply the updates. Terminate the
494        // process if this fails, because we have to restart envd due to
495        // indeterminate catalog state, which we only reconcile during catalog
496        // init.
497        tx.commit(oracle_write_ts)
498            .await
499            .unwrap_or_terminate("catalog storage transaction commit must succeed");
500
501        // Dropping here keeps the mutable borrow on self, preventing us accidentally
502        // mutating anything until after f is executed.
503        drop(storage);
504        if let Some(new_state) = new_state {
505            self.transient_revision += 1;
506            self.state = new_state;
507        }
508
509        // Drop in-memory planning metadata.
510        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
511        if self.state.system_config().enable_mz_notices() {
512            // Generate retractions for the Builtin tables.
513            self.state().pack_optimizer_notices(
514                &mut builtin_table_updates,
515                dropped_notices.iter(),
516                Diff::MINUS_ONE,
517            );
518        }
519
520        Ok(TransactionResult {
521            builtin_table_updates,
522            catalog_updates,
523            audit_events,
524        })
525    }
526
527    /// Performs an incremental dry-run catalog transaction: processes only the
528    /// NEW ops against an accumulated `CatalogState` from previous dry runs.
529    /// This avoids the O(N^2) replay cost of replaying all accumulated ops.
530    ///
531    /// The durable transaction is intentionally never committed and no storage
532    /// controller prepare-state side effects are run.
533    ///
534    /// If `prev_snapshot` is `Some`, the transaction is initialized from that
535    /// snapshot (which represents the tx state after the previous dry run),
536    /// ensuring it starts in sync with `base_state`. If `None` (first
537    /// statement), a fresh transaction is loaded from durable storage.
538    ///
539    /// Returns the new accumulated state and a snapshot of the transaction's
540    /// state for use in subsequent incremental dry runs.
541    pub async fn transact_incremental_dry_run(
542        &self,
543        base_state: &CatalogState,
544        ops: Vec<Op>,
545        session: Option<&ConnMeta>,
546        prev_snapshot: Option<Snapshot>,
547        oracle_write_ts: mz_repr::Timestamp,
548    ) -> Result<(CatalogState, Snapshot), AdapterError> {
549        // For DDL transactions, items are not temporary (CREATE TABLE FROM SOURCE, etc.)
550        // but we still need to check for collisions.
551        let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
552
553        let mut builtin_table_updates = vec![];
554        let mut catalog_updates = vec![];
555        let mut audit_events = vec![];
556        let mut storage = self.storage().await;
557        let mut tx = if let Some(snapshot) = prev_snapshot {
558            // Restore transaction from saved snapshot so it starts in sync
559            // with the accumulated CatalogState from previous dry runs.
560            storage
561                .transaction_from_snapshot(snapshot)
562                .unwrap_or_terminate("starting catalog transaction from snapshot")
563        } else {
564            // First statement: fresh transaction from durable storage, which
565            // is in sync with the real catalog state.
566            storage
567                .transaction()
568                .await
569                .unwrap_or_terminate("starting catalog transaction")
570        };
571
572        // Process only the new ops against the accumulated state in dry-run mode.
573        let new_state = Self::transact_inner(
574            TransactInnerMode::DryRun,
575            None,
576            oracle_write_ts,
577            session,
578            ops,
579            temporary_ids,
580            &mut builtin_table_updates,
581            &mut catalog_updates,
582            &mut audit_events,
583            &mut tx,
584            base_state,
585        )
586        .await?;
587
588        // Save the transaction's current state as a snapshot for the next
589        // incremental dry run.
590        let new_snapshot = tx.current_snapshot();
591
592        // Transaction is NOT committed — drop it.
593        drop(storage);
594
595        // transact_inner returns Some(state) when ops produced changes.
596        let state = new_state.unwrap_or_else(|| base_state.clone());
597        Ok((state, new_snapshot))
598    }
599
600    /// Extracts optimized expressions from `Op::CreateItem` operations for views
601    /// and materialized views. These can be used to populate a `LocalExpressionCache`
602    /// to avoid re-optimization during `apply_updates`.
603    fn extract_expressions_from_ops(
604        ops: &[Op],
605        optimizer_features: &OptimizerFeatures,
606    ) -> BTreeMap<GlobalId, LocalExpressions> {
607        let mut exprs = BTreeMap::new();
608
609        for op in ops {
610            if let Op::CreateItem { item, .. } = op {
611                match item {
612                    CatalogItem::View(view) => {
613                        exprs.insert(
614                            view.global_id,
615                            LocalExpressions {
616                                local_mir: (*view.optimized_expr).clone(),
617                                optimizer_features: optimizer_features.clone(),
618                            },
619                        );
620                    }
621                    CatalogItem::MaterializedView(mv) => {
622                        exprs.insert(
623                            mv.global_id_writes(),
624                            LocalExpressions {
625                                local_mir: (*mv.optimized_expr).clone(),
626                                optimizer_features: optimizer_features.clone(),
627                            },
628                        );
629                    }
630                    CatalogItem::Table(_)
631                    | CatalogItem::Source(_)
632                    | CatalogItem::Log(_)
633                    | CatalogItem::Sink(_)
634                    | CatalogItem::Index(_)
635                    | CatalogItem::Type(_)
636                    | CatalogItem::Func(_)
637                    | CatalogItem::Secret(_)
638                    | CatalogItem::Connection(_)
639                    | CatalogItem::ContinualTask(_) => {}
640                }
641            }
642        }
643
644        exprs
645    }
646
647    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
648    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
649    ///
650    /// `mode` controls whether storage prepare-state side effects are allowed.
651    /// In `DryRun` mode, this method may update the in-memory state returned to
652    /// the caller, but it must not trigger controller side effects.
653    ///
654    #[instrument(name = "catalog::transact_inner")]
655    async fn transact_inner(
656        mode: TransactInnerMode,
657        storage_collections: Option<
658            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
659        >,
660        oracle_write_ts: mz_repr::Timestamp,
661        session: Option<&ConnMeta>,
662        ops: Vec<Op>,
663        temporary_ids: BTreeSet<CatalogItemId>,
664        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
665        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
666        audit_events: &mut Vec<VersionedEvent>,
667        tx: &mut Transaction<'_>,
668        state: &CatalogState,
669    ) -> Result<Option<CatalogState>, AdapterError> {
670        // We come up with new catalog state, builtin state updates, and parsed
671        // catalog updates (for deriving catalog implications) in two phases:
672        //
673        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
674        //    one-by-one. This will give us the full list of updates to apply to
675        //    the catalog, which will allow us to apply it in one batch, which
676        //    in turn will allow the apply machinery to consolidate the updates.
677        // 2. We do one final apply call with all updates, which gives us the
678        //    final builtin table updates and parsed catalog updates.
679        //
680        // The reason is that the loop that is working off ops first does a
681        // transact_op to derive the state updates for that op, and then calls
682        // apply_updates on the catalog state. And successive ops might expect
683        // the catalog state to reflect the modified state _after_ applying
684        // previous ops.
685        //
686        // We want to, however, have one final apply_state that takes all the
687        // accumulated updates to derive the required controller updates and the
688        // builtin table updates.
689        //
690        // We won't win any DDL throughput benchmarks, but so far that's not
691        // what we're optimizing for and there would probably be other
692        // bottlenecks before we hit this one as a bottleneck.
693        //
694        // We could work around this by refactoring how the interplay of
695        // transact_op and apply_updates works, but that's a larger undertaking.
696        let mut preliminary_state = Cow::Borrowed(state);
697
698        // The final state that we will return, if modified.
699        let mut state = Cow::Borrowed(state);
700
701        if ops.is_empty() {
702            return Ok(None);
703        }
704
705        // Extract optimized expressions from CreateItem ops to avoid re-optimization
706        // during apply_updates. We extract before the loop since `ops` is moved there.
707        let optimizer_features = OptimizerFeatures::from(state.system_config());
708        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
709
710        let mut storage_collections_to_create = BTreeSet::new();
711        let mut storage_collections_to_drop = BTreeSet::new();
712        let mut storage_collections_to_register = BTreeMap::new();
713
714        let mut updates = Vec::new();
715
716        for op in ops {
717            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
718                oracle_write_ts,
719                session,
720                op,
721                &temporary_ids,
722                audit_events,
723                tx,
724                &*preliminary_state,
725                &mut storage_collections_to_create,
726                &mut storage_collections_to_drop,
727                &mut storage_collections_to_register,
728            )
729            .await?;
730
731            // Certain builtin tables are not derived from the durable catalog state, so they need
732            // to be updated ad-hoc based on the current transaction. This is weird and will not
733            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
734            // this instance crashes after committing the durable catalog but before applying the
735            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
736            // world. Currently, this works fine and there are no correctness issues because
737            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
738            // state of all builtin tables to the correct values.
739            if let Some(builtin_table_update) = weird_builtin_table_update {
740                builtin_table_updates.push(builtin_table_update);
741            }
742
743            // Temporary items are not stored in the durable catalog, so they need to be handled
744            // separately for updating state and builtin tables.
745            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
746            // in a multi-subscriber catalog world.
747            let upper = tx.upper();
748            let temporary_item_updates =
749                temporary_item_updates
750                    .into_iter()
751                    .map(|(item, diff)| StateUpdate {
752                        kind: StateUpdateKind::TemporaryItem(item),
753                        ts: upper,
754                        diff,
755                    });
756
757            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
758            op_updates.extend(temporary_item_updates);
759            if !op_updates.is_empty() {
760                // Clone the cache so each apply_updates call has access to cached expressions.
761                // The cache uses `remove` semantics, so we need a fresh clone for each call.
762                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
763                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
764                    .to_mut()
765                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
766                    .await;
767            }
768            updates.append(&mut op_updates);
769        }
770
771        if !updates.is_empty() {
772            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
773            let (op_builtin_table_updates, op_catalog_updates) = state
774                .to_mut()
775                .apply_updates(updates.clone(), &mut local_expr_cache)
776                .await;
777            let op_builtin_table_updates = state
778                .to_mut()
779                .resolve_builtin_table_updates(op_builtin_table_updates);
780            builtin_table_updates.extend(op_builtin_table_updates);
781            parsed_catalog_updates.extend(op_catalog_updates);
782        }
783
784        match mode {
785            TransactInnerMode::Commit => {
786                // `storage_collections` can be `None` in tests.
787                if let Some(c) = storage_collections {
788                    c.prepare_state(
789                        tx,
790                        storage_collections_to_create,
791                        storage_collections_to_drop,
792                        storage_collections_to_register,
793                    )
794                    .await?;
795                }
796            }
797            TransactInnerMode::DryRun => {
798                debug_assert!(
799                    storage_collections.is_none(),
800                    "dry-run mode must not prepare storage state"
801                );
802            }
803        }
804
805        let updates = tx.get_and_commit_op_updates();
806        if !updates.is_empty() {
807            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
808            let (op_builtin_table_updates, op_catalog_updates) = state
809                .to_mut()
810                .apply_updates(updates.clone(), &mut local_expr_cache)
811                .await;
812            let op_builtin_table_updates = state
813                .to_mut()
814                .resolve_builtin_table_updates(op_builtin_table_updates);
815            builtin_table_updates.extend(op_builtin_table_updates);
816            parsed_catalog_updates.extend(op_catalog_updates);
817        }
818
819        match state {
820            Cow::Owned(state) => Ok(Some(state)),
821            Cow::Borrowed(_) => Ok(None),
822        }
823    }
824
825    /// Performs the transaction operation described by `op`. This function prepares the changes in
826    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
827    /// changes.
828    ///
829    /// Optionally returns a builtin table update for any builtin table updates than cannot be
830    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
831    /// scenarios and ideally in the future don't exist.
832    #[instrument]
833    async fn transact_op(
834        oracle_write_ts: mz_repr::Timestamp,
835        session: Option<&ConnMeta>,
836        op: Op,
837        temporary_ids: &BTreeSet<CatalogItemId>,
838        audit_events: &mut Vec<VersionedEvent>,
839        tx: &mut Transaction<'_>,
840        state: &CatalogState,
841        storage_collections_to_create: &mut BTreeSet<GlobalId>,
842        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
843        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
844    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
845        let mut weird_builtin_table_update = None;
846        let mut temporary_item_updates = Vec::new();
847
848        match op {
849            Op::AlterRetainHistory { id, value, window } => {
850                let entry = state.get_entry(&id);
851                if id.is_system() {
852                    let name = entry.name();
853                    let full_name =
854                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
855                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
856                        full_name.to_string(),
857                    ))));
858                }
859
860                let mut new_entry = entry.clone();
861                let previous = new_entry
862                    .item
863                    .update_retain_history(value.clone(), window)
864                    .map_err(|_| {
865                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
866                            "planner should have rejected invalid alter retain history item type"
867                                .to_string(),
868                        )))
869                    })?;
870
871                if Self::should_audit_log_item(new_entry.item()) {
872                    let details =
873                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
874                            id: id.to_string(),
875                            old_history: previous.map(|previous| previous.to_string()),
876                            new_history: value.map(|v| v.to_string()),
877                        });
878                    CatalogState::add_to_audit_log(
879                        &state.system_configuration,
880                        oracle_write_ts,
881                        session,
882                        tx,
883                        audit_events,
884                        EventType::Alter,
885                        catalog_type_to_audit_object_type(new_entry.item().typ()),
886                        details,
887                    )?;
888                }
889
890                tx.update_item(id, new_entry.into())?;
891
892                Self::log_update(state, &id);
893            }
894            Op::AlterSourceTimestampInterval {
895                id,
896                value,
897                interval,
898            } => {
899                let entry = state.get_entry(&id);
900                if id.is_system() {
901                    let name = entry.name();
902                    let full_name =
903                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
904                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
905                        full_name.to_string(),
906                    ))));
907                }
908
909                let mut new_entry = entry.clone();
910                new_entry
911                    .item
912                    .update_timestamp_interval(value, interval)
913                    .map_err(|_| {
914                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
915                            "planner should have rejected invalid alter timestamp interval item type"
916                                .to_string(),
917                        )))
918                    })?;
919
920                tx.update_item(id, new_entry.into())?;
921
922                Self::log_update(state, &id);
923            }
924            Op::AlterRole {
925                id,
926                name,
927                attributes,
928                nopassword,
929                vars,
930            } => {
931                state.ensure_not_reserved_role(&id)?;
932
933                let mut existing_role = state.get_role(&id).clone();
934                let password = attributes.password.clone();
935                let scram_iterations = attributes
936                    .scram_iterations
937                    .unwrap_or_else(|| state.system_config().scram_iterations());
938                existing_role.attributes = attributes.into();
939                existing_role.vars = vars;
940                let password_action = if nopassword {
941                    PasswordAction::Clear
942                } else if let Some(password) = password {
943                    PasswordAction::Set(PasswordConfig {
944                        password,
945                        scram_iterations,
946                    })
947                } else {
948                    PasswordAction::NoChange
949                };
950                tx.update_role(id, existing_role.into(), password_action)?;
951
952                CatalogState::add_to_audit_log(
953                    &state.system_configuration,
954                    oracle_write_ts,
955                    session,
956                    tx,
957                    audit_events,
958                    EventType::Alter,
959                    ObjectType::Role,
960                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
961                        id: id.to_string(),
962                        name: name.clone(),
963                    }),
964                )?;
965
966                info!("update role {name} ({id})");
967            }
968            Op::AlterNetworkPolicy {
969                id,
970                rules,
971                name,
972                owner_id: _owner_id,
973            } => {
974                let existing_policy = state.get_network_policy(&id).clone();
975                let mut policy: NetworkPolicy = existing_policy.into();
976                policy.rules = rules;
977                if is_reserved_name(&name) {
978                    return Err(AdapterError::Catalog(Error::new(
979                        ErrorKind::ReservedNetworkPolicyName(name),
980                    )));
981                }
982                tx.update_network_policy(id, policy.clone())?;
983
984                CatalogState::add_to_audit_log(
985                    &state.system_configuration,
986                    oracle_write_ts,
987                    session,
988                    tx,
989                    audit_events,
990                    EventType::Alter,
991                    ObjectType::NetworkPolicy,
992                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
993                        id: id.to_string(),
994                        name: name.clone(),
995                    }),
996                )?;
997
998                info!("update network policy {name} ({id})");
999            }
1000            Op::AlterAddColumn {
1001                id,
1002                new_global_id,
1003                name,
1004                typ,
1005                sql,
1006            } => {
1007                let mut new_entry = state.get_entry(&id).clone();
1008                let version = new_entry.item.add_column(name, typ, sql)?;
1009                // All versions of a table share the same shard, so it shouldn't matter what
1010                // GlobalId we use here.
1011                let shard_id = state
1012                    .storage_metadata()
1013                    .get_collection_shard(new_entry.latest_global_id())?;
1014
1015                // TODO(alter_table): Support adding columns to sources.
1016                let CatalogItem::Table(table) = &mut new_entry.item else {
1017                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
1018                };
1019                table.collections.insert(version, new_global_id);
1020
1021                tx.update_item(id, new_entry.into())?;
1022                storage_collections_to_register.insert(new_global_id, shard_id);
1023            }
1024            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1025                let mut new_entry = state.get_entry(&id).clone();
1026                let replacement = state.get_entry(&replacement_id);
1027
1028                let new_audit_events =
1029                    apply_replacement_audit_events(state, &new_entry, replacement);
1030
1031                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1032                    return Err(AdapterError::internal(
1033                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1034                        "id must refer to a materialized view",
1035                    ));
1036                };
1037                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1038                    return Err(AdapterError::internal(
1039                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1040                        "replacement_id must refer to a materialized view",
1041                    ));
1042                };
1043
1044                mv.apply_replacement(replacement_mv.clone());
1045
1046                tx.remove_item(replacement_id)?;
1047
1048                new_entry.id = replacement_id;
1049                tx_replace_item(tx, state, id, new_entry)?;
1050
1051                let comment_id = CommentObjectId::MaterializedView(replacement_id);
1052                tx.drop_comments(&[comment_id].into())?;
1053
1054                for (event_type, details) in new_audit_events {
1055                    CatalogState::add_to_audit_log(
1056                        &state.system_configuration,
1057                        oracle_write_ts,
1058                        session,
1059                        tx,
1060                        audit_events,
1061                        event_type,
1062                        ObjectType::MaterializedView,
1063                        details,
1064                    )?;
1065                }
1066            }
1067            Op::CreateDatabase { name, owner_id } => {
1068                let database_owner_privileges = vec![rbac::owner_privilege(
1069                    mz_sql::catalog::ObjectType::Database,
1070                    owner_id,
1071                )];
1072                let database_default_privileges = state
1073                    .default_privileges
1074                    .get_applicable_privileges(
1075                        owner_id,
1076                        None,
1077                        None,
1078                        mz_sql::catalog::ObjectType::Database,
1079                    )
1080                    .map(|item| item.mz_acl_item(owner_id));
1081                let database_privileges: Vec<_> = merge_mz_acl_items(
1082                    database_owner_privileges
1083                        .into_iter()
1084                        .chain(database_default_privileges),
1085                )
1086                .collect();
1087
1088                let schema_owner_privileges = vec![rbac::owner_privilege(
1089                    mz_sql::catalog::ObjectType::Schema,
1090                    owner_id,
1091                )];
1092                let schema_default_privileges = state
1093                    .default_privileges
1094                    .get_applicable_privileges(
1095                        owner_id,
1096                        None,
1097                        None,
1098                        mz_sql::catalog::ObjectType::Schema,
1099                    )
1100                    .map(|item| item.mz_acl_item(owner_id))
1101                    // Special default privilege on public schemas.
1102                    .chain(std::iter::once(MzAclItem {
1103                        grantee: RoleId::Public,
1104                        grantor: owner_id,
1105                        acl_mode: AclMode::USAGE,
1106                    }));
1107                let schema_privileges: Vec<_> = merge_mz_acl_items(
1108                    schema_owner_privileges
1109                        .into_iter()
1110                        .chain(schema_default_privileges),
1111                )
1112                .collect();
1113
1114                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1115                let (database_id, _) = tx.insert_user_database(
1116                    &name,
1117                    owner_id,
1118                    database_privileges.clone(),
1119                    &temporary_oids,
1120                )?;
1121                let (schema_id, _) = tx.insert_user_schema(
1122                    database_id,
1123                    DEFAULT_SCHEMA,
1124                    owner_id,
1125                    schema_privileges.clone(),
1126                    &temporary_oids,
1127                )?;
1128                CatalogState::add_to_audit_log(
1129                    &state.system_configuration,
1130                    oracle_write_ts,
1131                    session,
1132                    tx,
1133                    audit_events,
1134                    EventType::Create,
1135                    ObjectType::Database,
1136                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1137                        id: database_id.to_string(),
1138                        name: name.clone(),
1139                    }),
1140                )?;
1141                info!("create database {}", name);
1142
1143                CatalogState::add_to_audit_log(
1144                    &state.system_configuration,
1145                    oracle_write_ts,
1146                    session,
1147                    tx,
1148                    audit_events,
1149                    EventType::Create,
1150                    ObjectType::Schema,
1151                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1152                        id: schema_id.to_string(),
1153                        name: DEFAULT_SCHEMA.to_string(),
1154                        database_name: Some(name),
1155                    }),
1156                )?;
1157            }
1158            Op::CreateSchema {
1159                database_id,
1160                schema_name,
1161                owner_id,
1162            } => {
1163                if is_reserved_name(&schema_name) {
1164                    return Err(AdapterError::Catalog(Error::new(
1165                        ErrorKind::ReservedSchemaName(schema_name),
1166                    )));
1167                }
1168                let database_id = match database_id {
1169                    ResolvedDatabaseSpecifier::Id(id) => id,
1170                    ResolvedDatabaseSpecifier::Ambient => {
1171                        return Err(AdapterError::Catalog(Error::new(
1172                            ErrorKind::ReadOnlySystemSchema(schema_name),
1173                        )));
1174                    }
1175                };
1176                let owner_privileges = vec![rbac::owner_privilege(
1177                    mz_sql::catalog::ObjectType::Schema,
1178                    owner_id,
1179                )];
1180                let default_privileges = state
1181                    .default_privileges
1182                    .get_applicable_privileges(
1183                        owner_id,
1184                        Some(database_id),
1185                        None,
1186                        mz_sql::catalog::ObjectType::Schema,
1187                    )
1188                    .map(|item| item.mz_acl_item(owner_id));
1189                let privileges: Vec<_> =
1190                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1191                        .collect();
1192                let (schema_id, _) = tx.insert_user_schema(
1193                    database_id,
1194                    &schema_name,
1195                    owner_id,
1196                    privileges.clone(),
1197                    &state.get_temporary_oids().collect(),
1198                )?;
1199                CatalogState::add_to_audit_log(
1200                    &state.system_configuration,
1201                    oracle_write_ts,
1202                    session,
1203                    tx,
1204                    audit_events,
1205                    EventType::Create,
1206                    ObjectType::Schema,
1207                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1208                        id: schema_id.to_string(),
1209                        name: schema_name.clone(),
1210                        database_name: Some(state.database_by_id[&database_id].name.clone()),
1211                    }),
1212                )?;
1213            }
1214            Op::CreateRole { name, attributes } => {
1215                if is_reserved_role_name(&name) {
1216                    return Err(AdapterError::Catalog(Error::new(
1217                        ErrorKind::ReservedRoleName(name),
1218                    )));
1219                }
1220                let membership = RoleMembership::new();
1221                let vars = RoleVars::default();
1222                let (id, _) = tx.insert_user_role(
1223                    name.clone(),
1224                    attributes.clone(),
1225                    membership.clone(),
1226                    vars.clone(),
1227                    &state.get_temporary_oids().collect(),
1228                )?;
1229                CatalogState::add_to_audit_log(
1230                    &state.system_configuration,
1231                    oracle_write_ts,
1232                    session,
1233                    tx,
1234                    audit_events,
1235                    EventType::Create,
1236                    ObjectType::Role,
1237                    EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1238                        id: id.to_string(),
1239                        name: name.clone(),
1240                        auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1241                            AutoProvisionSource::Oidc => "oidc".to_string(),
1242                            AutoProvisionSource::Frontegg => "frontegg".to_string(),
1243                            AutoProvisionSource::None => "none".to_string(),
1244                        }),
1245                    }),
1246                )?;
1247                info!("create role {}", name);
1248            }
1249            Op::CreateCluster {
1250                id,
1251                name,
1252                introspection_sources,
1253                owner_id,
1254                config,
1255            } => {
1256                if is_reserved_name(&name) {
1257                    return Err(AdapterError::Catalog(Error::new(
1258                        ErrorKind::ReservedClusterName(name),
1259                    )));
1260                }
1261                let owner_privileges = vec![rbac::owner_privilege(
1262                    mz_sql::catalog::ObjectType::Cluster,
1263                    owner_id,
1264                )];
1265                let default_privileges = state
1266                    .default_privileges
1267                    .get_applicable_privileges(
1268                        owner_id,
1269                        None,
1270                        None,
1271                        mz_sql::catalog::ObjectType::Cluster,
1272                    )
1273                    .map(|item| item.mz_acl_item(owner_id));
1274                let privileges: Vec<_> =
1275                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1276                        .collect();
1277                let introspection_source_ids: Vec<_> = introspection_sources
1278                    .iter()
1279                    .map(|introspection_source| {
1280                        Transaction::allocate_introspection_source_index_id(
1281                            &id,
1282                            introspection_source.variant,
1283                        )
1284                    })
1285                    .collect();
1286
1287                let introspection_sources = introspection_sources
1288                    .into_iter()
1289                    .zip_eq(introspection_source_ids)
1290                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1291                    .collect();
1292
1293                tx.insert_user_cluster(
1294                    id,
1295                    &name,
1296                    introspection_sources,
1297                    owner_id,
1298                    privileges.clone(),
1299                    config.clone().into(),
1300                    &state.get_temporary_oids().collect(),
1301                )?;
1302                CatalogState::add_to_audit_log(
1303                    &state.system_configuration,
1304                    oracle_write_ts,
1305                    session,
1306                    tx,
1307                    audit_events,
1308                    EventType::Create,
1309                    ObjectType::Cluster,
1310                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1311                        id: id.to_string(),
1312                        name: name.clone(),
1313                    }),
1314                )?;
1315                info!("create cluster {}", name);
1316            }
1317            Op::CreateClusterReplica {
1318                cluster_id,
1319                name,
1320                config,
1321                owner_id,
1322                reason,
1323            } => {
1324                if is_reserved_name(&name) {
1325                    return Err(AdapterError::Catalog(Error::new(
1326                        ErrorKind::ReservedReplicaName(name),
1327                    )));
1328                }
1329                let cluster = state.get_cluster(cluster_id);
1330                let id =
1331                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1332                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1333                    size,
1334                    billed_as,
1335                    internal,
1336                    ..
1337                }) = &config.location
1338                {
1339                    let (reason, scheduling_policies) = reason.into_audit_log();
1340                    let details = EventDetails::CreateClusterReplicaV4(
1341                        mz_audit_log::CreateClusterReplicaV4 {
1342                            cluster_id: cluster_id.to_string(),
1343                            cluster_name: cluster.name.clone(),
1344                            replica_id: Some(id.to_string()),
1345                            replica_name: name.clone(),
1346                            logical_size: size.clone(),
1347                            billed_as: billed_as.clone(),
1348                            internal: *internal,
1349                            reason,
1350                            scheduling_policies,
1351                        },
1352                    );
1353                    CatalogState::add_to_audit_log(
1354                        &state.system_configuration,
1355                        oracle_write_ts,
1356                        session,
1357                        tx,
1358                        audit_events,
1359                        EventType::Create,
1360                        ObjectType::ClusterReplica,
1361                        details,
1362                    )?;
1363                }
1364            }
1365            Op::CreateItem {
1366                id,
1367                name,
1368                item,
1369                owner_id,
1370            } => {
1371                state.check_unstable_dependencies(&item)?;
1372
1373                match &item {
1374                    CatalogItem::Table(table) => {
1375                        let gids: Vec<_> = table.global_ids().collect();
1376                        assert_eq!(gids.len(), 1);
1377                        storage_collections_to_create.extend(gids);
1378                    }
1379                    CatalogItem::Source(source) => {
1380                        storage_collections_to_create.insert(source.global_id());
1381                    }
1382                    CatalogItem::MaterializedView(mv) => {
1383                        let mv_gid = mv.global_id_writes();
1384                        if let Some(target_id) = mv.replacement_target {
1385                            let target_gid = state.get_entry(&target_id).latest_global_id();
1386                            let shard_id =
1387                                state.storage_metadata().get_collection_shard(target_gid)?;
1388                            storage_collections_to_register.insert(mv_gid, shard_id);
1389                        } else {
1390                            storage_collections_to_create.insert(mv_gid);
1391                        }
1392                    }
1393                    CatalogItem::ContinualTask(ct) => {
1394                        storage_collections_to_create.insert(ct.global_id());
1395                    }
1396                    CatalogItem::Sink(sink) => {
1397                        storage_collections_to_create.insert(sink.global_id());
1398                    }
1399                    CatalogItem::Log(_)
1400                    | CatalogItem::View(_)
1401                    | CatalogItem::Index(_)
1402                    | CatalogItem::Type(_)
1403                    | CatalogItem::Func(_)
1404                    | CatalogItem::Secret(_)
1405                    | CatalogItem::Connection(_) => (),
1406                }
1407
1408                let system_user = session.map_or(false, |s| s.user().is_system_user());
1409                if !system_user {
1410                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1411                        let cluster_name = state.clusters_by_id[&id].name.clone();
1412                        return Err(AdapterError::Catalog(Error::new(
1413                            ErrorKind::ReadOnlyCluster(cluster_name),
1414                        )));
1415                    }
1416                }
1417
1418                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1419                let default_privileges = state
1420                    .default_privileges
1421                    .get_applicable_privileges(
1422                        owner_id,
1423                        name.qualifiers.database_spec.id(),
1424                        Some(name.qualifiers.schema_spec.into()),
1425                        item.typ().into(),
1426                    )
1427                    .map(|item| item.mz_acl_item(owner_id));
1428                // mz_support can read all progress sources.
1429                let progress_source_privilege = if item.is_progress_source() {
1430                    Some(MzAclItem {
1431                        grantee: MZ_SUPPORT_ROLE_ID,
1432                        grantor: owner_id,
1433                        acl_mode: AclMode::SELECT,
1434                    })
1435                } else {
1436                    None
1437                };
1438                let privileges: Vec<_> = merge_mz_acl_items(
1439                    owner_privileges
1440                        .into_iter()
1441                        .chain(default_privileges)
1442                        .chain(progress_source_privilege),
1443                )
1444                .collect();
1445
1446                let temporary_oids = state.get_temporary_oids().collect();
1447
1448                if item.is_temporary() {
1449                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1450                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1451                    {
1452                        return Err(AdapterError::Catalog(Error::new(
1453                            ErrorKind::InvalidTemporarySchema,
1454                        )));
1455                    }
1456                    let oid = tx.allocate_oid(&temporary_oids)?;
1457
1458                    let schema_id = name.qualifiers.schema_spec.clone().into();
1459                    let item_type = item.typ();
1460                    let (create_sql, global_id, versions) = item.to_serialized();
1461
1462                    let item = TemporaryItem {
1463                        id,
1464                        oid,
1465                        global_id,
1466                        schema_id,
1467                        name: name.item.clone(),
1468                        create_sql,
1469                        conn_id: item.conn_id().cloned(),
1470                        owner_id,
1471                        privileges: privileges.clone(),
1472                        extra_versions: versions,
1473                    };
1474                    temporary_item_updates.push((item, StateDiff::Addition));
1475
1476                    info!(
1477                        "create temporary {} {} ({})",
1478                        item_type,
1479                        state.resolve_full_name(&name, None),
1480                        id
1481                    );
1482                } else {
1483                    if let Some(temp_id) =
1484                        item.uses()
1485                            .iter()
1486                            .find(|id| match state.try_get_entry(*id) {
1487                                Some(entry) => entry.item().is_temporary(),
1488                                None => temporary_ids.contains(id),
1489                            })
1490                    {
1491                        let temp_item = state.get_entry(temp_id);
1492                        return Err(AdapterError::Catalog(Error::new(
1493                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1494                        )));
1495                    }
1496                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1497                        && !system_user
1498                    {
1499                        let schema_name = state
1500                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1501                            .schema;
1502                        return Err(AdapterError::Catalog(Error::new(
1503                            ErrorKind::ReadOnlySystemSchema(schema_name),
1504                        )));
1505                    }
1506                    let schema_id = name.qualifiers.schema_spec.clone().into();
1507                    let item_type = item.typ();
1508                    let (create_sql, global_id, versions) = item.to_serialized();
1509                    tx.insert_user_item(
1510                        id,
1511                        global_id,
1512                        schema_id,
1513                        &name.item,
1514                        create_sql,
1515                        owner_id,
1516                        privileges.clone(),
1517                        &temporary_oids,
1518                        versions,
1519                    )?;
1520                    info!(
1521                        "create {} {} ({})",
1522                        item_type,
1523                        state.resolve_full_name(&name, None),
1524                        id
1525                    );
1526                }
1527
1528                if Self::should_audit_log_item(&item) {
1529                    let name = Self::full_name_detail(
1530                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1531                    );
1532                    let details = match &item {
1533                        CatalogItem::Source(s) => {
1534                            let cluster_id = match s.data_source {
1535                                // Ingestion exports don't have their own cluster, but
1536                                // run on their ingestion's cluster.
1537                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1538                                    match state.get_entry(&ingestion_id).cluster_id() {
1539                                        Some(cluster_id) => Some(cluster_id.to_string()),
1540                                        None => None,
1541                                    }
1542                                }
1543                                _ => match item.cluster_id() {
1544                                    Some(cluster_id) => Some(cluster_id.to_string()),
1545                                    None => None,
1546                                },
1547                            };
1548
1549                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1550                                id: id.to_string(),
1551                                cluster_id,
1552                                name,
1553                                external_type: s.source_type().to_string(),
1554                            })
1555                        }
1556                        CatalogItem::Sink(s) => {
1557                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1558                                id: id.to_string(),
1559                                cluster_id: Some(s.cluster_id.to_string()),
1560                                name,
1561                                external_type: s.sink_type().to_string(),
1562                            })
1563                        }
1564                        CatalogItem::Index(i) => {
1565                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1566                                id: id.to_string(),
1567                                name,
1568                                cluster_id: i.cluster_id.to_string(),
1569                            })
1570                        }
1571                        CatalogItem::MaterializedView(mv) => {
1572                            EventDetails::CreateMaterializedViewV1(
1573                                mz_audit_log::CreateMaterializedViewV1 {
1574                                    id: id.to_string(),
1575                                    name,
1576                                    cluster_id: mv.cluster_id.to_string(),
1577                                    replacement_target_id: mv
1578                                        .replacement_target
1579                                        .map(|id| id.to_string()),
1580                                },
1581                            )
1582                        }
1583                        CatalogItem::Table(_)
1584                        | CatalogItem::Log(_)
1585                        | CatalogItem::View(_)
1586                        | CatalogItem::Type(_)
1587                        | CatalogItem::Func(_)
1588                        | CatalogItem::Secret(_)
1589                        | CatalogItem::Connection(_)
1590                        | CatalogItem::ContinualTask(_) => {
1591                            EventDetails::IdFullNameV1(IdFullNameV1 {
1592                                id: id.to_string(),
1593                                name,
1594                            })
1595                        }
1596                    };
1597                    CatalogState::add_to_audit_log(
1598                        &state.system_configuration,
1599                        oracle_write_ts,
1600                        session,
1601                        tx,
1602                        audit_events,
1603                        EventType::Create,
1604                        catalog_type_to_audit_object_type(item.typ()),
1605                        details,
1606                    )?;
1607                }
1608            }
1609            Op::CreateNetworkPolicy {
1610                rules,
1611                name,
1612                owner_id,
1613            } => {
1614                if state.network_policies_by_name.contains_key(&name) {
1615                    return Err(AdapterError::PlanError(PlanError::Catalog(
1616                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1617                    )));
1618                }
1619                if is_reserved_name(&name) {
1620                    return Err(AdapterError::Catalog(Error::new(
1621                        ErrorKind::ReservedNetworkPolicyName(name),
1622                    )));
1623                }
1624
1625                let owner_privileges = vec![rbac::owner_privilege(
1626                    mz_sql::catalog::ObjectType::NetworkPolicy,
1627                    owner_id,
1628                )];
1629                let default_privileges = state
1630                    .default_privileges
1631                    .get_applicable_privileges(
1632                        owner_id,
1633                        None,
1634                        None,
1635                        mz_sql::catalog::ObjectType::NetworkPolicy,
1636                    )
1637                    .map(|item| item.mz_acl_item(owner_id));
1638                let privileges: Vec<_> =
1639                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1640                        .collect();
1641
1642                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1643                let id = tx.insert_user_network_policy(
1644                    name.clone(),
1645                    rules,
1646                    privileges,
1647                    owner_id,
1648                    &temporary_oids,
1649                )?;
1650
1651                CatalogState::add_to_audit_log(
1652                    &state.system_configuration,
1653                    oracle_write_ts,
1654                    session,
1655                    tx,
1656                    audit_events,
1657                    EventType::Create,
1658                    ObjectType::NetworkPolicy,
1659                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1660                        id: id.to_string(),
1661                        name: name.clone(),
1662                    }),
1663                )?;
1664
1665                info!("created network policy {name} ({id})");
1666            }
1667            Op::Comment {
1668                object_id,
1669                sub_component,
1670                comment,
1671            } => {
1672                tx.update_comment(object_id, sub_component, comment)?;
1673                let entry = state.get_comment_id_entry(&object_id);
1674                let should_log = entry
1675                    .map(|entry| Self::should_audit_log_item(entry.item()))
1676                    // Things that aren't catalog entries can't be temp, so should be logged.
1677                    .unwrap_or(true);
1678                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1679                // comments won't be logged for now.
1680                if let (Some(conn_id), true) =
1681                    (session.map(|session| session.conn_id()), should_log)
1682                {
1683                    CatalogState::add_to_audit_log(
1684                        &state.system_configuration,
1685                        oracle_write_ts,
1686                        session,
1687                        tx,
1688                        audit_events,
1689                        EventType::Comment,
1690                        comment_id_to_audit_object_type(object_id),
1691                        EventDetails::IdNameV1(IdNameV1 {
1692                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1693                            id: format!("{object_id:?}"),
1694                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1695                        }),
1696                    )?;
1697                }
1698            }
1699            Op::UpdateSourceReferences {
1700                source_id,
1701                references,
1702            } => {
1703                tx.update_source_references(
1704                    source_id,
1705                    references
1706                        .references
1707                        .into_iter()
1708                        .map(|reference| reference.into())
1709                        .collect(),
1710                    references.updated_at,
1711                )?;
1712            }
1713            Op::DropObjects(drop_object_infos) => {
1714                // Generate all of the objects that need to get dropped.
1715                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1716
1717                // Drop any associated comments.
1718                tx.drop_comments(&delta.comments)?;
1719
1720                // Drop any items.
1721                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1722                    delta
1723                        .items
1724                        .iter()
1725                        .map(|id| id)
1726                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1727                tx.remove_items(&durable_items_to_drop)?;
1728                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1729                    let entry = state.get_entry(&id);
1730                    (entry.clone().into(), StateDiff::Retraction)
1731                }));
1732
1733                for item_id in delta.items {
1734                    let entry = state.get_entry(&item_id);
1735
1736                    // Drop associated storage collections, unless the dropped item is a
1737                    // replacement, in which case the replacement target owns the storage
1738                    // collection.
1739                    if entry.item().is_storage_collection() && entry.replacement_target().is_none()
1740                    {
1741                        storage_collections_to_drop.extend(entry.global_ids());
1742                    }
1743
1744                    if state.source_references.contains_key(&item_id) {
1745                        tx.remove_source_references(item_id)?;
1746                    }
1747
1748                    if Self::should_audit_log_item(entry.item()) {
1749                        CatalogState::add_to_audit_log(
1750                            &state.system_configuration,
1751                            oracle_write_ts,
1752                            session,
1753                            tx,
1754                            audit_events,
1755                            EventType::Drop,
1756                            catalog_type_to_audit_object_type(entry.item().typ()),
1757                            EventDetails::IdFullNameV1(IdFullNameV1 {
1758                                id: item_id.to_string(),
1759                                name: Self::full_name_detail(&state.resolve_full_name(
1760                                    entry.name(),
1761                                    session.map(|session| session.conn_id()),
1762                                )),
1763                            }),
1764                        )?;
1765                    }
1766                    info!(
1767                        "drop {} {} ({})",
1768                        entry.item_type(),
1769                        state.resolve_full_name(entry.name(), entry.conn_id()),
1770                        item_id
1771                    );
1772                }
1773
1774                // Drop any schemas.
1775                let schemas = delta
1776                    .schemas
1777                    .iter()
1778                    .map(|(schema_spec, database_spec)| {
1779                        (SchemaId::from(schema_spec), *database_spec)
1780                    })
1781                    .collect();
1782                tx.remove_schemas(&schemas)?;
1783
1784                for (schema_spec, database_spec) in delta.schemas {
1785                    let schema = state.get_schema(
1786                        &database_spec,
1787                        &schema_spec,
1788                        session
1789                            .map(|session| session.conn_id())
1790                            .unwrap_or(&SYSTEM_CONN_ID),
1791                    );
1792
1793                    let schema_id = SchemaId::from(schema_spec);
1794                    let database_id = match database_spec {
1795                        ResolvedDatabaseSpecifier::Ambient => None,
1796                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1797                    };
1798
1799                    CatalogState::add_to_audit_log(
1800                        &state.system_configuration,
1801                        oracle_write_ts,
1802                        session,
1803                        tx,
1804                        audit_events,
1805                        EventType::Drop,
1806                        ObjectType::Schema,
1807                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1808                            id: schema_id.to_string(),
1809                            name: schema.name.schema.to_string(),
1810                            database_name: database_id
1811                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1812                        }),
1813                    )?;
1814                }
1815
1816                // Drop any databases.
1817                tx.remove_databases(&delta.databases)?;
1818
1819                for database_id in delta.databases {
1820                    let database = state.get_database(&database_id).clone();
1821
1822                    CatalogState::add_to_audit_log(
1823                        &state.system_configuration,
1824                        oracle_write_ts,
1825                        session,
1826                        tx,
1827                        audit_events,
1828                        EventType::Drop,
1829                        ObjectType::Database,
1830                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1831                            id: database_id.to_string(),
1832                            name: database.name.clone(),
1833                        }),
1834                    )?;
1835                }
1836
1837                // Drop any roles.
1838                tx.remove_user_roles(&delta.roles)?;
1839
1840                for role_id in delta.roles {
1841                    let role = state
1842                        .roles_by_id
1843                        .get(&role_id)
1844                        .expect("catalog out of sync");
1845
1846                    CatalogState::add_to_audit_log(
1847                        &state.system_configuration,
1848                        oracle_write_ts,
1849                        session,
1850                        tx,
1851                        audit_events,
1852                        EventType::Drop,
1853                        ObjectType::Role,
1854                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1855                            id: role.id.to_string(),
1856                            name: role.name.clone(),
1857                        }),
1858                    )?;
1859                    info!("drop role {}", role.name());
1860                }
1861
1862                // Drop any network policies.
1863                tx.remove_network_policies(&delta.network_policies)?;
1864
1865                for network_policy_id in delta.network_policies {
1866                    let policy = state
1867                        .network_policies_by_id
1868                        .get(&network_policy_id)
1869                        .expect("catalog out of sync");
1870
1871                    CatalogState::add_to_audit_log(
1872                        &state.system_configuration,
1873                        oracle_write_ts,
1874                        session,
1875                        tx,
1876                        audit_events,
1877                        EventType::Drop,
1878                        ObjectType::NetworkPolicy,
1879                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1880                            id: policy.id.to_string(),
1881                            name: policy.name.clone(),
1882                        }),
1883                    )?;
1884                    info!("drop network policy {}", policy.name.clone());
1885                }
1886
1887                // Drop any replicas.
1888                let replicas = delta.replicas.keys().copied().collect();
1889                tx.remove_cluster_replicas(&replicas)?;
1890
1891                for (replica_id, (cluster_id, reason)) in delta.replicas {
1892                    let cluster = state.get_cluster(cluster_id);
1893                    let replica = cluster.replica(replica_id).expect("Must exist");
1894
1895                    let (reason, scheduling_policies) = reason.into_audit_log();
1896                    let details =
1897                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1898                            cluster_id: cluster_id.to_string(),
1899                            cluster_name: cluster.name.clone(),
1900                            replica_id: Some(replica_id.to_string()),
1901                            replica_name: replica.name.clone(),
1902                            reason,
1903                            scheduling_policies,
1904                        });
1905                    CatalogState::add_to_audit_log(
1906                        &state.system_configuration,
1907                        oracle_write_ts,
1908                        session,
1909                        tx,
1910                        audit_events,
1911                        EventType::Drop,
1912                        ObjectType::ClusterReplica,
1913                        details,
1914                    )?;
1915                }
1916
1917                // Drop any clusters.
1918                tx.remove_clusters(&delta.clusters)?;
1919
1920                for cluster_id in delta.clusters {
1921                    let cluster = state.get_cluster(cluster_id);
1922
1923                    CatalogState::add_to_audit_log(
1924                        &state.system_configuration,
1925                        oracle_write_ts,
1926                        session,
1927                        tx,
1928                        audit_events,
1929                        EventType::Drop,
1930                        ObjectType::Cluster,
1931                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1932                            id: cluster.id.to_string(),
1933                            name: cluster.name.clone(),
1934                        }),
1935                    )?;
1936                }
1937            }
1938            Op::GrantRole {
1939                role_id,
1940                member_id,
1941                grantor_id,
1942            } => {
1943                state.ensure_not_reserved_role(&member_id)?;
1944                state.ensure_grantable_role(&role_id)?;
1945                if state.collect_role_membership(&role_id).contains(&member_id) {
1946                    let group_role = state.get_role(&role_id);
1947                    let member_role = state.get_role(&member_id);
1948                    return Err(AdapterError::Catalog(Error::new(
1949                        ErrorKind::CircularRoleMembership {
1950                            role_name: group_role.name().to_string(),
1951                            member_name: member_role.name().to_string(),
1952                        },
1953                    )));
1954                }
1955                let mut member_role = state.get_role(&member_id).clone();
1956                member_role.membership.map.insert(role_id, grantor_id);
1957                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1958
1959                CatalogState::add_to_audit_log(
1960                    &state.system_configuration,
1961                    oracle_write_ts,
1962                    session,
1963                    tx,
1964                    audit_events,
1965                    EventType::Grant,
1966                    ObjectType::Role,
1967                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1968                        role_id: role_id.to_string(),
1969                        member_id: member_id.to_string(),
1970                        grantor_id: grantor_id.to_string(),
1971                        executed_by: session
1972                            .map(|session| session.authenticated_role_id())
1973                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1974                            .to_string(),
1975                    }),
1976                )?;
1977            }
1978            Op::RevokeRole {
1979                role_id,
1980                member_id,
1981                grantor_id,
1982            } => {
1983                state.ensure_not_reserved_role(&member_id)?;
1984                state.ensure_grantable_role(&role_id)?;
1985                let mut member_role = state.get_role(&member_id).clone();
1986                member_role.membership.map.remove(&role_id);
1987                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1988
1989                CatalogState::add_to_audit_log(
1990                    &state.system_configuration,
1991                    oracle_write_ts,
1992                    session,
1993                    tx,
1994                    audit_events,
1995                    EventType::Revoke,
1996                    ObjectType::Role,
1997                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1998                        role_id: role_id.to_string(),
1999                        member_id: member_id.to_string(),
2000                        grantor_id: grantor_id.to_string(),
2001                        executed_by: session
2002                            .map(|session| session.authenticated_role_id())
2003                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
2004                            .to_string(),
2005                    }),
2006                )?;
2007            }
2008            Op::UpdatePrivilege {
2009                target_id,
2010                privilege,
2011                variant,
2012            } => {
2013                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
2014                    UpdatePrivilegeVariant::Grant => {
2015                        privileges.grant(privilege);
2016                    }
2017                    UpdatePrivilegeVariant::Revoke => {
2018                        privileges.revoke(&privilege);
2019                    }
2020                };
2021                match &target_id {
2022                    SystemObjectId::Object(object_id) => match object_id {
2023                        ObjectId::Cluster(id) => {
2024                            let mut cluster = state.get_cluster(*id).clone();
2025                            update_privilege_fn(&mut cluster.privileges);
2026                            tx.update_cluster(*id, cluster.into())?;
2027                        }
2028                        ObjectId::Database(id) => {
2029                            let mut database = state.get_database(id).clone();
2030                            update_privilege_fn(&mut database.privileges);
2031                            tx.update_database(*id, database.into())?;
2032                        }
2033                        ObjectId::NetworkPolicy(id) => {
2034                            let mut policy = state.get_network_policy(id).clone();
2035                            update_privilege_fn(&mut policy.privileges);
2036                            tx.update_network_policy(*id, policy.into())?;
2037                        }
2038                        ObjectId::Schema((database_spec, schema_spec)) => {
2039                            let schema_id = schema_spec.clone().into();
2040                            let mut schema = state
2041                                .get_schema(
2042                                    database_spec,
2043                                    schema_spec,
2044                                    session
2045                                        .map(|session| session.conn_id())
2046                                        .unwrap_or(&SYSTEM_CONN_ID),
2047                                )
2048                                .clone();
2049                            update_privilege_fn(&mut schema.privileges);
2050                            tx.update_schema(schema_id, schema.into())?;
2051                        }
2052                        ObjectId::Item(id) => {
2053                            let entry = state.get_entry(id);
2054                            let mut new_entry = entry.clone();
2055                            update_privilege_fn(&mut new_entry.privileges);
2056                            if !new_entry.item().is_temporary() {
2057                                tx.update_item(*id, new_entry.into())?;
2058                            } else {
2059                                temporary_item_updates
2060                                    .push((entry.clone().into(), StateDiff::Retraction));
2061                                temporary_item_updates
2062                                    .push((new_entry.into(), StateDiff::Addition));
2063                            }
2064                        }
2065                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
2066                    },
2067                    SystemObjectId::System => {
2068                        let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
2069                        update_privilege_fn(&mut system_privileges);
2070                        let new_privilege =
2071                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
2072                        tx.set_system_privilege(
2073                            privilege.grantee,
2074                            privilege.grantor,
2075                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
2076                        )?;
2077                    }
2078                }
2079                let object_type = state.get_system_object_type(&target_id);
2080                let object_id_str = match &target_id {
2081                    SystemObjectId::System => "SYSTEM".to_string(),
2082                    SystemObjectId::Object(id) => id.to_string(),
2083                };
2084                CatalogState::add_to_audit_log(
2085                    &state.system_configuration,
2086                    oracle_write_ts,
2087                    session,
2088                    tx,
2089                    audit_events,
2090                    variant.into(),
2091                    system_object_type_to_audit_object_type(&object_type),
2092                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
2093                        object_id: object_id_str,
2094                        grantee_id: privilege.grantee.to_string(),
2095                        grantor_id: privilege.grantor.to_string(),
2096                        privileges: privilege.acl_mode.to_string(),
2097                    }),
2098                )?;
2099            }
2100            Op::UpdateDefaultPrivilege {
2101                privilege_object,
2102                privilege_acl_item,
2103                variant,
2104            } => {
2105                let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
2106                match variant {
2107                    UpdatePrivilegeVariant::Grant => default_privileges
2108                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
2109                    UpdatePrivilegeVariant::Revoke => {
2110                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
2111                    }
2112                }
2113                let new_acl_mode = default_privileges
2114                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
2115                tx.set_default_privilege(
2116                    privilege_object.role_id,
2117                    privilege_object.database_id,
2118                    privilege_object.schema_id,
2119                    privilege_object.object_type,
2120                    privilege_acl_item.grantee,
2121                    new_acl_mode.cloned(),
2122                )?;
2123                CatalogState::add_to_audit_log(
2124                    &state.system_configuration,
2125                    oracle_write_ts,
2126                    session,
2127                    tx,
2128                    audit_events,
2129                    variant.into(),
2130                    object_type_to_audit_object_type(privilege_object.object_type),
2131                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2132                        role_id: privilege_object.role_id.to_string(),
2133                        database_id: privilege_object.database_id.map(|id| id.to_string()),
2134                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2135                        grantee_id: privilege_acl_item.grantee.to_string(),
2136                        privileges: privilege_acl_item.acl_mode.to_string(),
2137                    }),
2138                )?;
2139            }
2140            Op::RenameCluster {
2141                id,
2142                name,
2143                to_name,
2144                check_reserved_names,
2145            } => {
2146                if id.is_system() {
2147                    return Err(AdapterError::Catalog(Error::new(
2148                        ErrorKind::ReadOnlyCluster(name.clone()),
2149                    )));
2150                }
2151                if check_reserved_names && is_reserved_name(&to_name) {
2152                    return Err(AdapterError::Catalog(Error::new(
2153                        ErrorKind::ReservedClusterName(to_name),
2154                    )));
2155                }
2156                tx.rename_cluster(id, &name, &to_name)?;
2157                CatalogState::add_to_audit_log(
2158                    &state.system_configuration,
2159                    oracle_write_ts,
2160                    session,
2161                    tx,
2162                    audit_events,
2163                    EventType::Alter,
2164                    ObjectType::Cluster,
2165                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2166                        id: id.to_string(),
2167                        old_name: name.clone(),
2168                        new_name: to_name.clone(),
2169                    }),
2170                )?;
2171                info!("rename cluster {name} to {to_name}");
2172            }
2173            Op::RenameClusterReplica {
2174                cluster_id,
2175                replica_id,
2176                name,
2177                to_name,
2178            } => {
2179                if is_reserved_name(&to_name) {
2180                    return Err(AdapterError::Catalog(Error::new(
2181                        ErrorKind::ReservedReplicaName(to_name),
2182                    )));
2183                }
2184                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2185                CatalogState::add_to_audit_log(
2186                    &state.system_configuration,
2187                    oracle_write_ts,
2188                    session,
2189                    tx,
2190                    audit_events,
2191                    EventType::Alter,
2192                    ObjectType::ClusterReplica,
2193                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2194                        cluster_id: cluster_id.to_string(),
2195                        replica_id: replica_id.to_string(),
2196                        old_name: name.replica.as_str().to_string(),
2197                        new_name: to_name.clone(),
2198                    }),
2199                )?;
2200                info!("rename cluster replica {name} to {to_name}");
2201            }
2202            Op::RenameItem {
2203                id,
2204                to_name,
2205                current_full_name,
2206            } => {
2207                let mut updates = Vec::new();
2208
2209                let entry = state.get_entry(&id);
2210                if let CatalogItem::Type(_) = entry.item() {
2211                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2212                        current_full_name.to_string(),
2213                    ))));
2214                }
2215
2216                if entry.id().is_system() {
2217                    let name = state
2218                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2219                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2220                        name.to_string(),
2221                    ))));
2222                }
2223
2224                let mut to_full_name = current_full_name.clone();
2225                to_full_name.item.clone_from(&to_name);
2226
2227                let mut to_qualified_name = entry.name().clone();
2228                to_qualified_name.item.clone_from(&to_name);
2229
2230                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2231                    id: id.to_string(),
2232                    old_name: Self::full_name_detail(&current_full_name),
2233                    new_name: Self::full_name_detail(&to_full_name),
2234                });
2235                if Self::should_audit_log_item(entry.item()) {
2236                    CatalogState::add_to_audit_log(
2237                        &state.system_configuration,
2238                        oracle_write_ts,
2239                        session,
2240                        tx,
2241                        audit_events,
2242                        EventType::Alter,
2243                        catalog_type_to_audit_object_type(entry.item().typ()),
2244                        details,
2245                    )?;
2246                }
2247
2248                // Rename item itself.
2249                let mut new_entry = entry.clone();
2250                new_entry.name.item.clone_from(&to_name);
2251                new_entry.item = entry
2252                    .item()
2253                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2254                    .map_err(|e| {
2255                        Error::new(ErrorKind::from(AmbiguousRename {
2256                            depender: state
2257                                .resolve_full_name(entry.name(), entry.conn_id())
2258                                .to_string(),
2259                            dependee: state
2260                                .resolve_full_name(entry.name(), entry.conn_id())
2261                                .to_string(),
2262                            message: e,
2263                        }))
2264                    })?;
2265
2266                for id in entry.referenced_by() {
2267                    let dependent_item = state.get_entry(id);
2268                    let mut to_entry = dependent_item.clone();
2269                    to_entry.item = dependent_item
2270                        .item()
2271                        .rename_item_refs(
2272                            current_full_name.clone(),
2273                            to_full_name.item.clone(),
2274                            false,
2275                        )
2276                        .map_err(|e| {
2277                            Error::new(ErrorKind::from(AmbiguousRename {
2278                                depender: state
2279                                    .resolve_full_name(
2280                                        dependent_item.name(),
2281                                        dependent_item.conn_id(),
2282                                    )
2283                                    .to_string(),
2284                                dependee: state
2285                                    .resolve_full_name(entry.name(), entry.conn_id())
2286                                    .to_string(),
2287                                message: e,
2288                            }))
2289                        })?;
2290
2291                    if !to_entry.item().is_temporary() {
2292                        tx.update_item(*id, to_entry.into())?;
2293                    } else {
2294                        temporary_item_updates
2295                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2296                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2297                    }
2298                    updates.push(*id);
2299                }
2300                if !new_entry.item().is_temporary() {
2301                    tx.update_item(id, new_entry.into())?;
2302                } else {
2303                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2304                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2305                }
2306
2307                updates.push(id);
2308                for id in updates {
2309                    Self::log_update(state, &id);
2310                }
2311            }
2312            Op::RenameSchema {
2313                database_spec,
2314                schema_spec,
2315                new_name,
2316                check_reserved_names,
2317            } => {
2318                if check_reserved_names && is_reserved_name(&new_name) {
2319                    return Err(AdapterError::Catalog(Error::new(
2320                        ErrorKind::ReservedSchemaName(new_name),
2321                    )));
2322                }
2323
2324                let conn_id = session
2325                    .map(|session| session.conn_id())
2326                    .unwrap_or(&SYSTEM_CONN_ID);
2327
2328                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2329                let cur_name = schema.name().schema.clone();
2330
2331                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2332                    return Err(AdapterError::Catalog(Error::new(
2333                        ErrorKind::AmbientSchemaRename(cur_name),
2334                    )));
2335                };
2336                let database = state.get_database(&database_id);
2337                let database_name = &database.name;
2338
2339                let mut updates = Vec::new();
2340                let mut items_to_update = BTreeMap::new();
2341
2342                let mut update_item = |id| {
2343                    if items_to_update.contains_key(id) {
2344                        return Ok(());
2345                    }
2346
2347                    let entry = state.get_entry(id);
2348
2349                    // Update our item.
2350                    let mut new_entry = entry.clone();
2351                    new_entry.item = entry
2352                        .item
2353                        .rename_schema_refs(database_name, &cur_name, &new_name)
2354                        .map_err(|(s, _i)| {
2355                            Error::new(ErrorKind::from(AmbiguousRename {
2356                                depender: state
2357                                    .resolve_full_name(entry.name(), entry.conn_id())
2358                                    .to_string(),
2359                                dependee: format!("{database_name}.{cur_name}"),
2360                                message: format!("ambiguous reference to schema named {s}"),
2361                            }))
2362                        })?;
2363
2364                    // Queue updates for Catalog storage and Builtin Tables.
2365                    if !new_entry.item().is_temporary() {
2366                        items_to_update.insert(*id, new_entry.into());
2367                    } else {
2368                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2369                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2370                    }
2371                    updates.push(id);
2372
2373                    Ok::<_, AdapterError>(())
2374                };
2375
2376                // Update all of the items in the schema.
2377                for (_name, item_id) in &schema.items {
2378                    // Update the item itself.
2379                    update_item(item_id)?;
2380
2381                    // Update everything that depends on this item.
2382                    for id in state.get_entry(item_id).referenced_by() {
2383                        update_item(id)?;
2384                    }
2385                }
2386                // Note: When updating the transaction it's very important that we update the
2387                // items as a whole group, otherwise we exhibit quadratic behavior.
2388                tx.update_items(items_to_update)?;
2389
2390                // Renaming temporary schemas is not supported.
2391                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2392                    let schema_name = schema.name().schema.clone();
2393                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2394                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2395                    )));
2396                };
2397
2398                // Add an entry to the audit log.
2399                let database_name = database_spec
2400                    .id()
2401                    .map(|id| state.get_database(&id).name.clone());
2402                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2403                    id: schema_id.to_string(),
2404                    old_name: schema.name().schema.clone(),
2405                    new_name: new_name.clone(),
2406                    database_name,
2407                });
2408                CatalogState::add_to_audit_log(
2409                    &state.system_configuration,
2410                    oracle_write_ts,
2411                    session,
2412                    tx,
2413                    audit_events,
2414                    EventType::Alter,
2415                    mz_audit_log::ObjectType::Schema,
2416                    details,
2417                )?;
2418
2419                // Update the schema itself.
2420                let mut new_schema = schema.clone();
2421                new_schema.name.schema.clone_from(&new_name);
2422                tx.update_schema(schema_id, new_schema.into())?;
2423
2424                for id in updates {
2425                    Self::log_update(state, id);
2426                }
2427            }
2428            Op::UpdateOwner { id, new_owner } => {
2429                let conn_id = session
2430                    .map(|session| session.conn_id())
2431                    .unwrap_or(&SYSTEM_CONN_ID);
2432                let old_owner = state
2433                    .get_owner_id(&id, conn_id)
2434                    .expect("cannot update the owner of an object without an owner");
2435                match &id {
2436                    ObjectId::Cluster(id) => {
2437                        let mut cluster = state.get_cluster(*id).clone();
2438                        if id.is_system() {
2439                            return Err(AdapterError::Catalog(Error::new(
2440                                ErrorKind::ReadOnlyCluster(cluster.name),
2441                            )));
2442                        }
2443                        Self::update_privilege_owners(
2444                            &mut cluster.privileges,
2445                            cluster.owner_id,
2446                            new_owner,
2447                        );
2448                        cluster.owner_id = new_owner;
2449                        tx.update_cluster(*id, cluster.into())?;
2450                    }
2451                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2452                        let cluster = state.get_cluster(*cluster_id);
2453                        let mut replica = cluster
2454                            .replica(*replica_id)
2455                            .expect("catalog out of sync")
2456                            .clone();
2457                        if replica_id.is_system() {
2458                            return Err(AdapterError::Catalog(Error::new(
2459                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2460                            )));
2461                        }
2462                        replica.owner_id = new_owner;
2463                        tx.update_cluster_replica(*replica_id, replica.into())?;
2464                    }
2465                    ObjectId::Database(id) => {
2466                        let mut database = state.get_database(id).clone();
2467                        if id.is_system() {
2468                            return Err(AdapterError::Catalog(Error::new(
2469                                ErrorKind::ReadOnlyDatabase(database.name),
2470                            )));
2471                        }
2472                        Self::update_privilege_owners(
2473                            &mut database.privileges,
2474                            database.owner_id,
2475                            new_owner,
2476                        );
2477                        database.owner_id = new_owner;
2478                        tx.update_database(*id, database.clone().into())?;
2479                    }
2480                    ObjectId::Schema((database_spec, schema_spec)) => {
2481                        let schema_id: SchemaId = schema_spec.clone().into();
2482                        let mut schema = state
2483                            .get_schema(database_spec, schema_spec, conn_id)
2484                            .clone();
2485                        if schema_id.is_system() {
2486                            let name = schema.name();
2487                            let full_name = state.resolve_full_schema_name(name);
2488                            return Err(AdapterError::Catalog(Error::new(
2489                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2490                            )));
2491                        }
2492                        Self::update_privilege_owners(
2493                            &mut schema.privileges,
2494                            schema.owner_id,
2495                            new_owner,
2496                        );
2497                        schema.owner_id = new_owner;
2498                        tx.update_schema(schema_id, schema.into())?;
2499                    }
2500                    ObjectId::Item(id) => {
2501                        let entry = state.get_entry(id);
2502                        let mut new_entry = entry.clone();
2503                        if id.is_system() {
2504                            let full_name = state.resolve_full_name(
2505                                new_entry.name(),
2506                                session.map(|session| session.conn_id()),
2507                            );
2508                            return Err(AdapterError::Catalog(Error::new(
2509                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2510                            )));
2511                        }
2512                        Self::update_privilege_owners(
2513                            &mut new_entry.privileges,
2514                            new_entry.owner_id,
2515                            new_owner,
2516                        );
2517                        new_entry.owner_id = new_owner;
2518                        if !new_entry.item().is_temporary() {
2519                            tx.update_item(*id, new_entry.into())?;
2520                        } else {
2521                            temporary_item_updates
2522                                .push((entry.clone().into(), StateDiff::Retraction));
2523                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2524                        }
2525                    }
2526                    ObjectId::NetworkPolicy(id) => {
2527                        let mut policy = state.get_network_policy(id).clone();
2528                        if id.is_system() {
2529                            return Err(AdapterError::Catalog(Error::new(
2530                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2531                            )));
2532                        }
2533                        Self::update_privilege_owners(
2534                            &mut policy.privileges,
2535                            policy.owner_id,
2536                            new_owner,
2537                        );
2538                        policy.owner_id = new_owner;
2539                        tx.update_network_policy(*id, policy.into())?;
2540                    }
2541                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2542                }
2543                let object_type = state.get_object_type(&id);
2544                CatalogState::add_to_audit_log(
2545                    &state.system_configuration,
2546                    oracle_write_ts,
2547                    session,
2548                    tx,
2549                    audit_events,
2550                    EventType::Alter,
2551                    object_type_to_audit_object_type(object_type),
2552                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2553                        object_id: id.to_string(),
2554                        old_owner_id: old_owner.to_string(),
2555                        new_owner_id: new_owner.to_string(),
2556                    }),
2557                )?;
2558            }
2559            Op::UpdateClusterConfig { id, name, config } => {
2560                let mut cluster = state.get_cluster(id).clone();
2561                cluster.config = config;
2562                tx.update_cluster(id, cluster.into())?;
2563                info!("update cluster {}", name);
2564
2565                CatalogState::add_to_audit_log(
2566                    &state.system_configuration,
2567                    oracle_write_ts,
2568                    session,
2569                    tx,
2570                    audit_events,
2571                    EventType::Alter,
2572                    ObjectType::Cluster,
2573                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2574                        id: id.to_string(),
2575                        name,
2576                    }),
2577                )?;
2578            }
2579            Op::UpdateClusterReplicaConfig {
2580                replica_id,
2581                cluster_id,
2582                config,
2583            } => {
2584                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2585                info!("update replica {}", replica.name);
2586                tx.update_cluster_replica(
2587                    replica_id,
2588                    mz_catalog::durable::ClusterReplica {
2589                        cluster_id,
2590                        replica_id,
2591                        name: replica.name.clone(),
2592                        config: config.clone().into(),
2593                        owner_id: replica.owner_id,
2594                    },
2595                )?;
2596            }
2597            Op::UpdateItem { id, name, to_item } => {
2598                let mut entry = state.get_entry(&id).clone();
2599                entry.name = name.clone();
2600                entry.item = to_item.clone();
2601                tx.update_item(id, entry.into())?;
2602
2603                if Self::should_audit_log_item(&to_item) {
2604                    let mut full_name = Self::full_name_detail(
2605                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2606                    );
2607                    full_name.item = name.item;
2608
2609                    CatalogState::add_to_audit_log(
2610                        &state.system_configuration,
2611                        oracle_write_ts,
2612                        session,
2613                        tx,
2614                        audit_events,
2615                        EventType::Alter,
2616                        catalog_type_to_audit_object_type(to_item.typ()),
2617                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2618                            id: id.to_string(),
2619                            name: full_name,
2620                        }),
2621                    )?;
2622                }
2623
2624                Self::log_update(state, &id);
2625            }
2626            Op::UpdateSystemConfiguration { name, value } => {
2627                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2628                tx.upsert_system_config(&name, parsed_value.clone())?;
2629                // This mirrors some "system vars" into the catalog storage
2630                // "config" collection so that we can toggle the flag with
2631                // Launch Darkly, but use it in boot before Launch Darkly is
2632                // available.
2633                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2634                    let with_0dt_deployment_max_wait =
2635                        Duration::parse(VarInput::Flat(&parsed_value))
2636                            .expect("parsing succeeded above");
2637                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2638                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2639                    let with_0dt_deployment_ddl_check_interval =
2640                        Duration::parse(VarInput::Flat(&parsed_value))
2641                            .expect("parsing succeeded above");
2642                    tx.set_0dt_deployment_ddl_check_interval(
2643                        with_0dt_deployment_ddl_check_interval,
2644                    )?;
2645                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2646                    let panic_after_timeout =
2647                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2648                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2649                }
2650
2651                CatalogState::add_to_audit_log(
2652                    &state.system_configuration,
2653                    oracle_write_ts,
2654                    session,
2655                    tx,
2656                    audit_events,
2657                    EventType::Alter,
2658                    ObjectType::System,
2659                    EventDetails::SetV1(mz_audit_log::SetV1 {
2660                        name,
2661                        value: Some(value.borrow().to_vec().join(", ")),
2662                    }),
2663                )?;
2664            }
2665            Op::ResetSystemConfiguration { name } => {
2666                tx.remove_system_config(&name);
2667                // This mirrors some "system vars" into the catalog storage
2668                // "config" collection so that we can toggle the flag with
2669                // Launch Darkly, but use it in boot before Launch Darkly is
2670                // available.
2671                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2672                    tx.reset_0dt_deployment_max_wait()?;
2673                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2674                    tx.reset_0dt_deployment_ddl_check_interval()?;
2675                }
2676
2677                CatalogState::add_to_audit_log(
2678                    &state.system_configuration,
2679                    oracle_write_ts,
2680                    session,
2681                    tx,
2682                    audit_events,
2683                    EventType::Alter,
2684                    ObjectType::System,
2685                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2686                )?;
2687            }
2688            Op::ResetAllSystemConfiguration => {
2689                tx.clear_system_configs();
2690                tx.reset_0dt_deployment_max_wait()?;
2691                tx.reset_0dt_deployment_ddl_check_interval()?;
2692
2693                CatalogState::add_to_audit_log(
2694                    &state.system_configuration,
2695                    oracle_write_ts,
2696                    session,
2697                    tx,
2698                    audit_events,
2699                    EventType::Alter,
2700                    ObjectType::System,
2701                    EventDetails::ResetAllV1,
2702                )?;
2703            }
2704            Op::WeirdStorageUsageUpdates {
2705                object_id,
2706                size_bytes,
2707                collection_timestamp,
2708            } => {
2709                let id = tx.allocate_storage_usage_ids()?;
2710                let metric =
2711                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2712                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2713                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2714                weird_builtin_table_update = Some(builtin_table_update);
2715            }
2716            Op::InjectAuditEvents { events } => {
2717                for event in events {
2718                    let id = tx.allocate_audit_log_id()?;
2719                    let ev = VersionedEvent::new(
2720                        id,
2721                        event.event_type,
2722                        event.object_type,
2723                        event.details,
2724                        event.user,
2725                        oracle_write_ts.into(),
2726                    );
2727                    audit_events.push(ev.clone());
2728                    tx.insert_audit_log_event(ev);
2729                }
2730            }
2731        };
2732        Ok((weird_builtin_table_update, temporary_item_updates))
2733    }
2734
2735    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2736        let entry = state.get_entry(id);
2737        info!(
2738            "update {} {} ({})",
2739            entry.item_type(),
2740            state.resolve_full_name(entry.name(), entry.conn_id()),
2741            id
2742        );
2743    }
2744
2745    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2746    /// implementation:
2747    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2748    fn update_privilege_owners(
2749        privileges: &mut PrivilegeMap,
2750        old_owner: RoleId,
2751        new_owner: RoleId,
2752    ) {
2753        // TODO(jkosh44) Would be nice not to clone every privilege.
2754        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2755
2756        let mut new_present = false;
2757        for privilege in flat_privileges.iter_mut() {
2758            // Old owner's granted privilege are updated to be granted by the new
2759            // owner.
2760            if privilege.grantor == old_owner {
2761                privilege.grantor = new_owner;
2762            } else if privilege.grantor == new_owner {
2763                new_present = true;
2764            }
2765            // Old owner's privileges is given to the new owner.
2766            if privilege.grantee == old_owner {
2767                privilege.grantee = new_owner;
2768            } else if privilege.grantee == new_owner {
2769                new_present = true;
2770            }
2771        }
2772
2773        // If the old privilege list contained references to the new owner, we may
2774        // have created duplicate entries. Here we try and consolidate them. This
2775        // is inspired by PostgreSQL's algorithm but not identical.
2776        if new_present {
2777            // Group privileges by (grantee, grantor).
2778            let privilege_map: BTreeMap<_, Vec<_>> =
2779                flat_privileges
2780                    .into_iter()
2781                    .fold(BTreeMap::new(), |mut accum, privilege| {
2782                        accum
2783                            .entry((privilege.grantee, privilege.grantor))
2784                            .or_default()
2785                            .push(privilege);
2786                        accum
2787                    });
2788
2789            // Consolidate and update all privileges.
2790            flat_privileges = privilege_map
2791                .into_iter()
2792                .map(|((grantee, grantor), values)|
2793                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2794                    values.into_iter().fold(
2795                        MzAclItem::empty(grantee, grantor),
2796                        |mut accum, mz_aclitem| {
2797                            accum.acl_mode =
2798                                accum.acl_mode.union(mz_aclitem.acl_mode);
2799                            accum
2800                        },
2801                    ))
2802                .collect();
2803        }
2804
2805        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2806    }
2807}
2808
2809/// Prepare the given transaction for replacing a catalog item with a new version.
2810///
2811/// The new version gets a new `CatalogItemId`, which requires rewriting the `create_sql` of all
2812/// dependent objects to refer to that new ID (at a previous version).
2813///
2814/// Note that here is where we break the assumption that the `CatalogItemId` is a stable identifier
2815/// for catalog items. We currently think that there are no use cases that require this assumption,
2816/// but no way to know for sure.
2817fn tx_replace_item(
2818    tx: &mut Transaction<'_>,
2819    state: &CatalogState,
2820    id: CatalogItemId,
2821    new_entry: CatalogEntry,
2822) -> Result<(), AdapterError> {
2823    let new_id = new_entry.id;
2824
2825    // Rewrite dependent objects to point to the new ID.
2826    for use_id in new_entry.referenced_by() {
2827        // The dependent might be dropped in the same tx, so check.
2828        if tx.get_item(use_id).is_none() {
2829            continue;
2830        }
2831
2832        let mut dependent = state.get_entry(use_id).clone();
2833        dependent.item = dependent.item.replace_item_refs(id, new_id);
2834        tx.update_item(*use_id, dependent.into())?;
2835    }
2836
2837    // Move comments to the new ID.
2838    let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2839    let new_comment_id = new_entry.comment_object_id();
2840    if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2841        tx.drop_comments(&[old_comment_id].into())?;
2842        for (sub, comment) in comments {
2843            tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2844        }
2845    }
2846
2847    let mz_catalog::durable::Item {
2848        id: _,
2849        oid,
2850        global_id,
2851        schema_id,
2852        name,
2853        create_sql,
2854        owner_id,
2855        privileges,
2856        extra_versions,
2857    } = new_entry.into();
2858
2859    tx.remove_item(id)?;
2860    tx.insert_item(
2861        new_id,
2862        oid,
2863        global_id,
2864        schema_id,
2865        &name,
2866        create_sql,
2867        owner_id,
2868        privileges,
2869        extra_versions,
2870    )?;
2871
2872    Ok(())
2873}
2874
2875/// Generate audit events for a replacement apply operation.
2876fn apply_replacement_audit_events(
2877    state: &CatalogState,
2878    target: &CatalogEntry,
2879    replacement: &CatalogEntry,
2880) -> Vec<(EventType, EventDetails)> {
2881    let mut events = Vec::new();
2882
2883    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2884    let target_id_name = IdFullNameV1 {
2885        id: target.id().to_string(),
2886        name: Catalog::full_name_detail(&target_name),
2887    };
2888    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2889    let replacement_id_name = IdFullNameV1 {
2890        id: replacement.id().to_string(),
2891        name: Catalog::full_name_detail(&replacement_name),
2892    };
2893
2894    if Catalog::should_audit_log_item(&replacement.item) {
2895        events.push((
2896            EventType::Drop,
2897            EventDetails::IdFullNameV1(replacement_id_name.clone()),
2898        ));
2899    }
2900
2901    if Catalog::should_audit_log_item(&target.item) {
2902        events.push((
2903            EventType::Alter,
2904            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2905                target: target_id_name.clone(),
2906                replacement: replacement_id_name,
2907            }),
2908        ));
2909
2910        if let Some(old_cluster_id) = target.cluster_id()
2911            && let Some(new_cluster_id) = replacement.cluster_id()
2912            && old_cluster_id != new_cluster_id
2913        {
2914            // When the replacement is applied, the target takes on the ID of the replacement, so
2915            // we should use that ID for subsequent events.
2916            events.push((
2917                EventType::Alter,
2918                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2919                    id: replacement.id().to_string(),
2920                    name: target_id_name.name,
2921                    old_cluster_id: old_cluster_id.to_string(),
2922                    new_cluster_id: new_cluster_id.to_string(),
2923                }),
2924            ));
2925        }
2926    }
2927
2928    events
2929}
2930
2931/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2932///
2933/// Note: Previously we used to omit a single `Op::DropObject` for every object
2934/// we needed to drop. But removing a batch of objects from a durable Catalog
2935/// Transaction is O(n) where `n` is the number of objects that exist in the
2936/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
2937/// `DROP ... CASCADE` statement.
2938#[derive(Debug, Default)]
2939pub(crate) struct ObjectsToDrop {
2940    pub comments: BTreeSet<CommentObjectId>,
2941    pub databases: BTreeSet<DatabaseId>,
2942    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2943    pub clusters: BTreeSet<ClusterId>,
2944    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2945    pub roles: BTreeSet<RoleId>,
2946    pub items: Vec<CatalogItemId>,
2947    pub network_policies: BTreeSet<NetworkPolicyId>,
2948}
2949
2950impl ObjectsToDrop {
2951    pub fn generate(
2952        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2953        state: &CatalogState,
2954        session: Option<&ConnMeta>,
2955    ) -> Result<Self, AdapterError> {
2956        let mut delta = ObjectsToDrop::default();
2957
2958        for drop_object_info in drop_object_infos {
2959            delta.add_item(drop_object_info, state, session)?;
2960        }
2961
2962        Ok(delta)
2963    }
2964
2965    fn add_item(
2966        &mut self,
2967        drop_object_info: DropObjectInfo,
2968        state: &CatalogState,
2969        session: Option<&ConnMeta>,
2970    ) -> Result<(), AdapterError> {
2971        self.comments
2972            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2973
2974        match drop_object_info {
2975            DropObjectInfo::Database(database_id) => {
2976                let database = &state.database_by_id[&database_id];
2977                if database_id.is_system() {
2978                    return Err(AdapterError::Catalog(Error::new(
2979                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2980                    )));
2981                }
2982
2983                self.databases.insert(database_id);
2984            }
2985            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2986                let schema = state.get_schema(
2987                    &database_spec,
2988                    &schema_spec,
2989                    session
2990                        .map(|session| session.conn_id())
2991                        .unwrap_or(&SYSTEM_CONN_ID),
2992                );
2993                let schema_id: SchemaId = schema_spec.into();
2994                if schema_id.is_system() {
2995                    let name = schema.name();
2996                    let full_name = state.resolve_full_schema_name(name);
2997                    return Err(AdapterError::Catalog(Error::new(
2998                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2999                    )));
3000                }
3001
3002                self.schemas.insert(schema_spec, database_spec);
3003            }
3004            DropObjectInfo::Role(role_id) => {
3005                let name = state.get_role(&role_id).name().to_string();
3006                if role_id.is_system() || role_id.is_predefined() {
3007                    return Err(AdapterError::Catalog(Error::new(
3008                        ErrorKind::ReservedRoleName(name.clone()),
3009                    )));
3010                }
3011                state.ensure_not_reserved_role(&role_id)?;
3012
3013                self.roles.insert(role_id);
3014            }
3015            DropObjectInfo::Cluster(cluster_id) => {
3016                let cluster = state.get_cluster(cluster_id);
3017                let name = &cluster.name;
3018                if cluster_id.is_system() {
3019                    return Err(AdapterError::Catalog(Error::new(
3020                        ErrorKind::ReadOnlyCluster(name.clone()),
3021                    )));
3022                }
3023
3024                self.clusters.insert(cluster_id);
3025            }
3026            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3027                let cluster = state.get_cluster(cluster_id);
3028                let replica = cluster.replica(replica_id).expect("Must exist");
3029
3030                self.replicas
3031                    .insert(replica.replica_id, (cluster.id, reason));
3032
3033                // Implicitly drop materialized views that target this replica.
3034                // When the target replica is gone, no replica advances the
3035                // persist shard's upper frontier, causing reads to hang.
3036                for (_id, entry) in state.get_entries() {
3037                    if let CatalogItem::MaterializedView(mv) = entry.item() {
3038                        if mv.target_replica == Some(replica_id)
3039                            && !self.items.contains(&entry.id())
3040                        {
3041                            tracing::warn!(
3042                                "implicitly dropping materialized view {} because target replica was dropped",
3043                                entry.name().item,
3044                            );
3045                            self.items.push(entry.id());
3046                        }
3047                    }
3048                }
3049            }
3050            DropObjectInfo::Item(item_id) => {
3051                let entry = state.get_entry(&item_id);
3052                if item_id.is_system() {
3053                    let name = entry.name();
3054                    let full_name =
3055                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
3056                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3057                        full_name.to_string(),
3058                    ))));
3059                }
3060
3061                self.items.push(item_id);
3062            }
3063            DropObjectInfo::NetworkPolicy(network_policy_id) => {
3064                let policy = state.get_network_policy(&network_policy_id);
3065                let name = &policy.name;
3066                if network_policy_id.is_system() {
3067                    return Err(AdapterError::Catalog(Error::new(
3068                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3069                    )));
3070                }
3071
3072                self.network_policies.insert(network_policy_id);
3073            }
3074        }
3075
3076        Ok(())
3077    }
3078}
3079
3080#[cfg(test)]
3081mod tests {
3082    use mz_catalog::SYSTEM_CONN_ID;
3083    use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
3084    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
3085    use mz_repr::role_id::RoleId;
3086    use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
3087    use mz_sql::DEFAULT_SCHEMA;
3088    use mz_sql::catalog::CatalogDatabase;
3089    use mz_sql::names::{
3090        ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
3091    };
3092    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
3093
3094    use crate::catalog::{Catalog, Op};
3095    use crate::session::DEFAULT_DATABASE_NAME;
3096
3097    #[mz_ore::test]
3098    fn test_update_privilege_owners() {
3099        let old_owner = RoleId::User(1);
3100        let new_owner = RoleId::User(2);
3101        let other_role = RoleId::User(3);
3102
3103        // older owner exists as grantor.
3104        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3105            MzAclItem {
3106                grantee: other_role,
3107                grantor: old_owner,
3108                acl_mode: AclMode::UPDATE,
3109            },
3110            MzAclItem {
3111                grantee: other_role,
3112                grantor: new_owner,
3113                acl_mode: AclMode::SELECT,
3114            },
3115        ]);
3116        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3117        assert_eq!(1, privileges.all_values().count());
3118        assert_eq!(
3119            vec![MzAclItem {
3120                grantee: other_role,
3121                grantor: new_owner,
3122                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3123            }],
3124            privileges.all_values_owned().collect::<Vec<_>>()
3125        );
3126
3127        // older owner exists as grantee.
3128        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3129            MzAclItem {
3130                grantee: old_owner,
3131                grantor: other_role,
3132                acl_mode: AclMode::UPDATE,
3133            },
3134            MzAclItem {
3135                grantee: new_owner,
3136                grantor: other_role,
3137                acl_mode: AclMode::SELECT,
3138            },
3139        ]);
3140        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3141        assert_eq!(1, privileges.all_values().count());
3142        assert_eq!(
3143            vec![MzAclItem {
3144                grantee: new_owner,
3145                grantor: other_role,
3146                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3147            }],
3148            privileges.all_values_owned().collect::<Vec<_>>()
3149        );
3150
3151        // older owner exists as grantee and grantor.
3152        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
3153            MzAclItem {
3154                grantee: old_owner,
3155                grantor: old_owner,
3156                acl_mode: AclMode::UPDATE,
3157            },
3158            MzAclItem {
3159                grantee: new_owner,
3160                grantor: new_owner,
3161                acl_mode: AclMode::SELECT,
3162            },
3163        ]);
3164        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
3165        assert_eq!(1, privileges.all_values().count());
3166        assert_eq!(
3167            vec![MzAclItem {
3168                grantee: new_owner,
3169                grantor: new_owner,
3170                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
3171            }],
3172            privileges.all_values_owned().collect::<Vec<_>>()
3173        );
3174    }
3175
3176    /// Verifies that `transact_incremental_dry_run` processes only new ops
3177    /// against the accumulated state, not all ops from scratch. Two paths are
3178    /// compared:
3179    ///   - Incremental: two separate calls, each with one op
3180    ///   - All-at-once: one call with both ops
3181    /// Both must produce equivalent catalog state.
3182    #[mz_ore::test(tokio::test)]
3183    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method`
3184    async fn test_transact_incremental_dry_run_processes_only_new_ops() {
3185        Catalog::with_debug(|catalog| async move {
3186            // Resolve default database and schema.
3187            let database = catalog
3188                .resolve_database(DEFAULT_DATABASE_NAME)
3189                .expect("default database");
3190            let database_name = database.name.clone();
3191            let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
3192            let schema = catalog
3193                .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
3194                .expect("default schema");
3195            let schema_name = schema.name.schema.clone();
3196            let schema_spec = schema.id.clone();
3197
3198            // Allocate IDs for two tables.
3199            let (id_t1, global_id_t1) = catalog
3200                .allocate_user_id_for_test()
3201                .await
3202                .expect("allocate id for t1");
3203            let (id_t2, global_id_t2) = catalog
3204                .allocate_user_id_for_test()
3205                .await
3206                .expect("allocate id for t2");
3207
3208            let oracle_write_ts = catalog.current_upper().await;
3209
3210            // Build two CreateItem ops.
3211            let make_table_op = |id, global_id, name: &str| Op::CreateItem {
3212                id,
3213                name: QualifiedItemName {
3214                    qualifiers: ItemQualifiers {
3215                        database_spec: database_spec.clone(),
3216                        schema_spec: schema_spec.clone(),
3217                    },
3218                    item: name.to_string(),
3219                },
3220                item: CatalogItem::Table(Table {
3221                    create_sql: Some(format!(
3222                        "CREATE TABLE {database_name}.{schema_name}.{name} ()"
3223                    )),
3224                    desc: VersionedRelationDesc::new(RelationDesc::empty()),
3225                    collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
3226                    conn_id: None,
3227                    resolved_ids: ResolvedIds::empty(),
3228                    custom_logical_compaction_window: None,
3229                    is_retained_metrics_object: false,
3230                    data_source: TableDataSource::TableWrites { defaults: vec![] },
3231                }),
3232                owner_id: MZ_SYSTEM_ROLE_ID,
3233            };
3234
3235            let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
3236            let op_t2 = make_table_op(id_t2, global_id_t2, "t2");
3237
3238            let base_state = catalog.state().clone();
3239
3240            // --- Path A: Incremental (two separate dry-run calls) ---
3241
3242            // First call: only op_t1, no previous snapshot.
3243            let (state_after_t1, snapshot_after_t1) = catalog
3244                .transact_incremental_dry_run(
3245                    &base_state,
3246                    vec![op_t1.clone()],
3247                    None,
3248                    None,
3249                    oracle_write_ts,
3250                )
3251                .await
3252                .expect("first dry run");
3253
3254            // After first dry run: t1 exists, t2 does not.
3255            assert!(
3256                state_after_t1.try_get_entry(&id_t1).is_some(),
3257                "t1 should exist after first dry run"
3258            );
3259            assert_eq!(
3260                state_after_t1
3261                    .try_get_entry(&id_t1)
3262                    .expect("t1 entry")
3263                    .name()
3264                    .item,
3265                "t1"
3266            );
3267            assert!(
3268                state_after_t1.try_get_entry(&id_t2).is_none(),
3269                "t2 should NOT exist after first dry run"
3270            );
3271
3272            // Second call: only op_t2, using state/snapshot from first call.
3273            let (state_incremental, _) = catalog
3274                .transact_incremental_dry_run(
3275                    &state_after_t1,
3276                    vec![op_t2.clone()],
3277                    None,
3278                    Some(snapshot_after_t1),
3279                    oracle_write_ts,
3280                )
3281                .await
3282                .expect("second dry run");
3283
3284            // After second dry run: both t1 and t2 exist.
3285            assert!(
3286                state_incremental.try_get_entry(&id_t1).is_some(),
3287                "t1 should exist in incremental result"
3288            );
3289            assert!(
3290                state_incremental.try_get_entry(&id_t2).is_some(),
3291                "t2 should exist in incremental result"
3292            );
3293
3294            // --- Path B: All-at-once (single dry-run call with both ops) ---
3295
3296            let (state_all_at_once, _) = catalog
3297                .transact_incremental_dry_run(
3298                    &base_state,
3299                    vec![op_t1.clone(), op_t2.clone()],
3300                    None,
3301                    None,
3302                    oracle_write_ts,
3303                )
3304                .await
3305                .expect("all-at-once dry run");
3306
3307            assert!(
3308                state_all_at_once.try_get_entry(&id_t1).is_some(),
3309                "t1 should exist in all-at-once result"
3310            );
3311            assert!(
3312                state_all_at_once.try_get_entry(&id_t2).is_some(),
3313                "t2 should exist in all-at-once result"
3314            );
3315
3316            // --- Compare: both paths produce equivalent items ---
3317
3318            let inc_t1 = state_incremental.try_get_entry(&id_t1).expect("inc t1");
3319            let all_t1 = state_all_at_once.try_get_entry(&id_t1).expect("all t1");
3320            assert_eq!(inc_t1.name(), all_t1.name());
3321            assert_eq!(inc_t1.owner_id, all_t1.owner_id);
3322
3323            let inc_t2 = state_incremental.try_get_entry(&id_t2).expect("inc t2");
3324            let all_t2 = state_all_at_once.try_get_entry(&id_t2).expect("all t2");
3325            assert_eq!(inc_t2.name(), all_t2.name());
3326            assert_eq!(inc_t2.owner_id, all_t2.owner_id);
3327
3328            catalog.expire().await;
3329        })
3330        .await
3331    }
3332}