Skip to main content

mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
31use mz_catalog::expr_cache::LocalExpressions;
32use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, ClusterConfig, DataSourceDesc, DefaultPrivileges, SourceReferences,
35    StateDiff, StateUpdate, StateUpdateKind, TemporaryItem,
36};
37use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
38use mz_controller_types::{ClusterId, ReplicaId};
39use mz_ore::collections::HashSet;
40use mz_ore::instrument;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::optimize::OptimizerFeatures;
45use mz_repr::role_id::RoleId;
46use mz_repr::{CatalogItemId, ColumnName, GlobalId, SqlColumnType, strconv};
47use mz_sql::ast::RawDataType;
48use mz_sql::catalog::{
49    AutoProvisionSource, CatalogDatabase, CatalogError as SqlCatalogError,
50    CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
51    DefaultPrivilegeObject, PasswordAction, PasswordConfig, RoleAttributesRaw, RoleMembership,
52    RoleVars,
53};
54use mz_sql::names::{
55    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
56    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
57};
58use mz_sql::plan::{NetworkPolicyRule, PlanError};
59use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
60use mz_sql::session::vars::OwnedVarInput;
61use mz_sql::session::vars::{Value as VarValue, VarInput};
62use mz_sql::{DEFAULT_SCHEMA, rbac};
63use mz_sql_parser::ast::{QualifiedReplica, Value};
64use mz_storage_client::storage_collections::StorageCollections;
65use serde::{Deserialize, Serialize};
66use tracing::{info, trace};
67
68use crate::AdapterError;
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{
71    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
72    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
73    is_reserved_role_name, object_type_to_audit_object_type,
74    system_object_type_to_audit_object_type,
75};
76use crate::config::ScopedParameters;
77use crate::coord::ConnMeta;
78use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
79use crate::coord::cluster_scheduling::SchedulingDecision;
80use crate::util::ResultExt;
81
/// A manually injected audit event.
///
/// Matches [`mz_audit_log::EventV1`], but without the `id` and `occurred_at` fields -- both are
/// filled in automatically.
///
/// Values of this type are carried by [`Op::InjectAuditEvents`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InjectedAuditEvent {
    // NOTE(review): the fields below presumably mirror the same-named fields of `EventV1`
    // (see the type-level docs) -- confirm against `mz_audit_log`.
    pub event_type: EventType,
    pub object_type: ObjectType,
    pub details: EventDetails,
    pub user: Option<String>,
}
93
/// A single operation to apply to the catalog as part of a catalog transaction.
///
/// A batch of `Op`s is handed to [`Catalog::transact`] (or to
/// [`Catalog::transact_incremental_dry_run`]), which processes the ops one at a time, in
/// order, so that later ops observe the effects of earlier ones.
#[derive(Debug, Clone)]
pub enum Op {
    AlterRetainHistory {
        id: CatalogItemId,
        value: Option<Value>,
        window: CompactionWindow,
    },
    AlterSourceTimestampInterval {
        id: CatalogItemId,
        value: Option<Value>,
        interval: Duration,
    },
    AlterRole {
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        nopassword: bool,
        vars: RoleVars,
    },
    AlterNetworkPolicy {
        id: NetworkPolicyId,
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    AlterAddColumn {
        id: CatalogItemId,
        new_global_id: GlobalId,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    },
    AlterMaterializedViewApplyReplacement {
        id: CatalogItemId,
        replacement_id: CatalogItemId,
    },
    CreateDatabase {
        name: String,
        owner_id: RoleId,
    },
    CreateSchema {
        database_id: ResolvedDatabaseSpecifier,
        schema_name: String,
        owner_id: RoleId,
    },
    CreateRole {
        name: String,
        attributes: RoleAttributesRaw,
    },
    CreateCluster {
        id: ClusterId,
        name: String,
        introspection_sources: Vec<&'static BuiltinLog>,
        owner_id: RoleId,
        config: ClusterConfig,
    },
    CreateClusterReplica {
        cluster_id: ClusterId,
        name: String,
        config: ReplicaConfig,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    },
    /// Creates a catalog item. Items whose `conn_id()` is set are treated as temporary
    /// and are checked for per-connection name collisions before the transaction runs.
    CreateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        item: CatalogItem,
        owner_id: RoleId,
    },
    CreateNetworkPolicy {
        rules: Vec<NetworkPolicyRule>,
        name: String,
        owner_id: RoleId,
    },
    Comment {
        object_id: CommentObjectId,
        sub_component: Option<usize>,
        comment: Option<String>,
    },
    /// Drops the listed objects. See [`DropObjectInfo`] for how it differs from `ObjectId`.
    DropObjects(Vec<DropObjectInfo>),
    GrantRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    RenameCluster {
        id: ClusterId,
        name: String,
        to_name: String,
        check_reserved_names: bool,
    },
    RenameClusterReplica {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        name: QualifiedReplica,
        to_name: String,
    },
    RenameItem {
        id: CatalogItemId,
        current_full_name: FullItemName,
        to_name: String,
    },
    RenameSchema {
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        new_name: String,
        check_reserved_names: bool,
    },
    UpdateOwner {
        id: ObjectId,
        new_owner: RoleId,
    },
    UpdatePrivilege {
        target_id: SystemObjectId,
        privilege: MzAclItem,
        variant: UpdatePrivilegeVariant,
    },
    UpdateDefaultPrivilege {
        privilege_object: DefaultPrivilegeObject,
        privilege_acl_item: DefaultPrivilegeAclItem,
        variant: UpdatePrivilegeVariant,
    },
    RevokeRole {
        role_id: RoleId,
        member_id: RoleId,
        grantor_id: RoleId,
    },
    UpdateClusterConfig {
        id: ClusterId,
        name: String,
        config: ClusterConfig,
    },
    UpdateClusterReplicaConfig {
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        config: ReplicaConfig,
    },
    UpdateItem {
        id: CatalogItemId,
        name: QualifiedItemName,
        to_item: CatalogItem,
    },
    UpdateSourceReferences {
        source_id: CatalogItemId,
        references: SourceReferences,
    },
    UpdateSystemConfiguration {
        name: String,
        value: OwnedVarInput,
    },
    ResetSystemConfiguration {
        name: String,
    },
    ResetAllSystemConfiguration,
    /// Persists the durable cache of scoped (per-cluster and per-replica)
    /// system parameters to match `scoped`, the complete desired state computed
    /// by the system-parameter sync loop (the sole writer). The handler diffs
    /// against the current durable contents: it upserts changed/added entries
    /// and removes entries the sync loop no longer serves, and lazily prunes
    /// entries whose owning object id is absent from the catalog. See the
    /// scoped feature flags design.
    UpdateScopedSystemParameters {
        scoped: ScopedParameters,
    },
    /// Injects audit events into the catalog.
    ///
    /// This is a nonstandard path used for manually appending audit events at the current time.
    /// Mainly useful for correcting the audit log in the face of bugs that made us forget to emit
    /// audit events.
    InjectAuditEvents {
        events: Vec<InjectedAuditEvent>,
    },
}
267
/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
/// the `Op::DropObjects`.
///
/// Use [`DropObjectInfo::manual_drop_from_object_id`] to build one from an `ObjectId`.
#[derive(Debug, Clone)]
pub enum DropObjectInfo {
    Cluster(ClusterId),
    /// The replica to drop, together with the reason it is being dropped.
    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
    Database(DatabaseId),
    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
    Role(RoleId),
    Item(CatalogItemId),
    NetworkPolicy(NetworkPolicyId),
}
281
282impl DropObjectInfo {
283    /// Creates a `DropObjectInfo` from an `ObjectId`.
284    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
285    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
286        match id {
287            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
288            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
289                cluster_id,
290                replica_id,
291                ReplicaCreateDropReason::Manual,
292            )),
293            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
294            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
295            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
296            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
297            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
298        }
299    }
300
301    /// Creates an `ObjectId` from a `DropObjectInfo`.
302    /// Loses the `ReplicaCreateDropReason` if there is one!
303    fn to_object_id(&self) -> ObjectId {
304        match &self {
305            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
306            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
307                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
308            }
309            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
310            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
311            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
312            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
313            DropObjectInfo::NetworkPolicy(network_policy_id) => {
314                ObjectId::NetworkPolicy(network_policy_id.clone())
315            }
316        }
317    }
318}
319
/// The reason for creating or dropping a replica.
///
/// Converted into its audit log representation by [`ReplicaCreateDropReason::into_audit_log`].
#[derive(Debug, Clone)]
pub enum ReplicaCreateDropReason {
    /// The user initiated the replica create or drop, e.g., by
    /// - creating/dropping a cluster,
    /// - ALTERing various options on a managed cluster,
    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
    Manual,
    /// The automated cluster scheduling initiated the replica create or drop, e.g., because a
    /// materialized view needs a refresh on a SCHEDULE ON REFRESH cluster.
    ///
    /// Carries the scheduling decisions that led to the create or drop.
    ClusterScheduling(Vec<SchedulingDecision>),
}
332
333impl ReplicaCreateDropReason {
334    pub fn into_audit_log(
335        self,
336    ) -> (
337        CreateOrDropClusterReplicaReasonV1,
338        Option<SchedulingDecisionsWithReasonsV2>,
339    ) {
340        let (reason, scheduling_policies) = match self {
341            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
342            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
343                CreateOrDropClusterReplicaReasonV1::Schedule,
344                Some(scheduling_decisions),
345            ),
346        };
347        (
348            reason,
349            scheduling_policies
350                .as_ref()
351                .map(SchedulingDecision::reasons_to_audit_log_reasons),
352        )
353    }
354}
355
/// The outputs of a successful [`Catalog::transact`] call.
pub struct TransactionResult {
    /// Builtin table updates produced while applying the transaction's state updates to the
    /// in-memory catalog state.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Parsed catalog updates from which we will derive catalog implications.
    pub catalog_updates: Vec<ParsedStateUpdate>,
    /// Audit events accumulated while processing the transaction's ops.
    pub audit_events: Vec<VersionedEvent>,
}
362
/// Selects how `Catalog::transact_inner` treats storage prepare-state side effects.
#[derive(Debug, Clone, Copy)]
enum TransactInnerMode {
    /// Prepare storage state and return a commit-ready transaction state.
    ///
    /// This mode is used by durable catalog transactions.
    Commit,
    /// Execute a dry run against a durable transaction that will not be
    /// committed.
    ///
    /// This mode still validates and applies updates to an in-memory
    /// `CatalogState`, but it must not call `prepare_state` because dry runs
    /// must not trigger controller side effects.
    DryRun,
}
377
378impl Catalog {
    /// Reports whether `item` is eligible for audit logging: `true` for every item that is
    /// not temporary, `false` for temporary items.
    fn should_audit_log_item(item: &CatalogItem) -> bool {
        !item.is_temporary()
    }
382
383    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
384    /// within a connection id.
385    fn temporary_ids(
386        &self,
387        ops: &[Op],
388        temporary_drops: BTreeSet<(&ConnectionId, String)>,
389    ) -> Result<BTreeSet<CatalogItemId>, Error> {
390        let mut creating = BTreeSet::new();
391        let mut temporary_ids = BTreeSet::new();
392        for op in ops.iter() {
393            if let Op::CreateItem {
394                id,
395                name,
396                item,
397                owner_id: _,
398            } = op
399            {
400                if let Some(conn_id) = item.conn_id() {
401                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
402                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
403                        || creating.contains(&(conn_id, &name.item))
404                    {
405                        return Err(
406                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
407                        );
408                    } else {
409                        creating.insert((conn_id, &name.item));
410                        temporary_ids.insert(id.clone());
411                    }
412                }
413            }
414        }
415        Ok(temporary_ids)
416    }
417
418    #[instrument(name = "catalog::transact")]
419    pub async fn transact(
420        &mut self,
421        // n.b. this is an option to prevent us from needing to build out a
422        // dummy impl of `StorageController` for tests.
423        storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
424        oracle_write_ts: mz_repr::Timestamp,
425        session: Option<&ConnMeta>,
426        ops: Vec<Op>,
427    ) -> Result<TransactionResult, AdapterError> {
428        trace!("transact: {:?}", ops);
429        fail::fail_point!("catalog_transact", |arg| {
430            Err(AdapterError::Unstructured(anyhow::anyhow!(
431                "failpoint: {arg:?}"
432            )))
433        });
434
435        let drop_ids: BTreeSet<CatalogItemId> = ops
436            .iter()
437            .filter_map(|op| match op {
438                Op::DropObjects(drop_object_infos) => {
439                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
440                    let item_ids = ids.filter_map(|id| match id {
441                        ObjectId::Item(id) => Some(id),
442                        _ => None,
443                    });
444                    Some(item_ids)
445                }
446                _ => None,
447            })
448            .flatten()
449            .collect();
450        let temporary_drops = drop_ids
451            .iter()
452            .filter_map(|id| {
453                let entry = self.get_entry(id);
454                match entry.item().conn_id() {
455                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
456                    None => None,
457                }
458            })
459            .collect();
460
461        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
462        let mut builtin_table_updates = vec![];
463        let mut catalog_updates = vec![];
464        let mut audit_events = vec![];
465        let mut storage = self.storage().await;
466        let mut tx = storage
467            .transaction()
468            .await
469            .unwrap_or_terminate("starting catalog transaction");
470
471        let new_state = Self::transact_inner(
472            TransactInnerMode::Commit,
473            storage_collections,
474            oracle_write_ts,
475            session,
476            ops,
477            temporary_ids,
478            &mut builtin_table_updates,
479            &mut catalog_updates,
480            &mut audit_events,
481            &mut tx,
482            &self.state,
483        )
484        .await?;
485
486        // The user closure was successful, apply the updates. Terminate the
487        // process if this fails, because we have to restart envd due to
488        // indeterminate catalog state, which we only reconcile during catalog
489        // init.
490        tx.commit(oracle_write_ts)
491            .await
492            .unwrap_or_terminate("catalog storage transaction commit must succeed");
493
494        // Dropping here keeps the mutable borrow on self, preventing us accidentally
495        // mutating anything until after f is executed.
496        drop(storage);
497        if let Some(new_state) = new_state {
498            self.transient_revision += 1;
499            self.state = new_state;
500        }
501
502        Ok(TransactionResult {
503            builtin_table_updates,
504            catalog_updates,
505            audit_events,
506        })
507    }
508
509    /// Performs an incremental dry-run catalog transaction: processes only the
510    /// NEW ops against an accumulated `CatalogState` from previous dry runs.
511    /// This avoids the O(N^2) replay cost of replaying all accumulated ops.
512    ///
513    /// The durable transaction is intentionally never committed and no storage
514    /// controller prepare-state side effects are run.
515    ///
516    /// If `prev_snapshot` is `Some`, the transaction is initialized from that
517    /// snapshot (which represents the tx state after the previous dry run),
518    /// ensuring it starts in sync with `base_state`. If `None` (first
519    /// statement), a fresh transaction is loaded from durable storage.
520    ///
521    /// Returns the new accumulated state and a snapshot of the transaction's
522    /// state for use in subsequent incremental dry runs.
523    pub async fn transact_incremental_dry_run(
524        &self,
525        base_state: &CatalogState,
526        ops: Vec<Op>,
527        session: Option<&ConnMeta>,
528        prev_snapshot: Option<Snapshot>,
529        oracle_write_ts: mz_repr::Timestamp,
530    ) -> Result<(CatalogState, Snapshot), AdapterError> {
531        // For DDL transactions, items are not temporary (CREATE TABLE FROM SOURCE, etc.)
532        // but we still need to check for collisions.
533        let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
534
535        let mut builtin_table_updates = vec![];
536        let mut catalog_updates = vec![];
537        let mut audit_events = vec![];
538        let mut storage = self.storage().await;
539        let mut tx = if let Some(snapshot) = prev_snapshot {
540            // Restore transaction from saved snapshot so it starts in sync
541            // with the accumulated CatalogState from previous dry runs.
542            storage
543                .transaction_from_snapshot(snapshot)
544                .unwrap_or_terminate("starting catalog transaction from snapshot")
545        } else {
546            // First statement: fresh transaction from durable storage, which
547            // is in sync with the real catalog state.
548            storage
549                .transaction()
550                .await
551                .unwrap_or_terminate("starting catalog transaction")
552        };
553
554        // Process only the new ops against the accumulated state in dry-run mode.
555        let new_state = Self::transact_inner(
556            TransactInnerMode::DryRun,
557            None,
558            oracle_write_ts,
559            session,
560            ops,
561            temporary_ids,
562            &mut builtin_table_updates,
563            &mut catalog_updates,
564            &mut audit_events,
565            &mut tx,
566            base_state,
567        )
568        .await?;
569
570        // Save the transaction's current state as a snapshot for the next
571        // incremental dry run.
572        let new_snapshot = tx.current_snapshot();
573
574        // Transaction is NOT committed — drop it.
575        drop(storage);
576
577        // transact_inner returns Some(state) when ops produced changes.
578        let state = new_state.unwrap_or_else(|| base_state.clone());
579        Ok((state, new_snapshot))
580    }
581
582    /// Extracts optimized expressions from `Op::CreateItem` operations for views
583    /// and materialized views. These can be used to populate a `LocalExpressionCache`
584    /// to avoid re-optimization during `apply_updates`.
585    fn extract_expressions_from_ops(
586        ops: &[Op],
587        optimizer_features: &OptimizerFeatures,
588    ) -> BTreeMap<GlobalId, LocalExpressions> {
589        let mut exprs = BTreeMap::new();
590
591        for op in ops {
592            if let Op::CreateItem { item, .. } = op {
593                match item {
594                    CatalogItem::View(view) => {
595                        exprs.insert(
596                            view.global_id,
597                            LocalExpressions {
598                                local_mir: (*view.locally_optimized_expr).clone(),
599                                optimizer_features: optimizer_features.clone(),
600                            },
601                        );
602                    }
603                    CatalogItem::MaterializedView(mv) => {
604                        exprs.insert(
605                            mv.global_id_writes(),
606                            LocalExpressions {
607                                local_mir: (*mv.locally_optimized_expr).clone(),
608                                optimizer_features: optimizer_features.clone(),
609                            },
610                        );
611                    }
612                    CatalogItem::Table(_)
613                    | CatalogItem::Source(_)
614                    | CatalogItem::Log(_)
615                    | CatalogItem::Sink(_)
616                    | CatalogItem::Index(_)
617                    | CatalogItem::Type(_)
618                    | CatalogItem::Func(_)
619                    | CatalogItem::Secret(_)
620                    | CatalogItem::Connection(_) => {}
621                }
622            }
623        }
624
625        exprs
626    }
627
628    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
629    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
630    ///
631    /// `mode` controls whether storage prepare-state side effects are allowed.
632    /// In `DryRun` mode, this method may update the in-memory state returned to
633    /// the caller, but it must not trigger controller side effects.
634    ///
635    #[instrument(name = "catalog::transact_inner")]
636    async fn transact_inner(
637        mode: TransactInnerMode,
638        storage_collections: Option<&mut Arc<dyn StorageCollections + Send + Sync>>,
639        oracle_write_ts: mz_repr::Timestamp,
640        session: Option<&ConnMeta>,
641        ops: Vec<Op>,
642        temporary_ids: BTreeSet<CatalogItemId>,
643        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
644        parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
645        audit_events: &mut Vec<VersionedEvent>,
646        tx: &mut Transaction<'_>,
647        state: &CatalogState,
648    ) -> Result<Option<CatalogState>, AdapterError> {
649        // We come up with new catalog state, builtin state updates, and parsed
650        // catalog updates (for deriving catalog implications) in two phases:
651        //
652        // 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
653        //    one-by-one. This will give us the full list of updates to apply to
654        //    the catalog, which will allow us to apply it in one batch, which
655        //    in turn will allow the apply machinery to consolidate the updates.
656        // 2. We do one final apply call with all updates, which gives us the
657        //    final builtin table updates and parsed catalog updates.
658        //
659        // The reason is that the loop that is working off ops first does a
660        // transact_op to derive the state updates for that op, and then calls
661        // apply_updates on the catalog state. And successive ops might expect
662        // the catalog state to reflect the modified state _after_ applying
663        // previous ops.
664        //
665        // We want to, however, have one final apply_state that takes all the
666        // accumulated updates to derive the required controller updates and the
667        // builtin table updates.
668        //
669        // We won't win any DDL throughput benchmarks, but so far that's not
670        // what we're optimizing for and there would probably be other
671        // bottlenecks before we hit this one as a bottleneck.
672        //
673        // We could work around this by refactoring how the interplay of
674        // transact_op and apply_updates works, but that's a larger undertaking.
675        let mut preliminary_state = Cow::Borrowed(state);
676
677        // The final state that we will return, if modified.
678        let mut state = Cow::Borrowed(state);
679
680        if ops.is_empty() {
681            return Ok(None);
682        }
683
684        // Extract optimized expressions from CreateItem ops to avoid re-optimization
685        // during apply_updates. We extract before the loop since `ops` is moved there.
686        let optimizer_features = OptimizerFeatures::from(state.system_config());
687        let cached_exprs = Self::extract_expressions_from_ops(&ops, &optimizer_features);
688
689        let mut storage_collections_to_create = BTreeSet::new();
690        let mut storage_collections_to_drop = BTreeSet::new();
691        let mut storage_collections_to_register = BTreeMap::new();
692
693        let mut updates = Vec::new();
694
695        for op in ops {
696            let temporary_item_updates = Self::transact_op(
697                oracle_write_ts,
698                session,
699                op,
700                &temporary_ids,
701                audit_events,
702                tx,
703                &*preliminary_state,
704                &mut storage_collections_to_create,
705                &mut storage_collections_to_drop,
706                &mut storage_collections_to_register,
707            )
708            .await?;
709
710            // Temporary items are not stored in the durable catalog, so they need to be handled
711            // separately for updating state and builtin tables.
712            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
713            // in a multi-subscriber catalog world.
714            let upper = tx.upper();
715            let temporary_item_updates =
716                temporary_item_updates
717                    .into_iter()
718                    .map(|(item, diff)| StateUpdate {
719                        kind: StateUpdateKind::TemporaryItem(item),
720                        ts: upper,
721                        diff,
722                    });
723
724            let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
725            op_updates.extend(temporary_item_updates);
726            if !op_updates.is_empty() {
727                // Clone the cache so each apply_updates call has access to cached expressions.
728                // The cache uses `remove` semantics, so we need a fresh clone for each call.
729                let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
730                let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state
731                    .to_mut()
732                    .apply_updates(op_updates.clone(), &mut local_expr_cache)
733                    .await;
734            }
735            updates.append(&mut op_updates);
736        }
737
738        if !updates.is_empty() {
739            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
740            let (op_builtin_table_updates, op_catalog_updates) = state
741                .to_mut()
742                .apply_updates(updates.clone(), &mut local_expr_cache)
743                .await;
744            let op_builtin_table_updates = state
745                .to_mut()
746                .resolve_builtin_table_updates(op_builtin_table_updates);
747            builtin_table_updates.extend(op_builtin_table_updates);
748            parsed_catalog_updates.extend(op_catalog_updates);
749        }
750
751        match mode {
752            TransactInnerMode::Commit => {
753                // `storage_collections` can be `None` in tests.
754                if let Some(c) = storage_collections {
755                    c.prepare_state(
756                        tx,
757                        storage_collections_to_create,
758                        storage_collections_to_drop,
759                        storage_collections_to_register,
760                    )
761                    .await?;
762                }
763            }
764            TransactInnerMode::DryRun => {
765                debug_assert!(
766                    storage_collections.is_none(),
767                    "dry-run mode must not prepare storage state"
768                );
769            }
770        }
771
772        let updates = tx.get_and_commit_op_updates();
773        if !updates.is_empty() {
774            let mut local_expr_cache = LocalExpressionCache::new(cached_exprs.clone());
775            let (op_builtin_table_updates, op_catalog_updates) = state
776                .to_mut()
777                .apply_updates(updates.clone(), &mut local_expr_cache)
778                .await;
779            let op_builtin_table_updates = state
780                .to_mut()
781                .resolve_builtin_table_updates(op_builtin_table_updates);
782            builtin_table_updates.extend(op_builtin_table_updates);
783            parsed_catalog_updates.extend(op_catalog_updates);
784        }
785
786        match state {
787            Cow::Owned(state) => Ok(Some(state)),
788            Cow::Borrowed(_) => Ok(None),
789        }
790    }
791
792    /// Performs the transaction operation described by `op`. This function prepares the changes in
793    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
794    /// changes.
795    ///
796    /// Optionally returns a builtin table update for any builtin table updates that cannot be
797    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
798    /// scenarios and ideally in the future don't exist.
799    #[instrument]
800    async fn transact_op(
801        oracle_write_ts: mz_repr::Timestamp,
802        session: Option<&ConnMeta>,
803        op: Op,
804        temporary_ids: &BTreeSet<CatalogItemId>,
805        audit_events: &mut Vec<VersionedEvent>,
806        tx: &mut Transaction<'_>,
807        state: &CatalogState,
808        storage_collections_to_create: &mut BTreeSet<GlobalId>,
809        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
810        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
811    ) -> Result<Vec<(TemporaryItem, StateDiff)>, AdapterError> {
812        let mut temporary_item_updates = Vec::new();
813
814        match op {
815            Op::AlterRetainHistory { id, value, window } => {
816                let entry = state.get_entry(&id);
817                if id.is_system() {
818                    let name = entry.name();
819                    let full_name =
820                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
821                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
822                        full_name.to_string(),
823                    ))));
824                }
825
826                let mut new_entry = entry.clone();
827                let previous = new_entry
828                    .item
829                    .update_retain_history(value.clone(), window)
830                    .map_err(|_| {
831                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
832                            "planner should have rejected invalid alter retain history item type"
833                                .to_string(),
834                        )))
835                    })?;
836
837                if Self::should_audit_log_item(new_entry.item()) {
838                    let details =
839                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
840                            id: id.to_string(),
841                            old_history: previous.map(|previous| previous.to_string()),
842                            new_history: value.map(|v| v.to_string()),
843                        });
844                    CatalogState::add_to_audit_log(
845                        &state.system_configuration,
846                        oracle_write_ts,
847                        session,
848                        tx,
849                        audit_events,
850                        EventType::Alter,
851                        catalog_type_to_audit_object_type(new_entry.item().typ()),
852                        details,
853                    )?;
854                }
855
856                tx.update_item(id, new_entry.into())?;
857
858                Self::log_update(state, &id);
859            }
860            Op::AlterSourceTimestampInterval {
861                id,
862                value,
863                interval,
864            } => {
865                let entry = state.get_entry(&id);
866                if id.is_system() {
867                    let name = entry.name();
868                    let full_name =
869                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
870                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
871                        full_name.to_string(),
872                    ))));
873                }
874
875                let mut new_entry = entry.clone();
876                let previous = new_entry
877                    .item
878                    .update_timestamp_interval(value.clone(), interval)
879                    .map_err(|_| {
880                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
881                            "planner should have rejected invalid alter timestamp interval item type"
882                                .to_string(),
883                        )))
884                    })?;
885
886                if Self::should_audit_log_item(new_entry.item()) {
887                    let details = EventDetails::AlterSourceTimestampIntervalV1(
888                        mz_audit_log::AlterSourceTimestampIntervalV1 {
889                            id: id.to_string(),
890                            old_interval: previous.map(|previous| previous.to_string()),
891                            new_interval: value.map(|v| v.to_string()),
892                        },
893                    );
894                    CatalogState::add_to_audit_log(
895                        &state.system_configuration,
896                        oracle_write_ts,
897                        session,
898                        tx,
899                        audit_events,
900                        EventType::Alter,
901                        catalog_type_to_audit_object_type(new_entry.item().typ()),
902                        details,
903                    )?;
904                }
905
906                tx.update_item(id, new_entry.into())?;
907
908                Self::log_update(state, &id);
909            }
910            Op::AlterRole {
911                id,
912                name,
913                attributes,
914                nopassword,
915                vars,
916            } => {
917                state.ensure_not_reserved_role(&id)?;
918
919                let mut existing_role = state.get_role(&id).clone();
920                let password = attributes.password.clone();
921                let scram_iterations = attributes
922                    .scram_iterations
923                    .unwrap_or_else(|| state.system_config().scram_iterations());
924                existing_role.attributes = attributes.into();
925                existing_role.vars = vars;
926                let password_action = if nopassword {
927                    PasswordAction::Clear
928                } else if let Some(password) = password {
929                    PasswordAction::Set(PasswordConfig {
930                        password,
931                        scram_iterations,
932                    })
933                } else {
934                    PasswordAction::NoChange
935                };
936                tx.update_role(id, existing_role.into(), password_action)?;
937
938                CatalogState::add_to_audit_log(
939                    &state.system_configuration,
940                    oracle_write_ts,
941                    session,
942                    tx,
943                    audit_events,
944                    EventType::Alter,
945                    ObjectType::Role,
946                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
947                        id: id.to_string(),
948                        name: name.clone(),
949                    }),
950                )?;
951
952                info!("update role {name} ({id})");
953            }
954            Op::AlterNetworkPolicy {
955                id,
956                rules,
957                name,
958                owner_id: _owner_id,
959            } => {
960                let existing_policy = state.get_network_policy(&id).clone();
961                let mut policy: NetworkPolicy = existing_policy.into();
962                policy.rules = rules;
963                if is_reserved_name(&name) {
964                    return Err(AdapterError::Catalog(Error::new(
965                        ErrorKind::ReservedNetworkPolicyName(name),
966                    )));
967                }
968                tx.update_network_policy(id, policy.clone())?;
969
970                CatalogState::add_to_audit_log(
971                    &state.system_configuration,
972                    oracle_write_ts,
973                    session,
974                    tx,
975                    audit_events,
976                    EventType::Alter,
977                    ObjectType::NetworkPolicy,
978                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
979                        id: id.to_string(),
980                        name: name.clone(),
981                    }),
982                )?;
983
984                info!("update network policy {name} ({id})");
985            }
986            Op::AlterAddColumn {
987                id,
988                new_global_id,
989                name,
990                typ,
991                sql,
992            } => {
993                let column_name = name.to_string();
994                let column_type = typ.to_string();
995                let nullable = typ.nullable;
996                let mut new_entry = state.get_entry(&id).clone();
997                let version = new_entry.item.add_column(name, typ, sql)?;
998                // All versions of a table share the same shard, so it shouldn't matter what
999                // GlobalId we use here.
1000                let shard_id = state
1001                    .storage_metadata()
1002                    .get_collection_shard(new_entry.latest_global_id())?;
1003
1004                // TODO(alter_table): Support adding columns to sources.
1005                let CatalogItem::Table(table) = &mut new_entry.item else {
1006                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
1007                };
1008                table.collections.insert(version, new_global_id);
1009
1010                if Self::should_audit_log_item(new_entry.item()) {
1011                    let details = EventDetails::AlterAddColumnV1(mz_audit_log::AlterAddColumnV1 {
1012                        id: id.to_string(),
1013                        column: column_name,
1014                        column_type,
1015                        nullable,
1016                    });
1017                    CatalogState::add_to_audit_log(
1018                        &state.system_configuration,
1019                        oracle_write_ts,
1020                        session,
1021                        tx,
1022                        audit_events,
1023                        EventType::Alter,
1024                        catalog_type_to_audit_object_type(new_entry.item().typ()),
1025                        details,
1026                    )?;
1027                }
1028
1029                tx.update_item(id, new_entry.into())?;
1030                storage_collections_to_register.insert(new_global_id, shard_id);
1031            }
1032            Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
1033                let mut new_entry = state.get_entry(&id).clone();
1034                let replacement = state.get_entry(&replacement_id);
1035
1036                let new_audit_events =
1037                    apply_replacement_audit_events(state, &new_entry, replacement);
1038
1039                let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
1040                    return Err(AdapterError::internal(
1041                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1042                        "id must refer to a materialized view",
1043                    ));
1044                };
1045                let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
1046                    return Err(AdapterError::internal(
1047                        "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
1048                        "replacement_id must refer to a materialized view",
1049                    ));
1050                };
1051
1052                mv.apply_replacement(replacement_mv.clone());
1053
1054                tx.remove_item(replacement_id)?;
1055
1056                new_entry.id = replacement_id;
1057                tx_replace_item(tx, state, id, new_entry)?;
1058
1059                let comment_id = CommentObjectId::MaterializedView(replacement_id);
1060                tx.drop_comments(&[comment_id].into())?;
1061
1062                for (event_type, details) in new_audit_events {
1063                    CatalogState::add_to_audit_log(
1064                        &state.system_configuration,
1065                        oracle_write_ts,
1066                        session,
1067                        tx,
1068                        audit_events,
1069                        event_type,
1070                        ObjectType::MaterializedView,
1071                        details,
1072                    )?;
1073                }
1074            }
1075            Op::CreateDatabase { name, owner_id } => {
1076                let database_owner_privileges = vec![rbac::owner_privilege(
1077                    mz_sql::catalog::ObjectType::Database,
1078                    owner_id,
1079                )];
1080                let database_default_privileges = state
1081                    .default_privileges
1082                    .get_applicable_privileges(
1083                        owner_id,
1084                        None,
1085                        None,
1086                        mz_sql::catalog::ObjectType::Database,
1087                    )
1088                    .map(|item| item.mz_acl_item(owner_id));
1089                let database_privileges: Vec<_> = merge_mz_acl_items(
1090                    database_owner_privileges
1091                        .into_iter()
1092                        .chain(database_default_privileges),
1093                )
1094                .collect();
1095
1096                let schema_owner_privileges = vec![rbac::owner_privilege(
1097                    mz_sql::catalog::ObjectType::Schema,
1098                    owner_id,
1099                )];
1100                let schema_default_privileges = state
1101                    .default_privileges
1102                    .get_applicable_privileges(
1103                        owner_id,
1104                        None,
1105                        None,
1106                        mz_sql::catalog::ObjectType::Schema,
1107                    )
1108                    .map(|item| item.mz_acl_item(owner_id))
1109                    // Special default privilege on public schemas.
1110                    .chain(std::iter::once(MzAclItem {
1111                        grantee: RoleId::Public,
1112                        grantor: owner_id,
1113                        acl_mode: AclMode::USAGE,
1114                    }));
1115                let schema_privileges: Vec<_> = merge_mz_acl_items(
1116                    schema_owner_privileges
1117                        .into_iter()
1118                        .chain(schema_default_privileges),
1119                )
1120                .collect();
1121
1122                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1123                let (database_id, _) = tx.insert_user_database(
1124                    &name,
1125                    owner_id,
1126                    database_privileges.clone(),
1127                    &temporary_oids,
1128                )?;
1129                let (schema_id, _) = tx.insert_user_schema(
1130                    database_id,
1131                    DEFAULT_SCHEMA,
1132                    owner_id,
1133                    schema_privileges.clone(),
1134                    &temporary_oids,
1135                )?;
1136                CatalogState::add_to_audit_log(
1137                    &state.system_configuration,
1138                    oracle_write_ts,
1139                    session,
1140                    tx,
1141                    audit_events,
1142                    EventType::Create,
1143                    ObjectType::Database,
1144                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1145                        id: database_id.to_string(),
1146                        name: name.clone(),
1147                    }),
1148                )?;
1149                info!("create database {}", name);
1150
1151                CatalogState::add_to_audit_log(
1152                    &state.system_configuration,
1153                    oracle_write_ts,
1154                    session,
1155                    tx,
1156                    audit_events,
1157                    EventType::Create,
1158                    ObjectType::Schema,
1159                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1160                        id: schema_id.to_string(),
1161                        name: DEFAULT_SCHEMA.to_string(),
1162                        database_name: Some(name),
1163                    }),
1164                )?;
1165            }
1166            Op::CreateSchema {
1167                database_id,
1168                schema_name,
1169                owner_id,
1170            } => {
1171                if is_reserved_name(&schema_name) {
1172                    return Err(AdapterError::Catalog(Error::new(
1173                        ErrorKind::ReservedSchemaName(schema_name),
1174                    )));
1175                }
1176                let database_id = match database_id {
1177                    ResolvedDatabaseSpecifier::Id(id) => id,
1178                    ResolvedDatabaseSpecifier::Ambient => {
1179                        return Err(AdapterError::Catalog(Error::new(
1180                            ErrorKind::ReadOnlySystemSchema(schema_name),
1181                        )));
1182                    }
1183                };
1184                let owner_privileges = vec![rbac::owner_privilege(
1185                    mz_sql::catalog::ObjectType::Schema,
1186                    owner_id,
1187                )];
1188                let default_privileges = state
1189                    .default_privileges
1190                    .get_applicable_privileges(
1191                        owner_id,
1192                        Some(database_id),
1193                        None,
1194                        mz_sql::catalog::ObjectType::Schema,
1195                    )
1196                    .map(|item| item.mz_acl_item(owner_id));
1197                let privileges: Vec<_> =
1198                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1199                        .collect();
1200                let (schema_id, _) = tx.insert_user_schema(
1201                    database_id,
1202                    &schema_name,
1203                    owner_id,
1204                    privileges.clone(),
1205                    &state.get_temporary_oids().collect(),
1206                )?;
1207                CatalogState::add_to_audit_log(
1208                    &state.system_configuration,
1209                    oracle_write_ts,
1210                    session,
1211                    tx,
1212                    audit_events,
1213                    EventType::Create,
1214                    ObjectType::Schema,
1215                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1216                        id: schema_id.to_string(),
1217                        name: schema_name.clone(),
1218                        database_name: Some(state.database_by_id[&database_id].name.clone()),
1219                    }),
1220                )?;
1221            }
1222            Op::CreateRole { name, attributes } => {
1223                if is_reserved_role_name(&name) {
1224                    return Err(AdapterError::Catalog(Error::new(
1225                        ErrorKind::ReservedRoleName(name),
1226                    )));
1227                }
1228                let membership = RoleMembership::new();
1229                let vars = RoleVars::default();
1230                let (id, _) = tx.insert_user_role(
1231                    name.clone(),
1232                    attributes.clone(),
1233                    membership.clone(),
1234                    vars.clone(),
1235                    &state.get_temporary_oids().collect(),
1236                )?;
1237                CatalogState::add_to_audit_log(
1238                    &state.system_configuration,
1239                    oracle_write_ts,
1240                    session,
1241                    tx,
1242                    audit_events,
1243                    EventType::Create,
1244                    ObjectType::Role,
1245                    EventDetails::CreateRoleV1(mz_audit_log::CreateRoleV1 {
1246                        id: id.to_string(),
1247                        name: name.clone(),
1248                        auto_provision_source: attributes.auto_provision_source.map(|s| match s {
1249                            AutoProvisionSource::Oidc => "oidc".to_string(),
1250                            AutoProvisionSource::Frontegg => "frontegg".to_string(),
1251                            AutoProvisionSource::None => "none".to_string(),
1252                        }),
1253                    }),
1254                )?;
1255                info!("create role {}", name);
1256            }
1257            Op::CreateCluster {
1258                id,
1259                name,
1260                introspection_sources,
1261                owner_id,
1262                config,
1263            } => {
1264                if is_reserved_name(&name) {
1265                    return Err(AdapterError::Catalog(Error::new(
1266                        ErrorKind::ReservedClusterName(name),
1267                    )));
1268                }
1269                let owner_privileges = vec![rbac::owner_privilege(
1270                    mz_sql::catalog::ObjectType::Cluster,
1271                    owner_id,
1272                )];
1273                let default_privileges = state
1274                    .default_privileges
1275                    .get_applicable_privileges(
1276                        owner_id,
1277                        None,
1278                        None,
1279                        mz_sql::catalog::ObjectType::Cluster,
1280                    )
1281                    .map(|item| item.mz_acl_item(owner_id));
1282                let privileges: Vec<_> =
1283                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1284                        .collect();
1285                let introspection_source_ids: Vec<_> = introspection_sources
1286                    .iter()
1287                    .map(|introspection_source| {
1288                        Transaction::allocate_introspection_source_index_id(
1289                            &id,
1290                            introspection_source.variant,
1291                        )
1292                    })
1293                    .collect();
1294
1295                let introspection_sources = introspection_sources
1296                    .into_iter()
1297                    .zip_eq(introspection_source_ids)
1298                    .map(|(log, (item_id, gid))| (log, item_id, gid))
1299                    .collect();
1300
1301                tx.insert_user_cluster(
1302                    id,
1303                    &name,
1304                    introspection_sources,
1305                    owner_id,
1306                    privileges.clone(),
1307                    config.clone().into(),
1308                    &state.get_temporary_oids().collect(),
1309                )?;
1310                CatalogState::add_to_audit_log(
1311                    &state.system_configuration,
1312                    oracle_write_ts,
1313                    session,
1314                    tx,
1315                    audit_events,
1316                    EventType::Create,
1317                    ObjectType::Cluster,
1318                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1319                        id: id.to_string(),
1320                        name: name.clone(),
1321                    }),
1322                )?;
1323                info!("create cluster {}", name);
1324            }
1325            Op::CreateClusterReplica {
1326                cluster_id,
1327                name,
1328                config,
1329                owner_id,
1330                reason,
1331            } => {
1332                if is_reserved_name(&name) {
1333                    return Err(AdapterError::Catalog(Error::new(
1334                        ErrorKind::ReservedReplicaName(name),
1335                    )));
1336                }
1337                let cluster = state.get_cluster(cluster_id);
1338                let id =
1339                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1340                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1341                    size,
1342                    billed_as,
1343                    internal,
1344                    ..
1345                }) = &config.location
1346                {
1347                    let (reason, scheduling_policies) = reason.into_audit_log();
1348                    let details = EventDetails::CreateClusterReplicaV4(
1349                        mz_audit_log::CreateClusterReplicaV4 {
1350                            cluster_id: cluster_id.to_string(),
1351                            cluster_name: cluster.name.clone(),
1352                            replica_id: Some(id.to_string()),
1353                            replica_name: name.clone(),
1354                            logical_size: size.clone(),
1355                            billed_as: billed_as.clone(),
1356                            internal: *internal,
1357                            reason,
1358                            scheduling_policies,
1359                        },
1360                    );
1361                    CatalogState::add_to_audit_log(
1362                        &state.system_configuration,
1363                        oracle_write_ts,
1364                        session,
1365                        tx,
1366                        audit_events,
1367                        EventType::Create,
1368                        ObjectType::ClusterReplica,
1369                        details,
1370                    )?;
1371                }
1372            }
1373            Op::CreateItem {
1374                id,
1375                name,
1376                item,
1377                owner_id,
1378            } => {
1379                state.check_unstable_dependencies(&item)?;
1380
1381                match &item {
1382                    CatalogItem::Table(table) => {
1383                        let gids: Vec<_> = table.global_ids().collect();
1384                        assert_eq!(gids.len(), 1);
1385                        storage_collections_to_create.extend(gids);
1386                    }
1387                    CatalogItem::Source(source) => {
1388                        storage_collections_to_create.insert(source.global_id());
1389                    }
1390                    CatalogItem::MaterializedView(mv) => {
1391                        let mv_gid = mv.global_id_writes();
1392                        if let Some(target_id) = mv.replacement_target {
1393                            let target_gid = state.get_entry(&target_id).latest_global_id();
1394                            let shard_id =
1395                                state.storage_metadata().get_collection_shard(target_gid)?;
1396                            storage_collections_to_register.insert(mv_gid, shard_id);
1397                        } else {
1398                            storage_collections_to_create.insert(mv_gid);
1399                        }
1400                    }
1401                    CatalogItem::Sink(sink) => {
1402                        storage_collections_to_create.insert(sink.global_id());
1403                    }
1404                    CatalogItem::Log(_)
1405                    | CatalogItem::View(_)
1406                    | CatalogItem::Index(_)
1407                    | CatalogItem::Type(_)
1408                    | CatalogItem::Func(_)
1409                    | CatalogItem::Secret(_)
1410                    | CatalogItem::Connection(_) => (),
1411                }
1412
1413                let system_user = session.map_or(false, |s| s.user().is_system_user());
1414                if !system_user {
1415                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1416                        let cluster_name = state.clusters_by_id[&id].name.clone();
1417                        return Err(AdapterError::Catalog(Error::new(
1418                            ErrorKind::ReadOnlyCluster(cluster_name),
1419                        )));
1420                    }
1421                }
1422
1423                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1424                let default_privileges = state
1425                    .default_privileges
1426                    .get_applicable_privileges(
1427                        owner_id,
1428                        name.qualifiers.database_spec.id(),
1429                        Some(name.qualifiers.schema_spec.into()),
1430                        item.typ().into(),
1431                    )
1432                    .map(|item| item.mz_acl_item(owner_id));
1433                // mz_support can read all progress sources.
1434                let progress_source_privilege = if item.is_progress_source() {
1435                    Some(MzAclItem {
1436                        grantee: MZ_SUPPORT_ROLE_ID,
1437                        grantor: owner_id,
1438                        acl_mode: AclMode::SELECT,
1439                    })
1440                } else {
1441                    None
1442                };
1443                let privileges: Vec<_> = merge_mz_acl_items(
1444                    owner_privileges
1445                        .into_iter()
1446                        .chain(default_privileges)
1447                        .chain(progress_source_privilege),
1448                )
1449                .collect();
1450
1451                let temporary_oids = state.get_temporary_oids().collect();
1452
1453                if item.is_temporary() {
1454                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1455                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1456                    {
1457                        return Err(AdapterError::Catalog(Error::new(
1458                            ErrorKind::InvalidTemporarySchema,
1459                        )));
1460                    }
1461                    let oid = tx.allocate_oid(&temporary_oids)?;
1462
1463                    let schema_id = name.qualifiers.schema_spec.clone().into();
1464                    let item_type = item.typ();
1465                    let (create_sql, global_id, versions) = item.to_serialized();
1466
1467                    let item = TemporaryItem {
1468                        id,
1469                        oid,
1470                        global_id,
1471                        schema_id,
1472                        name: name.item.clone(),
1473                        create_sql,
1474                        conn_id: item.conn_id().cloned(),
1475                        owner_id,
1476                        privileges: privileges.clone(),
1477                        extra_versions: versions,
1478                    };
1479                    temporary_item_updates.push((item, StateDiff::Addition));
1480
1481                    info!(
1482                        "create temporary {} {} ({})",
1483                        item_type,
1484                        state.resolve_full_name(&name, None),
1485                        id
1486                    );
1487                } else {
1488                    if let Some(temp_id) =
1489                        item.uses()
1490                            .iter()
1491                            .find(|id| match state.try_get_entry(*id) {
1492                                Some(entry) => entry.item().is_temporary(),
1493                                None => temporary_ids.contains(id),
1494                            })
1495                    {
1496                        let temp_item = state.get_entry(temp_id);
1497                        return Err(AdapterError::Catalog(Error::new(
1498                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1499                        )));
1500                    }
1501                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1502                        && !system_user
1503                    {
1504                        let schema_name = state
1505                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1506                            .schema;
1507                        return Err(AdapterError::Catalog(Error::new(
1508                            ErrorKind::ReadOnlySystemSchema(schema_name),
1509                        )));
1510                    }
1511                    let schema_id = name.qualifiers.schema_spec.clone().into();
1512                    let item_type = item.typ();
1513                    let (create_sql, global_id, versions) = item.to_serialized();
1514                    tx.insert_user_item(
1515                        id,
1516                        global_id,
1517                        schema_id,
1518                        &name.item,
1519                        create_sql,
1520                        owner_id,
1521                        privileges.clone(),
1522                        &temporary_oids,
1523                        versions,
1524                    )?;
1525                    info!(
1526                        "create {} {} ({})",
1527                        item_type,
1528                        state.resolve_full_name(&name, None),
1529                        id
1530                    );
1531                }
1532
1533                if Self::should_audit_log_item(&item) {
1534                    let name = Self::full_name_detail(
1535                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1536                    );
1537                    let details = match &item {
1538                        CatalogItem::Source(s) => {
1539                            let cluster_id = match s.data_source {
1540                                // Ingestion exports don't have their own cluster, but
1541                                // run on their ingestion's cluster.
1542                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1543                                    match state.get_entry(&ingestion_id).cluster_id() {
1544                                        Some(cluster_id) => Some(cluster_id.to_string()),
1545                                        None => None,
1546                                    }
1547                                }
1548                                _ => match item.cluster_id() {
1549                                    Some(cluster_id) => Some(cluster_id.to_string()),
1550                                    None => None,
1551                                },
1552                            };
1553
1554                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1555                                id: id.to_string(),
1556                                cluster_id,
1557                                name,
1558                                external_type: s.source_type().to_string(),
1559                            })
1560                        }
1561                        CatalogItem::Sink(s) => {
1562                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1563                                id: id.to_string(),
1564                                cluster_id: Some(s.cluster_id.to_string()),
1565                                name,
1566                                external_type: s.sink_type().to_string(),
1567                            })
1568                        }
1569                        CatalogItem::Index(i) => {
1570                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1571                                id: id.to_string(),
1572                                name,
1573                                cluster_id: i.cluster_id.to_string(),
1574                            })
1575                        }
1576                        CatalogItem::MaterializedView(mv) => {
1577                            EventDetails::CreateMaterializedViewV1(
1578                                mz_audit_log::CreateMaterializedViewV1 {
1579                                    id: id.to_string(),
1580                                    name,
1581                                    cluster_id: mv.cluster_id.to_string(),
1582                                    replacement_target_id: mv
1583                                        .replacement_target
1584                                        .map(|id| id.to_string()),
1585                                },
1586                            )
1587                        }
1588                        CatalogItem::Table(_)
1589                        | CatalogItem::Log(_)
1590                        | CatalogItem::View(_)
1591                        | CatalogItem::Type(_)
1592                        | CatalogItem::Func(_)
1593                        | CatalogItem::Secret(_)
1594                        | CatalogItem::Connection(_) => EventDetails::IdFullNameV1(IdFullNameV1 {
1595                            id: id.to_string(),
1596                            name,
1597                        }),
1598                    };
1599                    CatalogState::add_to_audit_log(
1600                        &state.system_configuration,
1601                        oracle_write_ts,
1602                        session,
1603                        tx,
1604                        audit_events,
1605                        EventType::Create,
1606                        catalog_type_to_audit_object_type(item.typ()),
1607                        details,
1608                    )?;
1609                }
1610            }
1611            Op::CreateNetworkPolicy {
1612                rules,
1613                name,
1614                owner_id,
1615            } => {
1616                if state.network_policies_by_name.contains_key(&name) {
1617                    return Err(AdapterError::PlanError(PlanError::Catalog(
1618                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1619                    )));
1620                }
1621                if is_reserved_name(&name) {
1622                    return Err(AdapterError::Catalog(Error::new(
1623                        ErrorKind::ReservedNetworkPolicyName(name),
1624                    )));
1625                }
1626
1627                let owner_privileges = vec![rbac::owner_privilege(
1628                    mz_sql::catalog::ObjectType::NetworkPolicy,
1629                    owner_id,
1630                )];
1631                let default_privileges = state
1632                    .default_privileges
1633                    .get_applicable_privileges(
1634                        owner_id,
1635                        None,
1636                        None,
1637                        mz_sql::catalog::ObjectType::NetworkPolicy,
1638                    )
1639                    .map(|item| item.mz_acl_item(owner_id));
1640                let privileges: Vec<_> =
1641                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1642                        .collect();
1643
1644                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1645                let id = tx.insert_user_network_policy(
1646                    name.clone(),
1647                    rules,
1648                    privileges,
1649                    owner_id,
1650                    &temporary_oids,
1651                )?;
1652
1653                CatalogState::add_to_audit_log(
1654                    &state.system_configuration,
1655                    oracle_write_ts,
1656                    session,
1657                    tx,
1658                    audit_events,
1659                    EventType::Create,
1660                    ObjectType::NetworkPolicy,
1661                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1662                        id: id.to_string(),
1663                        name: name.clone(),
1664                    }),
1665                )?;
1666
1667                info!("created network policy {name} ({id})");
1668            }
1669            Op::Comment {
1670                object_id,
1671                sub_component,
1672                comment,
1673            } => {
1674                tx.update_comment(object_id, sub_component, comment)?;
1675                let entry = state.get_comment_id_entry(&object_id);
1676                let should_log = entry
1677                    .map(|entry| Self::should_audit_log_item(entry.item()))
1678                    // Things that aren't catalog entries can't be temp, so should be logged.
1679                    .unwrap_or(true);
1680                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1681                // comments won't be logged for now.
1682                if let (Some(conn_id), true) =
1683                    (session.map(|session| session.conn_id()), should_log)
1684                {
1685                    CatalogState::add_to_audit_log(
1686                        &state.system_configuration,
1687                        oracle_write_ts,
1688                        session,
1689                        tx,
1690                        audit_events,
1691                        EventType::Comment,
1692                        comment_id_to_audit_object_type(object_id),
1693                        EventDetails::IdNameV1(IdNameV1 {
1694                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1695                            id: format!("{object_id:?}"),
1696                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1697                        }),
1698                    )?;
1699                }
1700            }
1701            Op::UpdateSourceReferences {
1702                source_id,
1703                references,
1704            } => {
1705                tx.update_source_references(
1706                    source_id,
1707                    references
1708                        .references
1709                        .into_iter()
1710                        .map(|reference| reference.into())
1711                        .collect(),
1712                    references.updated_at,
1713                )?;
1714            }
1715            Op::DropObjects(drop_object_infos) => {
1716                // Generate all of the objects that need to get dropped.
1717                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1718
1719                // Drop any associated comments.
1720                tx.drop_comments(&delta.comments)?;
1721
1722                // Drop any items.
1723                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1724                    delta
1725                        .items
1726                        .iter()
1727                        .map(|id| id)
1728                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1729                tx.remove_items(&durable_items_to_drop)?;
1730                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1731                    let entry = state.get_entry(&id);
1732                    (entry.clone().into(), StateDiff::Retraction)
1733                }));
1734
1735                for item_id in delta.items {
1736                    let entry = state.get_entry(&item_id);
1737
1738                    if entry.item().is_storage_collection() {
1739                        storage_collections_to_drop.extend(entry.global_ids());
1740                    }
1741
1742                    if state.source_references.contains_key(&item_id) {
1743                        tx.remove_source_references(item_id)?;
1744                    }
1745
1746                    if Self::should_audit_log_item(entry.item()) {
1747                        CatalogState::add_to_audit_log(
1748                            &state.system_configuration,
1749                            oracle_write_ts,
1750                            session,
1751                            tx,
1752                            audit_events,
1753                            EventType::Drop,
1754                            catalog_type_to_audit_object_type(entry.item().typ()),
1755                            EventDetails::IdFullNameV1(IdFullNameV1 {
1756                                id: item_id.to_string(),
1757                                name: Self::full_name_detail(&state.resolve_full_name(
1758                                    entry.name(),
1759                                    session.map(|session| session.conn_id()),
1760                                )),
1761                            }),
1762                        )?;
1763                    }
1764                    info!(
1765                        "drop {} {} ({})",
1766                        entry.item_type(),
1767                        state.resolve_full_name(entry.name(), entry.conn_id()),
1768                        item_id
1769                    );
1770                }
1771
1772                // Drop any schemas.
1773                let schemas = delta
1774                    .schemas
1775                    .iter()
1776                    .map(|(schema_spec, database_spec)| {
1777                        (SchemaId::from(schema_spec), *database_spec)
1778                    })
1779                    .collect();
1780                tx.remove_schemas(&schemas)?;
1781
1782                for (schema_spec, database_spec) in delta.schemas {
1783                    let schema = state.get_schema(
1784                        &database_spec,
1785                        &schema_spec,
1786                        session
1787                            .map(|session| session.conn_id())
1788                            .unwrap_or(&SYSTEM_CONN_ID),
1789                    );
1790
1791                    let schema_id = SchemaId::from(schema_spec);
1792                    let database_id = match database_spec {
1793                        ResolvedDatabaseSpecifier::Ambient => None,
1794                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1795                    };
1796
1797                    CatalogState::add_to_audit_log(
1798                        &state.system_configuration,
1799                        oracle_write_ts,
1800                        session,
1801                        tx,
1802                        audit_events,
1803                        EventType::Drop,
1804                        ObjectType::Schema,
1805                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1806                            id: schema_id.to_string(),
1807                            name: schema.name.schema.to_string(),
1808                            database_name: database_id
1809                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1810                        }),
1811                    )?;
1812                }
1813
1814                // Drop any databases.
1815                tx.remove_databases(&delta.databases)?;
1816
1817                for database_id in delta.databases {
1818                    let database = state.get_database(&database_id).clone();
1819
1820                    CatalogState::add_to_audit_log(
1821                        &state.system_configuration,
1822                        oracle_write_ts,
1823                        session,
1824                        tx,
1825                        audit_events,
1826                        EventType::Drop,
1827                        ObjectType::Database,
1828                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1829                            id: database_id.to_string(),
1830                            name: database.name.clone(),
1831                        }),
1832                    )?;
1833                }
1834
1835                // Drop any roles.
1836                tx.remove_user_roles(&delta.roles)?;
1837
1838                for role_id in delta.roles {
1839                    let role = state
1840                        .roles_by_id
1841                        .get(&role_id)
1842                        .expect("catalog out of sync");
1843
1844                    CatalogState::add_to_audit_log(
1845                        &state.system_configuration,
1846                        oracle_write_ts,
1847                        session,
1848                        tx,
1849                        audit_events,
1850                        EventType::Drop,
1851                        ObjectType::Role,
1852                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1853                            id: role.id.to_string(),
1854                            name: role.name.clone(),
1855                        }),
1856                    )?;
1857                    info!("drop role {}", role.name());
1858                }
1859
1860                // Drop any network policies.
1861                tx.remove_network_policies(&delta.network_policies)?;
1862
1863                for network_policy_id in delta.network_policies {
1864                    let policy = state
1865                        .network_policies_by_id
1866                        .get(&network_policy_id)
1867                        .expect("catalog out of sync");
1868
1869                    CatalogState::add_to_audit_log(
1870                        &state.system_configuration,
1871                        oracle_write_ts,
1872                        session,
1873                        tx,
1874                        audit_events,
1875                        EventType::Drop,
1876                        ObjectType::NetworkPolicy,
1877                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1878                            id: policy.id.to_string(),
1879                            name: policy.name.clone(),
1880                        }),
1881                    )?;
1882                    info!("drop network policy {}", policy.name.clone());
1883                }
1884
1885                // Drop any replicas.
1886                let replicas = delta.replicas.keys().copied().collect();
1887                tx.remove_cluster_replicas(&replicas)?;
1888
1889                for (replica_id, (cluster_id, reason)) in delta.replicas {
1890                    let cluster = state.get_cluster(cluster_id);
1891                    let replica = cluster.replica(replica_id).expect("Must exist");
1892
1893                    let (reason, scheduling_policies) = reason.into_audit_log();
1894                    let details =
1895                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1896                            cluster_id: cluster_id.to_string(),
1897                            cluster_name: cluster.name.clone(),
1898                            replica_id: Some(replica_id.to_string()),
1899                            replica_name: replica.name.clone(),
1900                            reason,
1901                            scheduling_policies,
1902                        });
1903                    CatalogState::add_to_audit_log(
1904                        &state.system_configuration,
1905                        oracle_write_ts,
1906                        session,
1907                        tx,
1908                        audit_events,
1909                        EventType::Drop,
1910                        ObjectType::ClusterReplica,
1911                        details,
1912                    )?;
1913                }
1914
1915                // Drop any clusters.
1916                tx.remove_clusters(&delta.clusters)?;
1917
1918                for cluster_id in delta.clusters {
1919                    let cluster = state.get_cluster(cluster_id);
1920
1921                    CatalogState::add_to_audit_log(
1922                        &state.system_configuration,
1923                        oracle_write_ts,
1924                        session,
1925                        tx,
1926                        audit_events,
1927                        EventType::Drop,
1928                        ObjectType::Cluster,
1929                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1930                            id: cluster.id.to_string(),
1931                            name: cluster.name.clone(),
1932                        }),
1933                    )?;
1934                }
1935            }
1936            Op::GrantRole {
1937                role_id,
1938                member_id,
1939                grantor_id,
1940            } => {
1941                state.ensure_not_reserved_role(&member_id)?;
1942                state.ensure_grantable_role(&role_id)?;
1943                if state.collect_role_membership(&role_id).contains(&member_id) {
1944                    let group_role = state.get_role(&role_id);
1945                    let member_role = state.get_role(&member_id);
1946                    return Err(AdapterError::Catalog(Error::new(
1947                        ErrorKind::CircularRoleMembership {
1948                            role_name: group_role.name().to_string(),
1949                            member_name: member_role.name().to_string(),
1950                        },
1951                    )));
1952                }
1953                let mut member_role = state.get_role(&member_id).clone();
1954                member_role.membership.map.insert(role_id, grantor_id);
1955                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1956
1957                CatalogState::add_to_audit_log(
1958                    &state.system_configuration,
1959                    oracle_write_ts,
1960                    session,
1961                    tx,
1962                    audit_events,
1963                    EventType::Grant,
1964                    ObjectType::Role,
1965                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1966                        role_id: role_id.to_string(),
1967                        member_id: member_id.to_string(),
1968                        grantor_id: grantor_id.to_string(),
1969                        executed_by: session
1970                            .map(|session| session.authenticated_role_id())
1971                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1972                            .to_string(),
1973                    }),
1974                )?;
1975            }
1976            Op::RevokeRole {
1977                role_id,
1978                member_id,
1979                grantor_id,
1980            } => {
1981                state.ensure_not_reserved_role(&member_id)?;
1982                state.ensure_grantable_role(&role_id)?;
1983                let mut member_role = state.get_role(&member_id).clone();
1984                member_role.membership.map.remove(&role_id);
1985                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1986
1987                CatalogState::add_to_audit_log(
1988                    &state.system_configuration,
1989                    oracle_write_ts,
1990                    session,
1991                    tx,
1992                    audit_events,
1993                    EventType::Revoke,
1994                    ObjectType::Role,
1995                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1996                        role_id: role_id.to_string(),
1997                        member_id: member_id.to_string(),
1998                        grantor_id: grantor_id.to_string(),
1999                        executed_by: session
2000                            .map(|session| session.authenticated_role_id())
2001                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
2002                            .to_string(),
2003                    }),
2004                )?;
2005            }
2006            Op::UpdatePrivilege {
2007                target_id,
2008                privilege,
2009                variant,
2010            } => {
2011                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
2012                    UpdatePrivilegeVariant::Grant => {
2013                        privileges.grant(privilege);
2014                    }
2015                    UpdatePrivilegeVariant::Revoke => {
2016                        privileges.revoke(&privilege);
2017                    }
2018                };
2019                match &target_id {
2020                    SystemObjectId::Object(object_id) => match object_id {
2021                        ObjectId::Cluster(id) => {
2022                            let mut cluster = state.get_cluster(*id).clone();
2023                            update_privilege_fn(&mut cluster.privileges);
2024                            tx.update_cluster(*id, cluster.into())?;
2025                        }
2026                        ObjectId::Database(id) => {
2027                            let mut database = state.get_database(id).clone();
2028                            update_privilege_fn(&mut database.privileges);
2029                            tx.update_database(*id, database.into())?;
2030                        }
2031                        ObjectId::NetworkPolicy(id) => {
2032                            let mut policy = state.get_network_policy(id).clone();
2033                            update_privilege_fn(&mut policy.privileges);
2034                            tx.update_network_policy(*id, policy.into())?;
2035                        }
2036                        ObjectId::Schema((database_spec, schema_spec)) => {
2037                            let schema_id = schema_spec.clone().into();
2038                            let mut schema = state
2039                                .get_schema(
2040                                    database_spec,
2041                                    schema_spec,
2042                                    session
2043                                        .map(|session| session.conn_id())
2044                                        .unwrap_or(&SYSTEM_CONN_ID),
2045                                )
2046                                .clone();
2047                            update_privilege_fn(&mut schema.privileges);
2048                            tx.update_schema(schema_id, schema.into())?;
2049                        }
2050                        ObjectId::Item(id) => {
2051                            let entry = state.get_entry(id);
2052                            let mut new_entry = entry.clone();
2053                            update_privilege_fn(&mut new_entry.privileges);
2054                            if !new_entry.item().is_temporary() {
2055                                tx.update_item(*id, new_entry.into())?;
2056                            } else {
2057                                temporary_item_updates
2058                                    .push((entry.clone().into(), StateDiff::Retraction));
2059                                temporary_item_updates
2060                                    .push((new_entry.into(), StateDiff::Addition));
2061                            }
2062                        }
2063                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
2064                    },
2065                    SystemObjectId::System => {
2066                        let mut system_privileges = PrivilegeMap::clone(&state.system_privileges);
2067                        update_privilege_fn(&mut system_privileges);
2068                        let new_privilege =
2069                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
2070                        tx.set_system_privilege(
2071                            privilege.grantee,
2072                            privilege.grantor,
2073                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
2074                        )?;
2075                    }
2076                }
2077                let object_type = state.get_system_object_type(&target_id);
2078                let object_id_str = match &target_id {
2079                    SystemObjectId::System => "SYSTEM".to_string(),
2080                    SystemObjectId::Object(id) => id.to_string(),
2081                };
2082                CatalogState::add_to_audit_log(
2083                    &state.system_configuration,
2084                    oracle_write_ts,
2085                    session,
2086                    tx,
2087                    audit_events,
2088                    variant.into(),
2089                    system_object_type_to_audit_object_type(&object_type),
2090                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
2091                        object_id: object_id_str,
2092                        grantee_id: privilege.grantee.to_string(),
2093                        grantor_id: privilege.grantor.to_string(),
2094                        privileges: privilege.acl_mode.to_string(),
2095                    }),
2096                )?;
2097            }
2098            Op::UpdateDefaultPrivilege {
2099                privilege_object,
2100                privilege_acl_item,
2101                variant,
2102            } => {
2103                let mut default_privileges = DefaultPrivileges::clone(&state.default_privileges);
2104                match variant {
2105                    UpdatePrivilegeVariant::Grant => default_privileges
2106                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
2107                    UpdatePrivilegeVariant::Revoke => {
2108                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
2109                    }
2110                }
2111                let new_acl_mode = default_privileges
2112                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
2113                tx.set_default_privilege(
2114                    privilege_object.role_id,
2115                    privilege_object.database_id,
2116                    privilege_object.schema_id,
2117                    privilege_object.object_type,
2118                    privilege_acl_item.grantee,
2119                    new_acl_mode.cloned(),
2120                )?;
2121                CatalogState::add_to_audit_log(
2122                    &state.system_configuration,
2123                    oracle_write_ts,
2124                    session,
2125                    tx,
2126                    audit_events,
2127                    variant.into(),
2128                    object_type_to_audit_object_type(privilege_object.object_type),
2129                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
2130                        role_id: privilege_object.role_id.to_string(),
2131                        database_id: privilege_object.database_id.map(|id| id.to_string()),
2132                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
2133                        grantee_id: privilege_acl_item.grantee.to_string(),
2134                        privileges: privilege_acl_item.acl_mode.to_string(),
2135                    }),
2136                )?;
2137            }
2138            Op::RenameCluster {
2139                id,
2140                name,
2141                to_name,
2142                check_reserved_names,
2143            } => {
2144                if id.is_system() {
2145                    return Err(AdapterError::Catalog(Error::new(
2146                        ErrorKind::ReadOnlyCluster(name.clone()),
2147                    )));
2148                }
2149                if check_reserved_names && is_reserved_name(&to_name) {
2150                    return Err(AdapterError::Catalog(Error::new(
2151                        ErrorKind::ReservedClusterName(to_name),
2152                    )));
2153                }
2154                tx.rename_cluster(id, &name, &to_name)?;
2155                CatalogState::add_to_audit_log(
2156                    &state.system_configuration,
2157                    oracle_write_ts,
2158                    session,
2159                    tx,
2160                    audit_events,
2161                    EventType::Alter,
2162                    ObjectType::Cluster,
2163                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
2164                        id: id.to_string(),
2165                        old_name: name.clone(),
2166                        new_name: to_name.clone(),
2167                    }),
2168                )?;
2169                info!("rename cluster {name} to {to_name}");
2170            }
2171            Op::RenameClusterReplica {
2172                cluster_id,
2173                replica_id,
2174                name,
2175                to_name,
2176            } => {
2177                if is_reserved_name(&to_name) {
2178                    return Err(AdapterError::Catalog(Error::new(
2179                        ErrorKind::ReservedReplicaName(to_name),
2180                    )));
2181                }
2182                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
2183                CatalogState::add_to_audit_log(
2184                    &state.system_configuration,
2185                    oracle_write_ts,
2186                    session,
2187                    tx,
2188                    audit_events,
2189                    EventType::Alter,
2190                    ObjectType::ClusterReplica,
2191                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
2192                        cluster_id: cluster_id.to_string(),
2193                        replica_id: replica_id.to_string(),
2194                        old_name: name.replica.as_str().to_string(),
2195                        new_name: to_name.clone(),
2196                    }),
2197                )?;
2198                info!("rename cluster replica {name} to {to_name}");
2199            }
2200            Op::RenameItem {
2201                id,
2202                to_name,
2203                current_full_name,
2204            } => {
2205                let mut updates = Vec::new();
2206
2207                let entry = state.get_entry(&id);
2208                if let CatalogItem::Type(_) = entry.item() {
2209                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
2210                        current_full_name.to_string(),
2211                    ))));
2212                }
2213
2214                if entry.id().is_system() {
2215                    let name = state
2216                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
2217                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2218                        name.to_string(),
2219                    ))));
2220                }
2221
2222                let mut to_full_name = current_full_name.clone();
2223                to_full_name.item.clone_from(&to_name);
2224
2225                let mut to_qualified_name = entry.name().clone();
2226                to_qualified_name.item.clone_from(&to_name);
2227
2228                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
2229                    id: id.to_string(),
2230                    old_name: Self::full_name_detail(&current_full_name),
2231                    new_name: Self::full_name_detail(&to_full_name),
2232                });
2233                if Self::should_audit_log_item(entry.item()) {
2234                    CatalogState::add_to_audit_log(
2235                        &state.system_configuration,
2236                        oracle_write_ts,
2237                        session,
2238                        tx,
2239                        audit_events,
2240                        EventType::Alter,
2241                        catalog_type_to_audit_object_type(entry.item().typ()),
2242                        details,
2243                    )?;
2244                }
2245
2246                // Rename item itself.
2247                let mut new_entry = entry.clone();
2248                new_entry.name.item.clone_from(&to_name);
2249                new_entry.item = entry
2250                    .item()
2251                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
2252                    .map_err(|e| {
2253                        Error::new(ErrorKind::from(AmbiguousRename {
2254                            depender: state
2255                                .resolve_full_name(entry.name(), entry.conn_id())
2256                                .to_string(),
2257                            dependee: state
2258                                .resolve_full_name(entry.name(), entry.conn_id())
2259                                .to_string(),
2260                            message: e,
2261                        }))
2262                    })?;
2263
2264                for id in entry.referenced_by() {
2265                    let dependent_item = state.get_entry(id);
2266                    let mut to_entry = dependent_item.clone();
2267                    to_entry.item = dependent_item
2268                        .item()
2269                        .rename_item_refs(
2270                            current_full_name.clone(),
2271                            to_full_name.item.clone(),
2272                            false,
2273                        )
2274                        .map_err(|e| {
2275                            Error::new(ErrorKind::from(AmbiguousRename {
2276                                depender: state
2277                                    .resolve_full_name(
2278                                        dependent_item.name(),
2279                                        dependent_item.conn_id(),
2280                                    )
2281                                    .to_string(),
2282                                dependee: state
2283                                    .resolve_full_name(entry.name(), entry.conn_id())
2284                                    .to_string(),
2285                                message: e,
2286                            }))
2287                        })?;
2288
2289                    if !to_entry.item().is_temporary() {
2290                        tx.update_item(*id, to_entry.into())?;
2291                    } else {
2292                        temporary_item_updates
2293                            .push((dependent_item.clone().into(), StateDiff::Retraction));
2294                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
2295                    }
2296                    updates.push(*id);
2297                }
2298                if !new_entry.item().is_temporary() {
2299                    tx.update_item(id, new_entry.into())?;
2300                } else {
2301                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2302                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2303                }
2304
2305                updates.push(id);
2306                for id in updates {
2307                    Self::log_update(state, &id);
2308                }
2309            }
2310            Op::RenameSchema {
2311                database_spec,
2312                schema_spec,
2313                new_name,
2314                check_reserved_names,
2315            } => {
2316                if check_reserved_names && is_reserved_name(&new_name) {
2317                    return Err(AdapterError::Catalog(Error::new(
2318                        ErrorKind::ReservedSchemaName(new_name),
2319                    )));
2320                }
2321
2322                let conn_id = session
2323                    .map(|session| session.conn_id())
2324                    .unwrap_or(&SYSTEM_CONN_ID);
2325
2326                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
2327                let cur_name = schema.name().schema.clone();
2328
2329                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
2330                    return Err(AdapterError::Catalog(Error::new(
2331                        ErrorKind::AmbientSchemaRename(cur_name),
2332                    )));
2333                };
2334                let database = state.get_database(&database_id);
2335                let database_name = &database.name;
2336
2337                let mut updates = Vec::new();
2338                let mut items_to_update = BTreeMap::new();
2339
2340                let mut update_item = |id| {
2341                    if items_to_update.contains_key(id) {
2342                        return Ok(());
2343                    }
2344
2345                    let entry = state.get_entry(id);
2346
2347                    // Update our item.
2348                    let mut new_entry = entry.clone();
2349                    new_entry.item = entry
2350                        .item
2351                        .rename_schema_refs(database_name, &cur_name, &new_name)
2352                        .map_err(|(s, _i)| {
2353                            Error::new(ErrorKind::from(AmbiguousRename {
2354                                depender: state
2355                                    .resolve_full_name(entry.name(), entry.conn_id())
2356                                    .to_string(),
2357                                dependee: format!("{database_name}.{cur_name}"),
2358                                message: format!("ambiguous reference to schema named {s}"),
2359                            }))
2360                        })?;
2361
2362                    // Queue updates for Catalog storage and Builtin Tables.
2363                    if !new_entry.item().is_temporary() {
2364                        items_to_update.insert(*id, new_entry.into());
2365                    } else {
2366                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2367                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2368                    }
2369                    updates.push(id);
2370
2371                    Ok::<_, AdapterError>(())
2372                };
2373
2374                // Update all of the items in the schema.
2375                for (_name, item_id) in &schema.items {
2376                    // Update the item itself.
2377                    update_item(item_id)?;
2378
2379                    // Update everything that depends on this item.
2380                    for id in state.get_entry(item_id).referenced_by() {
2381                        update_item(id)?;
2382                    }
2383                }
2384                // Note: When updating the transaction it's very important that we update the
2385                // items as a whole group, otherwise we exhibit quadratic behavior.
2386                tx.update_items(items_to_update)?;
2387
2388                // Renaming temporary schemas is not supported.
2389                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2390                    let schema_name = schema.name().schema.clone();
2391                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2392                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2393                    )));
2394                };
2395
2396                // Add an entry to the audit log.
2397                let database_name = database_spec
2398                    .id()
2399                    .map(|id| state.get_database(&id).name.clone());
2400                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2401                    id: schema_id.to_string(),
2402                    old_name: schema.name().schema.clone(),
2403                    new_name: new_name.clone(),
2404                    database_name,
2405                });
2406                CatalogState::add_to_audit_log(
2407                    &state.system_configuration,
2408                    oracle_write_ts,
2409                    session,
2410                    tx,
2411                    audit_events,
2412                    EventType::Alter,
2413                    mz_audit_log::ObjectType::Schema,
2414                    details,
2415                )?;
2416
2417                // Update the schema itself.
2418                let mut new_schema = schema.clone();
2419                new_schema.name.schema.clone_from(&new_name);
2420                tx.update_schema(schema_id, new_schema.into())?;
2421
2422                for id in updates {
2423                    Self::log_update(state, id);
2424                }
2425            }
2426            Op::UpdateOwner { id, new_owner } => {
2427                let conn_id = session
2428                    .map(|session| session.conn_id())
2429                    .unwrap_or(&SYSTEM_CONN_ID);
2430                let old_owner = state
2431                    .get_owner_id(&id, conn_id)
2432                    .expect("cannot update the owner of an object without an owner");
2433                match &id {
2434                    ObjectId::Cluster(id) => {
2435                        let mut cluster = state.get_cluster(*id).clone();
2436                        if id.is_system() {
2437                            return Err(AdapterError::Catalog(Error::new(
2438                                ErrorKind::ReadOnlyCluster(cluster.name),
2439                            )));
2440                        }
2441                        Self::update_privilege_owners(
2442                            &mut cluster.privileges,
2443                            cluster.owner_id,
2444                            new_owner,
2445                        );
2446                        cluster.owner_id = new_owner;
2447                        tx.update_cluster(*id, cluster.into())?;
2448                    }
2449                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2450                        let cluster = state.get_cluster(*cluster_id);
2451                        let mut replica = cluster
2452                            .replica(*replica_id)
2453                            .expect("catalog out of sync")
2454                            .clone();
2455                        if replica_id.is_system() {
2456                            return Err(AdapterError::Catalog(Error::new(
2457                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2458                            )));
2459                        }
2460                        replica.owner_id = new_owner;
2461                        tx.update_cluster_replica(*replica_id, replica.into())?;
2462                    }
2463                    ObjectId::Database(id) => {
2464                        let mut database = state.get_database(id).clone();
2465                        if id.is_system() {
2466                            return Err(AdapterError::Catalog(Error::new(
2467                                ErrorKind::ReadOnlyDatabase(database.name),
2468                            )));
2469                        }
2470                        Self::update_privilege_owners(
2471                            &mut database.privileges,
2472                            database.owner_id,
2473                            new_owner,
2474                        );
2475                        database.owner_id = new_owner;
2476                        tx.update_database(*id, database.clone().into())?;
2477                    }
2478                    ObjectId::Schema((database_spec, schema_spec)) => {
2479                        let schema_id: SchemaId = schema_spec.clone().into();
2480                        let mut schema = state
2481                            .get_schema(database_spec, schema_spec, conn_id)
2482                            .clone();
2483                        if schema_id.is_system() {
2484                            let name = schema.name();
2485                            let full_name = state.resolve_full_schema_name(name);
2486                            return Err(AdapterError::Catalog(Error::new(
2487                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2488                            )));
2489                        }
2490                        Self::update_privilege_owners(
2491                            &mut schema.privileges,
2492                            schema.owner_id,
2493                            new_owner,
2494                        );
2495                        schema.owner_id = new_owner;
2496                        tx.update_schema(schema_id, schema.into())?;
2497                    }
2498                    ObjectId::Item(id) => {
2499                        let entry = state.get_entry(id);
2500                        let mut new_entry = entry.clone();
2501                        if id.is_system() {
2502                            let full_name = state.resolve_full_name(
2503                                new_entry.name(),
2504                                session.map(|session| session.conn_id()),
2505                            );
2506                            return Err(AdapterError::Catalog(Error::new(
2507                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2508                            )));
2509                        }
2510                        Self::update_privilege_owners(
2511                            &mut new_entry.privileges,
2512                            new_entry.owner_id,
2513                            new_owner,
2514                        );
2515                        new_entry.owner_id = new_owner;
2516                        if !new_entry.item().is_temporary() {
2517                            tx.update_item(*id, new_entry.into())?;
2518                        } else {
2519                            temporary_item_updates
2520                                .push((entry.clone().into(), StateDiff::Retraction));
2521                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2522                        }
2523                    }
2524                    ObjectId::NetworkPolicy(id) => {
2525                        let mut policy = state.get_network_policy(id).clone();
2526                        if id.is_system() {
2527                            return Err(AdapterError::Catalog(Error::new(
2528                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2529                            )));
2530                        }
2531                        Self::update_privilege_owners(
2532                            &mut policy.privileges,
2533                            policy.owner_id,
2534                            new_owner,
2535                        );
2536                        policy.owner_id = new_owner;
2537                        tx.update_network_policy(*id, policy.into())?;
2538                    }
2539                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2540                }
2541                let object_type = state.get_object_type(&id);
2542                CatalogState::add_to_audit_log(
2543                    &state.system_configuration,
2544                    oracle_write_ts,
2545                    session,
2546                    tx,
2547                    audit_events,
2548                    EventType::Alter,
2549                    object_type_to_audit_object_type(object_type),
2550                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2551                        object_id: id.to_string(),
2552                        old_owner_id: old_owner.to_string(),
2553                        new_owner_id: new_owner.to_string(),
2554                    }),
2555                )?;
2556            }
2557            Op::UpdateClusterConfig { id, name, config } => {
2558                let mut cluster = state.get_cluster(id).clone();
2559                cluster.config = config;
2560                tx.update_cluster(id, cluster.into())?;
2561                info!("update cluster {}", name);
2562
2563                CatalogState::add_to_audit_log(
2564                    &state.system_configuration,
2565                    oracle_write_ts,
2566                    session,
2567                    tx,
2568                    audit_events,
2569                    EventType::Alter,
2570                    ObjectType::Cluster,
2571                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2572                        id: id.to_string(),
2573                        name,
2574                    }),
2575                )?;
2576            }
2577            Op::UpdateClusterReplicaConfig {
2578                replica_id,
2579                cluster_id,
2580                config,
2581            } => {
2582                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2583                info!("update replica {}", replica.name);
2584                tx.update_cluster_replica(
2585                    replica_id,
2586                    mz_catalog::durable::ClusterReplica {
2587                        cluster_id,
2588                        replica_id,
2589                        name: replica.name.clone(),
2590                        config: config.clone().into(),
2591                        owner_id: replica.owner_id,
2592                    },
2593                )?;
2594            }
2595            Op::UpdateItem { id, name, to_item } => {
2596                let mut entry = state.get_entry(&id).clone();
2597                entry.name = name.clone();
2598                entry.item = to_item.clone();
2599                tx.update_item(id, entry.into())?;
2600
2601                if Self::should_audit_log_item(&to_item) {
2602                    let mut full_name = Self::full_name_detail(
2603                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2604                    );
2605                    full_name.item = name.item;
2606
2607                    CatalogState::add_to_audit_log(
2608                        &state.system_configuration,
2609                        oracle_write_ts,
2610                        session,
2611                        tx,
2612                        audit_events,
2613                        EventType::Alter,
2614                        catalog_type_to_audit_object_type(to_item.typ()),
2615                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2616                            id: id.to_string(),
2617                            name: full_name,
2618                        }),
2619                    )?;
2620                }
2621
2622                Self::log_update(state, &id);
2623            }
2624            Op::UpdateSystemConfiguration { name, value } => {
2625                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2626                tx.upsert_system_config(&name, parsed_value.clone())?;
2627                // This mirrors some "system vars" into the catalog storage
2628                // "config" collection so that we can toggle the flag with
2629                // Launch Darkly, but use it in boot before Launch Darkly is
2630                // available.
2631                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2632                    let with_0dt_deployment_max_wait =
2633                        Duration::parse(VarInput::Flat(&parsed_value))
2634                            .expect("parsing succeeded above");
2635                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2636                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2637                    let with_0dt_deployment_ddl_check_interval =
2638                        Duration::parse(VarInput::Flat(&parsed_value))
2639                            .expect("parsing succeeded above");
2640                    tx.set_0dt_deployment_ddl_check_interval(
2641                        with_0dt_deployment_ddl_check_interval,
2642                    )?;
2643                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2644                    let panic_after_timeout =
2645                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2646                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2647                }
2648
2649                CatalogState::add_to_audit_log(
2650                    &state.system_configuration,
2651                    oracle_write_ts,
2652                    session,
2653                    tx,
2654                    audit_events,
2655                    EventType::Alter,
2656                    ObjectType::System,
2657                    EventDetails::SetV1(mz_audit_log::SetV1 {
2658                        name,
2659                        value: Some(value.borrow().to_vec().join(", ")),
2660                    }),
2661                )?;
2662            }
2663            Op::ResetSystemConfiguration { name } => {
2664                tx.remove_system_config(&name);
2665                // This mirrors some "system vars" into the catalog storage
2666                // "config" collection so that we can toggle the flag with
2667                // Launch Darkly, but use it in boot before Launch Darkly is
2668                // available.
2669                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2670                    tx.reset_0dt_deployment_max_wait()?;
2671                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2672                    tx.reset_0dt_deployment_ddl_check_interval()?;
2673                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2674                    tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2675                }
2676
2677                CatalogState::add_to_audit_log(
2678                    &state.system_configuration,
2679                    oracle_write_ts,
2680                    session,
2681                    tx,
2682                    audit_events,
2683                    EventType::Alter,
2684                    ObjectType::System,
2685                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2686                )?;
2687            }
2688            Op::ResetAllSystemConfiguration => {
2689                tx.clear_system_configs();
2690                tx.reset_0dt_deployment_max_wait()?;
2691                tx.reset_0dt_deployment_ddl_check_interval()?;
2692                tx.reset_enable_0dt_deployment_panic_after_timeout()?;
2693
2694                CatalogState::add_to_audit_log(
2695                    &state.system_configuration,
2696                    oracle_write_ts,
2697                    session,
2698                    tx,
2699                    audit_events,
2700                    EventType::Alter,
2701                    ObjectType::System,
2702                    EventDetails::ResetAllV1,
2703                )?;
2704            }
2705            Op::UpdateScopedSystemParameters { scoped } => {
2706                // The sync loop pushes the *complete* desired state each tick;
2707                // diff it against the durable cache and persist only the delta.
2708                // Entries whose owning object id is absent from the catalog are
2709                // pruned (lazy GC); object ids are never reused, so such entries
2710                // are inert and pruning is hygiene only.
2711                let live_clusters: BTreeSet<ClusterId> =
2712                    tx.get_clusters().map(|cluster| cluster.id).collect();
2713                let live_replicas: BTreeSet<ReplicaId> = tx
2714                    .get_cluster_replicas()
2715                    .map(|replica| replica.replica_id)
2716                    .collect();
2717
2718                // Cluster-coherent scope.
2719                let existing_cluster: BTreeMap<(ClusterId, String), String> = tx
2720                    .get_cluster_system_configurations()
2721                    .map(|c| ((c.cluster_id, c.name), c.value))
2722                    .collect();
2723                let mut desired_cluster: BTreeSet<(ClusterId, String)> = BTreeSet::new();
2724                for (cluster_id, values) in &scoped.cluster {
2725                    if !live_clusters.contains(cluster_id) {
2726                        continue;
2727                    }
2728                    for (name, value) in values {
2729                        desired_cluster.insert((*cluster_id, name.clone()));
2730                        if existing_cluster.get(&(*cluster_id, name.clone())) != Some(value) {
2731                            tx.upsert_cluster_system_config(*cluster_id, name, value.clone())?;
2732                        }
2733                    }
2734                }
2735                // Non-live clusters never enter `desired_cluster` (they `continue`
2736                // above), so a missing entry already covers dropped clusters.
2737                for (cluster_id, name) in existing_cluster.into_keys() {
2738                    if !desired_cluster.contains(&(cluster_id, name.clone())) {
2739                        tx.remove_cluster_system_config(cluster_id, &name);
2740                    }
2741                }
2742
2743                // Replica-local scope.
2744                let existing_replica: BTreeMap<(ReplicaId, String), String> = tx
2745                    .get_replica_system_configurations()
2746                    .map(|r| ((r.replica_id, r.name), r.value))
2747                    .collect();
2748                let mut desired_replica: BTreeSet<(ReplicaId, String)> = BTreeSet::new();
2749                for (replica_id, values) in &scoped.replica {
2750                    if !live_replicas.contains(replica_id) {
2751                        continue;
2752                    }
2753                    for (name, value) in values {
2754                        desired_replica.insert((*replica_id, name.clone()));
2755                        if existing_replica.get(&(*replica_id, name.clone())) != Some(value) {
2756                            tx.upsert_replica_system_config(*replica_id, name, value.clone())?;
2757                        }
2758                    }
2759                }
2760                for (replica_id, name) in existing_replica.into_keys() {
2761                    if !desired_replica.contains(&(replica_id, name.clone())) {
2762                        tx.remove_replica_system_config(replica_id, &name);
2763                    }
2764                }
2765            }
2766            Op::InjectAuditEvents { events } => {
2767                for event in events {
2768                    let id = tx.allocate_audit_log_id()?;
2769                    let ev = VersionedEvent::new(
2770                        id,
2771                        event.event_type,
2772                        event.object_type,
2773                        event.details,
2774                        event.user,
2775                        oracle_write_ts.into(),
2776                    );
2777                    audit_events.push(ev.clone());
2778                    tx.insert_audit_log_event(ev);
2779                }
2780            }
2781        };
2782        Ok(temporary_item_updates)
2783    }
2784
2785    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2786        let entry = state.get_entry(id);
2787        info!(
2788            "update {} {} ({})",
2789            entry.item_type(),
2790            state.resolve_full_name(entry.name(), entry.conn_id()),
2791            id
2792        );
2793    }
2794
2795    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2796    /// implementation:
2797    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2798    fn update_privilege_owners(
2799        privileges: &mut PrivilegeMap,
2800        old_owner: RoleId,
2801        new_owner: RoleId,
2802    ) {
2803        // TODO(jkosh44) Would be nice not to clone every privilege.
2804        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2805
2806        let mut new_present = false;
2807        for privilege in flat_privileges.iter_mut() {
2808            // Old owner's granted privilege are updated to be granted by the new
2809            // owner.
2810            if privilege.grantor == old_owner {
2811                privilege.grantor = new_owner;
2812            } else if privilege.grantor == new_owner {
2813                new_present = true;
2814            }
2815            // Old owner's privileges is given to the new owner.
2816            if privilege.grantee == old_owner {
2817                privilege.grantee = new_owner;
2818            } else if privilege.grantee == new_owner {
2819                new_present = true;
2820            }
2821        }
2822
2823        // If the old privilege list contained references to the new owner, we may
2824        // have created duplicate entries. Here we try and consolidate them. This
2825        // is inspired by PostgreSQL's algorithm but not identical.
2826        if new_present {
2827            // Group privileges by (grantee, grantor).
2828            let privilege_map: BTreeMap<_, Vec<_>> =
2829                flat_privileges
2830                    .into_iter()
2831                    .fold(BTreeMap::new(), |mut accum, privilege| {
2832                        accum
2833                            .entry((privilege.grantee, privilege.grantor))
2834                            .or_default()
2835                            .push(privilege);
2836                        accum
2837                    });
2838
2839            // Consolidate and update all privileges.
2840            flat_privileges = privilege_map
2841                .into_iter()
2842                .map(|((grantee, grantor), values)|
2843                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2844                    values.into_iter().fold(
2845                        MzAclItem::empty(grantee, grantor),
2846                        |mut accum, mz_aclitem| {
2847                            accum.acl_mode =
2848                                accum.acl_mode.union(mz_aclitem.acl_mode);
2849                            accum
2850                        },
2851                    ))
2852                .collect();
2853        }
2854
2855        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2856    }
2857}
2858
2859/// Prepare the given transaction for replacing a catalog item with a new version.
2860///
2861/// The new version gets a new `CatalogItemId`, which requires rewriting the `create_sql` of all
2862/// dependent objects to refer to that new ID (at a previous version).
2863///
2864/// Note that here is where we break the assumption that the `CatalogItemId` is a stable identifier
2865/// for catalog items. We currently think that there are no use cases that require this assumption,
2866/// but no way to know for sure.
2867fn tx_replace_item(
2868    tx: &mut Transaction<'_>,
2869    state: &CatalogState,
2870    id: CatalogItemId,
2871    new_entry: CatalogEntry,
2872) -> Result<(), AdapterError> {
2873    let new_id = new_entry.id;
2874
2875    // Rewrite dependent objects to point to the new ID.
2876    for use_id in new_entry.referenced_by() {
2877        // The dependent might be dropped in the same tx, so check.
2878        if tx.get_item(use_id).is_none() {
2879            continue;
2880        }
2881
2882        let mut dependent = state.get_entry(use_id).clone();
2883        dependent.item = dependent.item.replace_item_refs(id, new_id);
2884        tx.update_item(*use_id, dependent.into())?;
2885    }
2886
2887    // Move comments to the new ID.
2888    let old_comment_id = state.get_comment_id(ObjectId::Item(id));
2889    let new_comment_id = new_entry.comment_object_id();
2890    if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
2891        tx.drop_comments(&[old_comment_id].into())?;
2892        for (sub, comment) in comments {
2893            tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
2894        }
2895    }
2896
2897    let mz_catalog::durable::Item {
2898        id: _,
2899        oid,
2900        global_id,
2901        schema_id,
2902        name,
2903        create_sql,
2904        owner_id,
2905        privileges,
2906        extra_versions,
2907    } = new_entry.into();
2908
2909    tx.remove_item(id)?;
2910    tx.insert_item(
2911        new_id,
2912        oid,
2913        global_id,
2914        schema_id,
2915        &name,
2916        create_sql,
2917        owner_id,
2918        privileges,
2919        extra_versions,
2920    )?;
2921
2922    Ok(())
2923}
2924
2925/// Generate audit events for a replacement apply operation.
2926fn apply_replacement_audit_events(
2927    state: &CatalogState,
2928    target: &CatalogEntry,
2929    replacement: &CatalogEntry,
2930) -> Vec<(EventType, EventDetails)> {
2931    let mut events = Vec::new();
2932
2933    let target_name = state.resolve_full_name(target.name(), target.conn_id());
2934    let target_id_name = IdFullNameV1 {
2935        id: target.id().to_string(),
2936        name: Catalog::full_name_detail(&target_name),
2937    };
2938    let replacement_name = state.resolve_full_name(replacement.name(), replacement.conn_id());
2939    let replacement_id_name = IdFullNameV1 {
2940        id: replacement.id().to_string(),
2941        name: Catalog::full_name_detail(&replacement_name),
2942    };
2943
2944    if Catalog::should_audit_log_item(&replacement.item) {
2945        events.push((
2946            EventType::Drop,
2947            EventDetails::IdFullNameV1(replacement_id_name.clone()),
2948        ));
2949    }
2950
2951    if Catalog::should_audit_log_item(&target.item) {
2952        events.push((
2953            EventType::Alter,
2954            EventDetails::AlterApplyReplacementV1(mz_audit_log::AlterApplyReplacementV1 {
2955                target: target_id_name.clone(),
2956                replacement: replacement_id_name,
2957            }),
2958        ));
2959
2960        if let Some(old_cluster_id) = target.cluster_id()
2961            && let Some(new_cluster_id) = replacement.cluster_id()
2962            && old_cluster_id != new_cluster_id
2963        {
2964            // When the replacement is applied, the target takes on the ID of the replacement, so
2965            // we should use that ID for subsequent events.
2966            events.push((
2967                EventType::Alter,
2968                EventDetails::AlterSetClusterV1(mz_audit_log::AlterSetClusterV1 {
2969                    id: replacement.id().to_string(),
2970                    name: target_id_name.name,
2971                    old_cluster_id: old_cluster_id.to_string(),
2972                    new_cluster_id: new_cluster_id.to_string(),
2973                }),
2974            ));
2975        }
2976    }
2977
2978    events
2979}
2980
2981/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2982///
2983/// Note: Previously we used to omit a single `Op::DropObject` for every object
2984/// we needed to drop. But removing a batch of objects from a durable Catalog
2985/// Transaction is O(n) where `n` is the number of objects that exist in the
2986/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
2987/// `DROP ... CASCADE` statement.
2988#[derive(Debug, Default)]
2989pub(crate) struct ObjectsToDrop {
2990    pub comments: BTreeSet<CommentObjectId>,
2991    pub databases: BTreeSet<DatabaseId>,
2992    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2993    pub clusters: BTreeSet<ClusterId>,
2994    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2995    pub roles: BTreeSet<RoleId>,
2996    pub items: Vec<CatalogItemId>,
2997    pub network_policies: BTreeSet<NetworkPolicyId>,
2998}
2999
3000impl ObjectsToDrop {
3001    pub fn generate(
3002        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
3003        state: &CatalogState,
3004        session: Option<&ConnMeta>,
3005    ) -> Result<Self, AdapterError> {
3006        let mut delta = ObjectsToDrop::default();
3007
3008        for drop_object_info in drop_object_infos {
3009            delta.add_item(drop_object_info, state, session)?;
3010        }
3011
3012        Ok(delta)
3013    }
3014
3015    fn add_item(
3016        &mut self,
3017        drop_object_info: DropObjectInfo,
3018        state: &CatalogState,
3019        session: Option<&ConnMeta>,
3020    ) -> Result<(), AdapterError> {
3021        self.comments
3022            .insert(state.get_comment_id(drop_object_info.to_object_id()));
3023
3024        match drop_object_info {
3025            DropObjectInfo::Database(database_id) => {
3026                let database = &state.database_by_id[&database_id];
3027                if database_id.is_system() {
3028                    return Err(AdapterError::Catalog(Error::new(
3029                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
3030                    )));
3031                }
3032
3033                self.databases.insert(database_id);
3034            }
3035            DropObjectInfo::Schema((database_spec, schema_spec)) => {
3036                let schema = state.get_schema(
3037                    &database_spec,
3038                    &schema_spec,
3039                    session
3040                        .map(|session| session.conn_id())
3041                        .unwrap_or(&SYSTEM_CONN_ID),
3042                );
3043                let schema_id: SchemaId = schema_spec.into();
3044                if schema_id.is_system() {
3045                    let name = schema.name();
3046                    let full_name = state.resolve_full_schema_name(name);
3047                    return Err(AdapterError::Catalog(Error::new(
3048                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
3049                    )));
3050                }
3051
3052                self.schemas.insert(schema_spec, database_spec);
3053            }
3054            DropObjectInfo::Role(role_id) => {
3055                let name = state.get_role(&role_id).name().to_string();
3056                if role_id.is_system() || role_id.is_predefined() {
3057                    return Err(AdapterError::Catalog(Error::new(
3058                        ErrorKind::ReservedRoleName(name.clone()),
3059                    )));
3060                }
3061                state.ensure_not_reserved_role(&role_id)?;
3062
3063                self.roles.insert(role_id);
3064            }
3065            DropObjectInfo::Cluster(cluster_id) => {
3066                let cluster = state.get_cluster(cluster_id);
3067                let name = &cluster.name;
3068                if cluster_id.is_system() {
3069                    return Err(AdapterError::Catalog(Error::new(
3070                        ErrorKind::ReadOnlyCluster(name.clone()),
3071                    )));
3072                }
3073
3074                self.clusters.insert(cluster_id);
3075            }
3076            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
3077                let cluster = state.get_cluster(cluster_id);
3078                let replica = cluster.replica(replica_id).expect("Must exist");
3079
3080                self.replicas
3081                    .insert(replica.replica_id, (cluster.id, reason));
3082
3083                // Implicitly drop materialized views that target this replica.
3084                // When the target replica is gone, no replica advances the
3085                // persist shard's upper frontier, causing reads to hang. Cascade
3086                // to anything depending on the implicitly-dropped MV so we don't
3087                // leave dangling references in the catalog.
3088                //
3089                // Plan-driven drops already include these dependents as their
3090                // own `DropObjectInfo::Item` entries (the plan stage expands
3091                // them via `cluster_replica_dependents`), so each one's comment
3092                // is recorded by the top-of-function `self.comments` insert.
3093                // This branch handles internal callers that build
3094                // `Op::DropObjects` directly with only the replica, so we expand
3095                // the dependents and record their comments ourselves.
3096                //
3097                // `seen` is seeded from the items already collected so that the
3098                // plan-driven path (where the dependents are processed before
3099                // the replica, in reverse-dependency order) does not re-add them
3100                // here.
3101                let mut seen: BTreeSet<ObjectId> =
3102                    self.items.iter().copied().map(ObjectId::Item).collect();
3103                for dep in state.cluster_replica_dependents(cluster_id, replica_id, &mut seen) {
3104                    if let ObjectId::Item(dep_id) = dep {
3105                        info!(
3106                            "implicitly dropping {} because target replica was dropped",
3107                            state.get_entry(&dep_id).name().item,
3108                        );
3109                        self.comments
3110                            .insert(state.get_comment_id(ObjectId::Item(dep_id)));
3111                        self.items.push(dep_id);
3112                    }
3113                }
3114            }
3115            DropObjectInfo::Item(item_id) => {
3116                let entry = state.get_entry(&item_id);
3117                if item_id.is_system() {
3118                    let name = entry.name();
3119                    let full_name =
3120                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
3121                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
3122                        full_name.to_string(),
3123                    ))));
3124                }
3125
3126                self.items.push(item_id);
3127            }
3128            DropObjectInfo::NetworkPolicy(network_policy_id) => {
3129                let policy = state.get_network_policy(&network_policy_id);
3130                let name = &policy.name;
3131                if network_policy_id.is_system() {
3132                    return Err(AdapterError::Catalog(Error::new(
3133                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
3134                    )));
3135                }
3136
3137                self.network_policies.insert(network_policy_id);
3138            }
3139        }
3140
3141        Ok(())
3142    }
3143}
3144
#[cfg(test)]
mod tests {
    use mz_catalog::SYSTEM_CONN_ID;
    use mz_catalog::memory::objects::{CatalogItem, Table, TableDataSource};
    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
    use mz_repr::role_id::RoleId;
    use mz_repr::{RelationDesc, RelationVersion, VersionedRelationDesc};
    use mz_sql::DEFAULT_SCHEMA;
    use mz_sql::catalog::CatalogDatabase;
    use mz_sql::names::{
        ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
    };
    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;

    use crate::catalog::{Catalog, Op};
    use crate::session::DEFAULT_DATABASE_NAME;

    /// Checks that transferring ownership merges an `UPDATE` grant that
    /// mentions the old owner with a `SELECT` grant that mentions the new owner
    /// into a single grant carrying both modes.
    #[mz_ore::test]
    fn test_update_privilege_owners() {
        let old_owner = RoleId::User(1);
        let new_owner = RoleId::User(2);
        let other_role = RoleId::User(3);

        // Each case is `((grantee, grantor) of the UPDATE grant,
        // (grantee, grantor) of the SELECT grant)`. The SELECT grant's pair is
        // also the pair expected on the single merged grant.
        let cases = [
            // Old owner exists as grantor.
            ((other_role, old_owner), (other_role, new_owner)),
            // Old owner exists as grantee.
            ((old_owner, other_role), (new_owner, other_role)),
            // Old owner exists as grantee and grantor.
            ((old_owner, old_owner), (new_owner, new_owner)),
        ];

        for ((update_grantee, update_grantor), (select_grantee, select_grantor)) in cases {
            let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
                MzAclItem {
                    grantee: update_grantee,
                    grantor: update_grantor,
                    acl_mode: AclMode::UPDATE,
                },
                MzAclItem {
                    grantee: select_grantee,
                    grantor: select_grantor,
                    acl_mode: AclMode::SELECT,
                },
            ]);

            Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);

            assert_eq!(1, privileges.all_values().count());
            assert_eq!(
                vec![MzAclItem {
                    grantee: select_grantee,
                    grantor: select_grantor,
                    acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
                }],
                privileges.all_values_owned().collect::<Vec<_>>()
            );
        }
    }

    /// Verifies that `transact_incremental_dry_run` applies only the newly
    /// supplied ops on top of the state it is given, rather than replaying
    /// everything from scratch. Two paths are compared:
    ///   - Incremental: two separate calls, each with one op
    ///   - All-at-once: one call with both ops
    /// Both must produce equivalent catalog state.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method`
    async fn test_transact_incremental_dry_run_processes_only_new_ops() {
        Catalog::with_debug(|catalog| async move {
            // Locate the default database and schema that will hold the tables.
            let database = catalog
                .resolve_database(DEFAULT_DATABASE_NAME)
                .expect("default database");
            let database_name = database.name.clone();
            let database_spec = ResolvedDatabaseSpecifier::Id(database.id());
            let schema = catalog
                .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID)
                .expect("default schema");
            let schema_name = schema.name.schema.clone();
            let schema_spec = schema.id.clone();

            // Reserve one ID pair per table.
            let (id_t1, global_id_t1) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t1");
            let (id_t2, global_id_t2) = catalog
                .allocate_user_id_for_test()
                .await
                .expect("allocate id for t2");

            let oracle_write_ts = catalog.current_upper().await;

            // Builds an `Op::CreateItem` for an empty table with the given name.
            let make_table_op = |id, global_id, name: &str| Op::CreateItem {
                id,
                name: QualifiedItemName {
                    qualifiers: ItemQualifiers {
                        database_spec: database_spec.clone(),
                        schema_spec: schema_spec.clone(),
                    },
                    item: name.to_string(),
                },
                item: CatalogItem::Table(Table {
                    create_sql: Some(format!(
                        "CREATE TABLE {database_name}.{schema_name}.{name} ()"
                    )),
                    desc: VersionedRelationDesc::new(RelationDesc::empty()),
                    collections: [(RelationVersion::root(), global_id)].into_iter().collect(),
                    conn_id: None,
                    resolved_ids: ResolvedIds::empty(),
                    custom_logical_compaction_window: None,
                    is_retained_metrics_object: false,
                    data_source: TableDataSource::TableWrites { defaults: vec![] },
                }),
                owner_id: MZ_SYSTEM_ROLE_ID,
            };

            let op_t1 = make_table_op(id_t1, global_id_t1, "t1");
            let op_t2 = make_table_op(id_t2, global_id_t2, "t2");

            let base_state = catalog.state().clone();
            let both_tables = [("t1", id_t1), ("t2", id_t2)];

            // --- Path A: Incremental (two separate dry-run calls) ---

            // Step one: only `op_t1`, starting without a previous snapshot.
            let (state_after_t1, snapshot_after_t1) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1.clone()],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("first dry run");

            // At this point t1 must be present and t2 must be absent.
            let t1_after_first = state_after_t1.try_get_entry(&id_t1);
            assert!(
                t1_after_first.is_some(),
                "t1 should exist after first dry run"
            );
            assert_eq!(t1_after_first.expect("t1 entry").name().item, "t1");
            assert!(
                state_after_t1.try_get_entry(&id_t2).is_none(),
                "t2 should NOT exist after first dry run"
            );

            // Step two: only `op_t2`, continuing from step one's state and snapshot.
            let (state_incremental, _) = catalog
                .transact_incremental_dry_run(
                    &state_after_t1,
                    vec![op_t2.clone()],
                    None,
                    Some(snapshot_after_t1),
                    oracle_write_ts,
                )
                .await
                .expect("second dry run");

            // Now both tables must be present.
            for (table, id) in both_tables {
                assert!(
                    state_incremental.try_get_entry(&id).is_some(),
                    "{table} should exist in incremental result"
                );
            }

            // --- Path B: All-at-once (single dry-run call with both ops) ---

            let (state_all_at_once, _) = catalog
                .transact_incremental_dry_run(
                    &base_state,
                    vec![op_t1, op_t2],
                    None,
                    None,
                    oracle_write_ts,
                )
                .await
                .expect("all-at-once dry run");

            for (table, id) in both_tables {
                assert!(
                    state_all_at_once.try_get_entry(&id).is_some(),
                    "{table} should exist in all-at-once result"
                );
            }

            // --- Compare: both paths produce equivalent items ---

            let comparisons = [
                (id_t1, "inc t1", "all t1"),
                (id_t2, "inc t2", "all t2"),
            ];
            for (id, missing_incremental, missing_all_at_once) in comparisons {
                let incremental = state_incremental
                    .try_get_entry(&id)
                    .expect(missing_incremental);
                let all_at_once = state_all_at_once
                    .try_get_entry(&id)
                    .expect(missing_all_at_once);
                assert_eq!(incremental.name(), all_at_once.name());
                assert_eq!(incremental.owner_id, all_at_once.owner_id);
            }

            catalog.expire().await;
        })
        .await
    }
}