mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
34    StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction, PasswordConfig,
50    RoleAttributesRaw, RoleMembership, RoleVars,
51};
52use mz_sql::names::{
53    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::{
67    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
68    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
69    is_reserved_role_name, object_type_to_audit_object_type,
70    system_object_type_to_audit_object_type,
71};
72use crate::coord::ConnMeta;
73use crate::coord::cluster_scheduling::SchedulingDecision;
74use crate::util::ResultExt;
75
76#[derive(Debug, Clone)]
77pub enum Op {
78    AlterRetainHistory {
79        id: CatalogItemId,
80        value: Option<Value>,
81        window: CompactionWindow,
82    },
83    AlterRole {
84        id: RoleId,
85        name: String,
86        attributes: RoleAttributesRaw,
87        nopassword: bool,
88        vars: RoleVars,
89    },
90    AlterNetworkPolicy {
91        id: NetworkPolicyId,
92        rules: Vec<NetworkPolicyRule>,
93        name: String,
94        owner_id: RoleId,
95    },
96    AlterAddColumn {
97        id: CatalogItemId,
98        new_global_id: GlobalId,
99        name: ColumnName,
100        typ: SqlColumnType,
101        sql: RawDataType,
102    },
103    CreateDatabase {
104        name: String,
105        owner_id: RoleId,
106    },
107    CreateSchema {
108        database_id: ResolvedDatabaseSpecifier,
109        schema_name: String,
110        owner_id: RoleId,
111    },
112    CreateRole {
113        name: String,
114        attributes: RoleAttributesRaw,
115    },
116    CreateCluster {
117        id: ClusterId,
118        name: String,
119        introspection_sources: Vec<&'static BuiltinLog>,
120        owner_id: RoleId,
121        config: ClusterConfig,
122    },
123    CreateClusterReplica {
124        cluster_id: ClusterId,
125        name: String,
126        config: ReplicaConfig,
127        owner_id: RoleId,
128        reason: ReplicaCreateDropReason,
129    },
130    CreateItem {
131        id: CatalogItemId,
132        name: QualifiedItemName,
133        item: CatalogItem,
134        owner_id: RoleId,
135    },
136    CreateNetworkPolicy {
137        rules: Vec<NetworkPolicyRule>,
138        name: String,
139        owner_id: RoleId,
140    },
141    Comment {
142        object_id: CommentObjectId,
143        sub_component: Option<usize>,
144        comment: Option<String>,
145    },
146    DropObjects(Vec<DropObjectInfo>),
147    GrantRole {
148        role_id: RoleId,
149        member_id: RoleId,
150        grantor_id: RoleId,
151    },
152    RenameCluster {
153        id: ClusterId,
154        name: String,
155        to_name: String,
156        check_reserved_names: bool,
157    },
158    RenameClusterReplica {
159        cluster_id: ClusterId,
160        replica_id: ReplicaId,
161        name: QualifiedReplica,
162        to_name: String,
163    },
164    RenameItem {
165        id: CatalogItemId,
166        current_full_name: FullItemName,
167        to_name: String,
168    },
169    RenameSchema {
170        database_spec: ResolvedDatabaseSpecifier,
171        schema_spec: SchemaSpecifier,
172        new_name: String,
173        check_reserved_names: bool,
174    },
175    UpdateOwner {
176        id: ObjectId,
177        new_owner: RoleId,
178    },
179    UpdatePrivilege {
180        target_id: SystemObjectId,
181        privilege: MzAclItem,
182        variant: UpdatePrivilegeVariant,
183    },
184    UpdateDefaultPrivilege {
185        privilege_object: DefaultPrivilegeObject,
186        privilege_acl_item: DefaultPrivilegeAclItem,
187        variant: UpdatePrivilegeVariant,
188    },
189    RevokeRole {
190        role_id: RoleId,
191        member_id: RoleId,
192        grantor_id: RoleId,
193    },
194    UpdateClusterConfig {
195        id: ClusterId,
196        name: String,
197        config: ClusterConfig,
198    },
199    UpdateClusterReplicaConfig {
200        cluster_id: ClusterId,
201        replica_id: ReplicaId,
202        config: ReplicaConfig,
203    },
204    UpdateItem {
205        id: CatalogItemId,
206        name: QualifiedItemName,
207        to_item: CatalogItem,
208    },
209    UpdateSourceReferences {
210        source_id: CatalogItemId,
211        references: SourceReferences,
212    },
213    UpdateSystemConfiguration {
214        name: String,
215        value: OwnedVarInput,
216    },
217    ResetSystemConfiguration {
218        name: String,
219    },
220    ResetAllSystemConfiguration,
221    /// Performs updates to the storage usage table, which probably should be a builtin source.
222    ///
223    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
224    /// might not work. If a process crashes after collecting storage usage events
225    /// but before updating the builtin table, then another listening catalog
226    /// will never know to update the builtin table.
227    WeirdStorageUsageUpdates {
228        object_id: Option<String>,
229        size_bytes: u64,
230        collection_timestamp: EpochMillis,
231    },
232    /// Performs a dry run of the commit, but errors with
233    /// [`AdapterError::TransactionDryRun`].
234    ///
235    /// When using this value, it should be included only as the last element of
236    /// the transaction and should not be the only value in the transaction.
237    TransactionDryRun,
238}
239
240/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
241/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
242/// the `Op::DropObjects`.
243#[derive(Debug, Clone)]
244pub enum DropObjectInfo {
245    Cluster(ClusterId),
246    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
247    Database(DatabaseId),
248    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
249    Role(RoleId),
250    Item(CatalogItemId),
251    NetworkPolicy(NetworkPolicyId),
252}
253
254impl DropObjectInfo {
255    /// Creates a `DropObjectInfo` from an `ObjectId`.
256    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
257    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
258        match id {
259            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
260            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
261                cluster_id,
262                replica_id,
263                ReplicaCreateDropReason::Manual,
264            )),
265            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
266            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
267            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
268            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
269            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
270        }
271    }
272
273    /// Creates an `ObjectId` from a `DropObjectInfo`.
274    /// Loses the `ReplicaCreateDropReason` if there is one!
275    fn to_object_id(&self) -> ObjectId {
276        match &self {
277            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
278            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
279                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
280            }
281            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
282            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
283            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
284            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
285            DropObjectInfo::NetworkPolicy(network_policy_id) => {
286                ObjectId::NetworkPolicy(network_policy_id.clone())
287            }
288        }
289    }
290}
291
292/// The reason for creating or dropping a replica.
293#[derive(Debug, Clone)]
294pub enum ReplicaCreateDropReason {
295    /// The user initiated the replica create or drop, e.g., by
296    /// - creating/dropping a cluster,
297    /// - ALTERing various options on a managed cluster,
298    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
299    Manual,
300    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
301    /// materialized view is needing a refresh on a SCHEDULE ON REFRESH cluster.
302    ClusterScheduling(Vec<SchedulingDecision>),
303}
304
305impl ReplicaCreateDropReason {
306    pub fn into_audit_log(
307        self,
308    ) -> (
309        CreateOrDropClusterReplicaReasonV1,
310        Option<SchedulingDecisionsWithReasonsV2>,
311    ) {
312        let (reason, scheduling_policies) = match self {
313            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
314            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
315                CreateOrDropClusterReplicaReasonV1::Schedule,
316                Some(scheduling_decisions),
317            ),
318        };
319        (
320            reason,
321            scheduling_policies
322                .as_ref()
323                .map(SchedulingDecision::reasons_to_audit_log_reasons),
324        )
325    }
326}
327
328pub struct TransactionResult {
329    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
330    pub audit_events: Vec<VersionedEvent>,
331}
332
333impl Catalog {
334    fn should_audit_log_item(item: &CatalogItem) -> bool {
335        !item.is_temporary()
336    }
337
338    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
339    /// within a connection id.
340    fn temporary_ids(
341        &self,
342        ops: &[Op],
343        temporary_drops: BTreeSet<(&ConnectionId, String)>,
344    ) -> Result<BTreeSet<CatalogItemId>, Error> {
345        let mut creating = BTreeSet::new();
346        let mut temporary_ids = BTreeSet::new();
347        for op in ops.iter() {
348            if let Op::CreateItem {
349                id,
350                name,
351                item,
352                owner_id: _,
353            } = op
354            {
355                if let Some(conn_id) = item.conn_id() {
356                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
357                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
358                        || creating.contains(&(conn_id, &name.item))
359                    {
360                        return Err(
361                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
362                        );
363                    } else {
364                        creating.insert((conn_id, &name.item));
365                        temporary_ids.insert(id.clone());
366                    }
367                }
368            }
369        }
370        Ok(temporary_ids)
371    }
372
373    #[instrument(name = "catalog::transact")]
374    pub async fn transact(
375        &mut self,
376        // n.b. this is an option to prevent us from needing to build out a
377        // dummy impl of `StorageController` for tests.
378        storage_collections: Option<
379            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
380        >,
381        oracle_write_ts: mz_repr::Timestamp,
382        session: Option<&ConnMeta>,
383        ops: Vec<Op>,
384    ) -> Result<TransactionResult, AdapterError> {
385        trace!("transact: {:?}", ops);
386        fail::fail_point!("catalog_transact", |arg| {
387            Err(AdapterError::Unstructured(anyhow::anyhow!(
388                "failpoint: {arg:?}"
389            )))
390        });
391
392        let drop_ids: BTreeSet<CatalogItemId> = ops
393            .iter()
394            .filter_map(|op| match op {
395                Op::DropObjects(drop_object_infos) => {
396                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
397                    let item_ids = ids.filter_map(|id| match id {
398                        ObjectId::Item(id) => Some(id),
399                        _ => None,
400                    });
401                    Some(item_ids)
402                }
403                _ => None,
404            })
405            .flatten()
406            .collect();
407        let temporary_drops = drop_ids
408            .iter()
409            .filter_map(|id| {
410                let entry = self.get_entry(id);
411                match entry.item().conn_id() {
412                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
413                    None => None,
414                }
415            })
416            .collect();
417        let dropped_global_ids = drop_ids
418            .iter()
419            .flat_map(|item_id| self.get_global_ids(item_id))
420            .collect();
421
422        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
423        let mut builtin_table_updates = vec![];
424        let mut audit_events = vec![];
425        let mut storage = self.storage().await;
426        let mut tx = storage
427            .transaction()
428            .await
429            .unwrap_or_terminate("starting catalog transaction");
430
431        let new_state = Self::transact_inner(
432            storage_collections,
433            oracle_write_ts,
434            session,
435            ops,
436            temporary_ids,
437            &mut builtin_table_updates,
438            &mut audit_events,
439            &mut tx,
440            &self.state,
441        )
442        .await?;
443
444        // The user closure was successful, apply the updates. Terminate the
445        // process if this fails, because we have to restart envd due to
446        // indeterminate catalog state, which we only reconcile during catalog
447        // init.
448        tx.commit(oracle_write_ts)
449            .await
450            .unwrap_or_terminate("catalog storage transaction commit must succeed");
451
452        // Dropping here keeps the mutable borrow on self, preventing us accidentally
453        // mutating anything until after f is executed.
454        drop(storage);
455        if let Some(new_state) = new_state {
456            self.transient_revision += 1;
457            self.state = new_state;
458        }
459
460        // Drop in-memory planning metadata.
461        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
462        if self.state.system_config().enable_mz_notices() {
463            // Generate retractions for the Builtin tables.
464            self.state().pack_optimizer_notices(
465                &mut builtin_table_updates,
466                dropped_notices.iter(),
467                Diff::MINUS_ONE,
468            );
469        }
470
471        Ok(TransactionResult {
472            builtin_table_updates,
473            audit_events,
474        })
475    }
476
477    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
478    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
479    ///
480    /// # Panics
481    /// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
482    ///   final element.
483    /// - If the only element of `ops` is [`Op::TransactionDryRun`].
484    #[instrument(name = "catalog::transact_inner")]
485    async fn transact_inner(
486        storage_collections: Option<
487            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
488        >,
489        oracle_write_ts: mz_repr::Timestamp,
490        session: Option<&ConnMeta>,
491        mut ops: Vec<Op>,
492        temporary_ids: BTreeSet<CatalogItemId>,
493        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
494        audit_events: &mut Vec<VersionedEvent>,
495        tx: &mut Transaction<'_>,
496        state: &CatalogState,
497    ) -> Result<Option<CatalogState>, AdapterError> {
498        let mut state = Cow::Borrowed(state);
499
500        let dry_run_ops = match ops.last() {
501            Some(Op::TransactionDryRun) => {
502                // Remove dry run marker.
503                ops.pop();
504                assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
505                ops.clone()
506            }
507            Some(_) => vec![],
508            None => return Ok(None),
509        };
510
511        let mut storage_collections_to_create = BTreeSet::new();
512        let mut storage_collections_to_drop = BTreeSet::new();
513        let mut storage_collections_to_register = BTreeMap::new();
514
515        for op in ops {
516            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
517                oracle_write_ts,
518                session,
519                op,
520                &temporary_ids,
521                audit_events,
522                tx,
523                &*state,
524                &mut storage_collections_to_create,
525                &mut storage_collections_to_drop,
526                &mut storage_collections_to_register,
527            )
528            .await?;
529
530            // Certain builtin tables are not derived from the durable catalog state, so they need
531            // to be updated ad-hoc based on the current transaction. This is weird and will not
532            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
533            // this instance crashes after committing the durable catalog but before applying the
534            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
535            // world. Currently, this works fine and there are no correctness issues because
536            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
537            // state of all builtin tables to the correct values.
538            if let Some(builtin_table_update) = weird_builtin_table_update {
539                builtin_table_updates.push(builtin_table_update);
540            }
541
542            // Temporary items are not stored in the durable catalog, so they need to be handled
543            // separately for updating state and builtin tables.
544            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
545            // in a multi-subscriber catalog world.
546            let op_id = tx.op_id().into();
547            let temporary_item_updates =
548                temporary_item_updates
549                    .into_iter()
550                    .map(|(item, diff)| StateUpdate {
551                        kind: StateUpdateKind::TemporaryItem(item),
552                        ts: op_id,
553                        diff,
554                    });
555
556            let mut updates: Vec<_> = tx.get_and_commit_op_updates();
557            updates.extend(temporary_item_updates);
558            if !updates.is_empty() {
559                let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
560                let op_builtin_table_updates = state
561                    .to_mut()
562                    .resolve_builtin_table_updates(op_builtin_table_updates);
563                builtin_table_updates.extend(op_builtin_table_updates);
564            }
565        }
566
567        if dry_run_ops.is_empty() {
568            // `storage_collections` should only be `None` for tests.
569            if let Some(c) = storage_collections {
570                c.prepare_state(
571                    tx,
572                    storage_collections_to_create,
573                    storage_collections_to_drop,
574                    storage_collections_to_register,
575                )
576                .await?;
577            }
578
579            let updates = tx.get_and_commit_op_updates();
580            if !updates.is_empty() {
581                let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
582                let op_builtin_table_updates = state
583                    .to_mut()
584                    .resolve_builtin_table_updates(op_builtin_table_updates);
585                builtin_table_updates.extend(op_builtin_table_updates);
586            }
587
588            match state {
589                Cow::Owned(state) => Ok(Some(state)),
590                Cow::Borrowed(_) => Ok(None),
591            }
592        } else {
593            Err(AdapterError::TransactionDryRun {
594                new_ops: dry_run_ops,
595                new_state: state.into_owned(),
596            })
597        }
598    }
599
600    /// Performs the transaction operation described by `op`. This function prepares the changes in
601    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
602    /// changes.
603    ///
604    /// Optionally returns a builtin table update for any builtin table updates than cannot be
605    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
606    /// scenarios and ideally in the future don't exist.
607    #[instrument]
608    async fn transact_op(
609        oracle_write_ts: mz_repr::Timestamp,
610        session: Option<&ConnMeta>,
611        op: Op,
612        temporary_ids: &BTreeSet<CatalogItemId>,
613        audit_events: &mut Vec<VersionedEvent>,
614        tx: &mut Transaction<'_>,
615        state: &CatalogState,
616        storage_collections_to_create: &mut BTreeSet<GlobalId>,
617        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
618        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
619    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
620        let mut weird_builtin_table_update = None;
621        let mut temporary_item_updates = Vec::new();
622
623        match op {
624            Op::TransactionDryRun => {
625                unreachable!("TransactionDryRun can only be used a final element of ops")
626            }
627            Op::AlterRetainHistory { id, value, window } => {
628                let entry = state.get_entry(&id);
629                if id.is_system() {
630                    let name = entry.name();
631                    let full_name =
632                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
633                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
634                        full_name.to_string(),
635                    ))));
636                }
637
638                let mut new_entry = entry.clone();
639                let previous = new_entry
640                    .item
641                    .update_retain_history(value.clone(), window)
642                    .map_err(|_| {
643                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
644                            "planner should have rejected invalid alter retain history item type"
645                                .to_string(),
646                        )))
647                    })?;
648
649                if Self::should_audit_log_item(new_entry.item()) {
650                    let details =
651                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
652                            id: id.to_string(),
653                            old_history: previous.map(|previous| previous.to_string()),
654                            new_history: value.map(|v| v.to_string()),
655                        });
656                    CatalogState::add_to_audit_log(
657                        &state.system_configuration,
658                        oracle_write_ts,
659                        session,
660                        tx,
661                        audit_events,
662                        EventType::Alter,
663                        catalog_type_to_audit_object_type(new_entry.item().typ()),
664                        details,
665                    )?;
666                }
667
668                tx.update_item(id, new_entry.into())?;
669
670                Self::log_update(state, &id);
671            }
672            Op::AlterRole {
673                id,
674                name,
675                attributes,
676                nopassword,
677                vars,
678            } => {
679                state.ensure_not_reserved_role(&id)?;
680
681                let mut existing_role = state.get_role(&id).clone();
682                let password = attributes.password.clone();
683                let scram_iterations = attributes
684                    .scram_iterations
685                    .unwrap_or_else(|| state.system_config().scram_iterations());
686                existing_role.attributes = attributes.into();
687                existing_role.vars = vars;
688                let password_action = if nopassword {
689                    PasswordAction::Clear
690                } else if let Some(password) = password {
691                    PasswordAction::Set(PasswordConfig {
692                        password,
693                        scram_iterations,
694                    })
695                } else {
696                    PasswordAction::NoChange
697                };
698                tx.update_role(id, existing_role.into(), password_action)?;
699
700                CatalogState::add_to_audit_log(
701                    &state.system_configuration,
702                    oracle_write_ts,
703                    session,
704                    tx,
705                    audit_events,
706                    EventType::Alter,
707                    ObjectType::Role,
708                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
709                        id: id.to_string(),
710                        name: name.clone(),
711                    }),
712                )?;
713
714                info!("update role {name} ({id})");
715            }
716            Op::AlterNetworkPolicy {
717                id,
718                rules,
719                name,
720                owner_id: _owner_id,
721            } => {
722                let existing_policy = state.get_network_policy(&id).clone();
723                let mut policy: NetworkPolicy = existing_policy.into();
724                policy.rules = rules;
725                if is_reserved_name(&name) {
726                    return Err(AdapterError::Catalog(Error::new(
727                        ErrorKind::ReservedNetworkPolicyName(name),
728                    )));
729                }
730                tx.update_network_policy(id, policy.clone())?;
731
732                CatalogState::add_to_audit_log(
733                    &state.system_configuration,
734                    oracle_write_ts,
735                    session,
736                    tx,
737                    audit_events,
738                    EventType::Alter,
739                    ObjectType::NetworkPolicy,
740                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
741                        id: id.to_string(),
742                        name: name.clone(),
743                    }),
744                )?;
745
746                info!("update network policy {name} ({id})");
747            }
748            Op::AlterAddColumn {
749                id,
750                new_global_id,
751                name,
752                typ,
753                sql,
754            } => {
755                let mut new_entry = state.get_entry(&id).clone();
756                let version = new_entry.item.add_column(name, typ, sql)?;
757                // All versions of a table share the same shard, so it shouldn't matter what
758                // GlobalId we use here.
759                let shard_id = state
760                    .storage_metadata()
761                    .get_collection_shard(new_entry.latest_global_id())?;
762
763                // TODO(alter_table): Support adding columns to sources.
764                let CatalogItem::Table(table) = &mut new_entry.item else {
765                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
766                };
767                table.collections.insert(version, new_global_id);
768
769                tx.update_item(id, new_entry.into())?;
770                storage_collections_to_register.insert(new_global_id, shard_id);
771            }
772            Op::CreateDatabase { name, owner_id } => {
773                let database_owner_privileges = vec![rbac::owner_privilege(
774                    mz_sql::catalog::ObjectType::Database,
775                    owner_id,
776                )];
777                let database_default_privileges = state
778                    .default_privileges
779                    .get_applicable_privileges(
780                        owner_id,
781                        None,
782                        None,
783                        mz_sql::catalog::ObjectType::Database,
784                    )
785                    .map(|item| item.mz_acl_item(owner_id));
786                let database_privileges: Vec<_> = merge_mz_acl_items(
787                    database_owner_privileges
788                        .into_iter()
789                        .chain(database_default_privileges),
790                )
791                .collect();
792
793                let schema_owner_privileges = vec![rbac::owner_privilege(
794                    mz_sql::catalog::ObjectType::Schema,
795                    owner_id,
796                )];
797                let schema_default_privileges = state
798                    .default_privileges
799                    .get_applicable_privileges(
800                        owner_id,
801                        None,
802                        None,
803                        mz_sql::catalog::ObjectType::Schema,
804                    )
805                    .map(|item| item.mz_acl_item(owner_id))
806                    // Special default privilege on public schemas.
807                    .chain(std::iter::once(MzAclItem {
808                        grantee: RoleId::Public,
809                        grantor: owner_id,
810                        acl_mode: AclMode::USAGE,
811                    }));
812                let schema_privileges: Vec<_> = merge_mz_acl_items(
813                    schema_owner_privileges
814                        .into_iter()
815                        .chain(schema_default_privileges),
816                )
817                .collect();
818
819                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
820                let (database_id, _) = tx.insert_user_database(
821                    &name,
822                    owner_id,
823                    database_privileges.clone(),
824                    &temporary_oids,
825                )?;
826                let (schema_id, _) = tx.insert_user_schema(
827                    database_id,
828                    DEFAULT_SCHEMA,
829                    owner_id,
830                    schema_privileges.clone(),
831                    &temporary_oids,
832                )?;
833                CatalogState::add_to_audit_log(
834                    &state.system_configuration,
835                    oracle_write_ts,
836                    session,
837                    tx,
838                    audit_events,
839                    EventType::Create,
840                    ObjectType::Database,
841                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
842                        id: database_id.to_string(),
843                        name: name.clone(),
844                    }),
845                )?;
846                info!("create database {}", name);
847
848                CatalogState::add_to_audit_log(
849                    &state.system_configuration,
850                    oracle_write_ts,
851                    session,
852                    tx,
853                    audit_events,
854                    EventType::Create,
855                    ObjectType::Schema,
856                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
857                        id: schema_id.to_string(),
858                        name: DEFAULT_SCHEMA.to_string(),
859                        database_name: Some(name),
860                    }),
861                )?;
862            }
863            Op::CreateSchema {
864                database_id,
865                schema_name,
866                owner_id,
867            } => {
868                if is_reserved_name(&schema_name) {
869                    return Err(AdapterError::Catalog(Error::new(
870                        ErrorKind::ReservedSchemaName(schema_name),
871                    )));
872                }
873                let database_id = match database_id {
874                    ResolvedDatabaseSpecifier::Id(id) => id,
875                    ResolvedDatabaseSpecifier::Ambient => {
876                        return Err(AdapterError::Catalog(Error::new(
877                            ErrorKind::ReadOnlySystemSchema(schema_name),
878                        )));
879                    }
880                };
881                let owner_privileges = vec![rbac::owner_privilege(
882                    mz_sql::catalog::ObjectType::Schema,
883                    owner_id,
884                )];
885                let default_privileges = state
886                    .default_privileges
887                    .get_applicable_privileges(
888                        owner_id,
889                        Some(database_id),
890                        None,
891                        mz_sql::catalog::ObjectType::Schema,
892                    )
893                    .map(|item| item.mz_acl_item(owner_id));
894                let privileges: Vec<_> =
895                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
896                        .collect();
897                let (schema_id, _) = tx.insert_user_schema(
898                    database_id,
899                    &schema_name,
900                    owner_id,
901                    privileges.clone(),
902                    &state.get_temporary_oids().collect(),
903                )?;
904                CatalogState::add_to_audit_log(
905                    &state.system_configuration,
906                    oracle_write_ts,
907                    session,
908                    tx,
909                    audit_events,
910                    EventType::Create,
911                    ObjectType::Schema,
912                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
913                        id: schema_id.to_string(),
914                        name: schema_name.clone(),
915                        database_name: Some(state.database_by_id[&database_id].name.clone()),
916                    }),
917                )?;
918            }
919            Op::CreateRole { name, attributes } => {
920                if is_reserved_role_name(&name) {
921                    return Err(AdapterError::Catalog(Error::new(
922                        ErrorKind::ReservedRoleName(name),
923                    )));
924                }
925                let membership = RoleMembership::new();
926                let vars = RoleVars::default();
927                let (id, _) = tx.insert_user_role(
928                    name.clone(),
929                    attributes.clone(),
930                    membership.clone(),
931                    vars.clone(),
932                    &state.get_temporary_oids().collect(),
933                )?;
934                CatalogState::add_to_audit_log(
935                    &state.system_configuration,
936                    oracle_write_ts,
937                    session,
938                    tx,
939                    audit_events,
940                    EventType::Create,
941                    ObjectType::Role,
942                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
943                        id: id.to_string(),
944                        name: name.clone(),
945                    }),
946                )?;
947                info!("create role {}", name);
948            }
949            Op::CreateCluster {
950                id,
951                name,
952                introspection_sources,
953                owner_id,
954                config,
955            } => {
956                if is_reserved_name(&name) {
957                    return Err(AdapterError::Catalog(Error::new(
958                        ErrorKind::ReservedClusterName(name),
959                    )));
960                }
961                let owner_privileges = vec![rbac::owner_privilege(
962                    mz_sql::catalog::ObjectType::Cluster,
963                    owner_id,
964                )];
965                let default_privileges = state
966                    .default_privileges
967                    .get_applicable_privileges(
968                        owner_id,
969                        None,
970                        None,
971                        mz_sql::catalog::ObjectType::Cluster,
972                    )
973                    .map(|item| item.mz_acl_item(owner_id));
974                let privileges: Vec<_> =
975                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
976                        .collect();
977                let introspection_source_ids: Vec<_> = introspection_sources
978                    .iter()
979                    .map(|introspection_source| {
980                        Transaction::allocate_introspection_source_index_id(
981                            &id,
982                            introspection_source.variant,
983                        )
984                    })
985                    .collect();
986
987                let introspection_sources = introspection_sources
988                    .into_iter()
989                    .zip_eq(introspection_source_ids)
990                    .map(|(log, (item_id, gid))| (log, item_id, gid))
991                    .collect();
992
993                tx.insert_user_cluster(
994                    id,
995                    &name,
996                    introspection_sources,
997                    owner_id,
998                    privileges.clone(),
999                    config.clone().into(),
1000                    &state.get_temporary_oids().collect(),
1001                )?;
1002                CatalogState::add_to_audit_log(
1003                    &state.system_configuration,
1004                    oracle_write_ts,
1005                    session,
1006                    tx,
1007                    audit_events,
1008                    EventType::Create,
1009                    ObjectType::Cluster,
1010                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1011                        id: id.to_string(),
1012                        name: name.clone(),
1013                    }),
1014                )?;
1015                info!("create cluster {}", name);
1016            }
1017            Op::CreateClusterReplica {
1018                cluster_id,
1019                name,
1020                config,
1021                owner_id,
1022                reason,
1023            } => {
1024                if is_reserved_name(&name) {
1025                    return Err(AdapterError::Catalog(Error::new(
1026                        ErrorKind::ReservedReplicaName(name),
1027                    )));
1028                }
1029                let cluster = state.get_cluster(cluster_id);
1030                let id =
1031                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1032                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1033                    size,
1034                    billed_as,
1035                    internal,
1036                    ..
1037                }) = &config.location
1038                {
1039                    let (reason, scheduling_policies) = reason.into_audit_log();
1040                    let details = EventDetails::CreateClusterReplicaV4(
1041                        mz_audit_log::CreateClusterReplicaV4 {
1042                            cluster_id: cluster_id.to_string(),
1043                            cluster_name: cluster.name.clone(),
1044                            replica_id: Some(id.to_string()),
1045                            replica_name: name.clone(),
1046                            logical_size: size.clone(),
1047                            billed_as: billed_as.clone(),
1048                            internal: *internal,
1049                            reason,
1050                            scheduling_policies,
1051                        },
1052                    );
1053                    CatalogState::add_to_audit_log(
1054                        &state.system_configuration,
1055                        oracle_write_ts,
1056                        session,
1057                        tx,
1058                        audit_events,
1059                        EventType::Create,
1060                        ObjectType::ClusterReplica,
1061                        details,
1062                    )?;
1063                }
1064            }
1065            Op::CreateItem {
1066                id,
1067                name,
1068                item,
1069                owner_id,
1070            } => {
1071                state.check_unstable_dependencies(&item)?;
1072
1073                match &item {
1074                    CatalogItem::Table(table) => {
1075                        let gids: Vec<_> = table.global_ids().collect();
1076                        assert_eq!(gids.len(), 1);
1077                        storage_collections_to_create.extend(gids);
1078                    }
1079                    CatalogItem::Source(source) => {
1080                        storage_collections_to_create.insert(source.global_id());
1081                    }
1082                    CatalogItem::MaterializedView(mv) => {
1083                        storage_collections_to_create.insert(mv.global_id_writes());
1084                    }
1085                    CatalogItem::ContinualTask(ct) => {
1086                        storage_collections_to_create.insert(ct.global_id());
1087                    }
1088                    CatalogItem::Sink(sink) => {
1089                        storage_collections_to_create.insert(sink.global_id());
1090                    }
1091                    CatalogItem::Log(_)
1092                    | CatalogItem::View(_)
1093                    | CatalogItem::Index(_)
1094                    | CatalogItem::Type(_)
1095                    | CatalogItem::Func(_)
1096                    | CatalogItem::Secret(_)
1097                    | CatalogItem::Connection(_) => (),
1098                }
1099
1100                let system_user = session.map_or(false, |s| s.user().is_system_user());
1101                if !system_user {
1102                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1103                        let cluster_name = state.clusters_by_id[&id].name.clone();
1104                        return Err(AdapterError::Catalog(Error::new(
1105                            ErrorKind::ReadOnlyCluster(cluster_name),
1106                        )));
1107                    }
1108                }
1109
1110                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1111                let default_privileges = state
1112                    .default_privileges
1113                    .get_applicable_privileges(
1114                        owner_id,
1115                        name.qualifiers.database_spec.id(),
1116                        Some(name.qualifiers.schema_spec.into()),
1117                        item.typ().into(),
1118                    )
1119                    .map(|item| item.mz_acl_item(owner_id));
1120                // mz_support can read all progress sources.
1121                let progress_source_privilege = if item.is_progress_source() {
1122                    Some(MzAclItem {
1123                        grantee: MZ_SUPPORT_ROLE_ID,
1124                        grantor: owner_id,
1125                        acl_mode: AclMode::SELECT,
1126                    })
1127                } else {
1128                    None
1129                };
1130                let privileges: Vec<_> = merge_mz_acl_items(
1131                    owner_privileges
1132                        .into_iter()
1133                        .chain(default_privileges)
1134                        .chain(progress_source_privilege),
1135                )
1136                .collect();
1137
1138                let temporary_oids = state.get_temporary_oids().collect();
1139
1140                if item.is_temporary() {
1141                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1142                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1143                    {
1144                        return Err(AdapterError::Catalog(Error::new(
1145                            ErrorKind::InvalidTemporarySchema,
1146                        )));
1147                    }
1148                    let oid = tx.allocate_oid(&temporary_oids)?;
1149                    let item = TemporaryItem {
1150                        id,
1151                        oid,
1152                        name: name.clone(),
1153                        item: item.clone(),
1154                        owner_id,
1155                        privileges: PrivilegeMap::from_mz_acl_items(privileges),
1156                    };
1157                    temporary_item_updates.push((item, StateDiff::Addition));
1158                } else {
1159                    if let Some(temp_id) =
1160                        item.uses()
1161                            .iter()
1162                            .find(|id| match state.try_get_entry(*id) {
1163                                Some(entry) => entry.item().is_temporary(),
1164                                None => temporary_ids.contains(id),
1165                            })
1166                    {
1167                        let temp_item = state.get_entry(temp_id);
1168                        return Err(AdapterError::Catalog(Error::new(
1169                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1170                        )));
1171                    }
1172                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1173                        && !system_user
1174                    {
1175                        let schema_name = state
1176                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1177                            .schema;
1178                        return Err(AdapterError::Catalog(Error::new(
1179                            ErrorKind::ReadOnlySystemSchema(schema_name),
1180                        )));
1181                    }
1182                    let schema_id = name.qualifiers.schema_spec.clone().into();
1183                    let item_type = item.typ();
1184                    let (create_sql, global_id, versions) = item.to_serialized();
1185                    tx.insert_user_item(
1186                        id,
1187                        global_id,
1188                        schema_id,
1189                        &name.item,
1190                        create_sql,
1191                        owner_id,
1192                        privileges.clone(),
1193                        &temporary_oids,
1194                        versions,
1195                    )?;
1196                    info!(
1197                        "create {} {} ({})",
1198                        item_type,
1199                        state.resolve_full_name(&name, None),
1200                        id
1201                    );
1202                }
1203
1204                if Self::should_audit_log_item(&item) {
1205                    let name = Self::full_name_detail(
1206                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1207                    );
1208                    let details = match &item {
1209                        CatalogItem::Source(s) => {
1210                            let cluster_id = match s.data_source {
1211                                // Ingestion exports don't have their own cluster, but
1212                                // run on their ingestion's cluster.
1213                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1214                                    match state.get_entry(&ingestion_id).cluster_id() {
1215                                        Some(cluster_id) => Some(cluster_id.to_string()),
1216                                        None => None,
1217                                    }
1218                                }
1219                                _ => match item.cluster_id() {
1220                                    Some(cluster_id) => Some(cluster_id.to_string()),
1221                                    None => None,
1222                                },
1223                            };
1224
1225                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1226                                id: id.to_string(),
1227                                cluster_id,
1228                                name,
1229                                external_type: s.source_type().to_string(),
1230                            })
1231                        }
1232                        CatalogItem::Sink(s) => {
1233                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1234                                id: id.to_string(),
1235                                cluster_id: Some(s.cluster_id.to_string()),
1236                                name,
1237                                external_type: s.sink_type().to_string(),
1238                            })
1239                        }
1240                        CatalogItem::Index(i) => {
1241                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1242                                id: id.to_string(),
1243                                name,
1244                                cluster_id: i.cluster_id.to_string(),
1245                            })
1246                        }
1247                        CatalogItem::MaterializedView(mv) => {
1248                            EventDetails::CreateMaterializedViewV1(
1249                                mz_audit_log::CreateMaterializedViewV1 {
1250                                    id: id.to_string(),
1251                                    name,
1252                                    cluster_id: mv.cluster_id.to_string(),
1253                                },
1254                            )
1255                        }
1256                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1257                            id: id.to_string(),
1258                            name,
1259                        }),
1260                    };
1261                    CatalogState::add_to_audit_log(
1262                        &state.system_configuration,
1263                        oracle_write_ts,
1264                        session,
1265                        tx,
1266                        audit_events,
1267                        EventType::Create,
1268                        catalog_type_to_audit_object_type(item.typ()),
1269                        details,
1270                    )?;
1271                }
1272            }
1273            Op::CreateNetworkPolicy {
1274                rules,
1275                name,
1276                owner_id,
1277            } => {
1278                if state.network_policies_by_name.contains_key(&name) {
1279                    return Err(AdapterError::PlanError(PlanError::Catalog(
1280                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1281                    )));
1282                }
1283                if is_reserved_name(&name) {
1284                    return Err(AdapterError::Catalog(Error::new(
1285                        ErrorKind::ReservedNetworkPolicyName(name),
1286                    )));
1287                }
1288
1289                let owner_privileges = vec![rbac::owner_privilege(
1290                    mz_sql::catalog::ObjectType::NetworkPolicy,
1291                    owner_id,
1292                )];
1293                let default_privileges = state
1294                    .default_privileges
1295                    .get_applicable_privileges(
1296                        owner_id,
1297                        None,
1298                        None,
1299                        mz_sql::catalog::ObjectType::NetworkPolicy,
1300                    )
1301                    .map(|item| item.mz_acl_item(owner_id));
1302                let privileges: Vec<_> =
1303                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1304                        .collect();
1305
1306                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1307                let id = tx.insert_user_network_policy(
1308                    name.clone(),
1309                    rules,
1310                    privileges,
1311                    owner_id,
1312                    &temporary_oids,
1313                )?;
1314
1315                CatalogState::add_to_audit_log(
1316                    &state.system_configuration,
1317                    oracle_write_ts,
1318                    session,
1319                    tx,
1320                    audit_events,
1321                    EventType::Create,
1322                    ObjectType::NetworkPolicy,
1323                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1324                        id: id.to_string(),
1325                        name: name.clone(),
1326                    }),
1327                )?;
1328
1329                info!("created network policy {name} ({id})");
1330            }
1331            Op::Comment {
1332                object_id,
1333                sub_component,
1334                comment,
1335            } => {
1336                tx.update_comment(object_id, sub_component, comment)?;
1337                let entry = state.get_comment_id_entry(&object_id);
1338                let should_log = entry
1339                    .map(|entry| Self::should_audit_log_item(entry.item()))
1340                    // Things that aren't catalog entries can't be temp, so should be logged.
1341                    .unwrap_or(true);
1342                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1343                // comments won't be logged for now.
1344                if let (Some(conn_id), true) =
1345                    (session.map(|session| session.conn_id()), should_log)
1346                {
1347                    CatalogState::add_to_audit_log(
1348                        &state.system_configuration,
1349                        oracle_write_ts,
1350                        session,
1351                        tx,
1352                        audit_events,
1353                        EventType::Comment,
1354                        comment_id_to_audit_object_type(object_id),
1355                        EventDetails::IdNameV1(IdNameV1 {
1356                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1357                            id: format!("{object_id:?}"),
1358                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1359                        }),
1360                    )?;
1361                }
1362            }
1363            Op::UpdateSourceReferences {
1364                source_id,
1365                references,
1366            } => {
1367                tx.update_source_references(
1368                    source_id,
1369                    references
1370                        .references
1371                        .into_iter()
1372                        .map(|reference| reference.into())
1373                        .collect(),
1374                    references.updated_at,
1375                )?;
1376            }
1377            Op::DropObjects(drop_object_infos) => {
1378                // Generate all of the objects that need to get dropped.
1379                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1380
1381                // Drop any associated comments.
1382                tx.drop_comments(&delta.comments)?;
1383
1384                // Drop any items.
1385                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1386                    delta
1387                        .items
1388                        .iter()
1389                        .map(|id| id)
1390                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1391                tx.remove_items(&durable_items_to_drop)?;
1392                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1393                    let entry = state.get_entry(&id);
1394                    (entry.clone().into(), StateDiff::Retraction)
1395                }));
1396
1397                for item_id in delta.items {
1398                    let entry = state.get_entry(&item_id);
1399
1400                    if entry.item().is_storage_collection() {
1401                        storage_collections_to_drop.extend(entry.global_ids());
1402                    }
1403
1404                    if state.source_references.contains_key(&item_id) {
1405                        tx.remove_source_references(item_id)?;
1406                    }
1407
1408                    if Self::should_audit_log_item(entry.item()) {
1409                        CatalogState::add_to_audit_log(
1410                            &state.system_configuration,
1411                            oracle_write_ts,
1412                            session,
1413                            tx,
1414                            audit_events,
1415                            EventType::Drop,
1416                            catalog_type_to_audit_object_type(entry.item().typ()),
1417                            EventDetails::IdFullNameV1(IdFullNameV1 {
1418                                id: item_id.to_string(),
1419                                name: Self::full_name_detail(&state.resolve_full_name(
1420                                    entry.name(),
1421                                    session.map(|session| session.conn_id()),
1422                                )),
1423                            }),
1424                        )?;
1425                    }
1426                    info!(
1427                        "drop {} {} ({})",
1428                        entry.item_type(),
1429                        state.resolve_full_name(entry.name(), entry.conn_id()),
1430                        item_id
1431                    );
1432                }
1433
1434                // Drop any schemas.
1435                let schemas = delta
1436                    .schemas
1437                    .iter()
1438                    .map(|(schema_spec, database_spec)| {
1439                        (SchemaId::from(schema_spec), *database_spec)
1440                    })
1441                    .collect();
1442                tx.remove_schemas(&schemas)?;
1443
1444                for (schema_spec, database_spec) in delta.schemas {
1445                    let schema = state.get_schema(
1446                        &database_spec,
1447                        &schema_spec,
1448                        session
1449                            .map(|session| session.conn_id())
1450                            .unwrap_or(&SYSTEM_CONN_ID),
1451                    );
1452
1453                    let schema_id = SchemaId::from(schema_spec);
1454                    let database_id = match database_spec {
1455                        ResolvedDatabaseSpecifier::Ambient => None,
1456                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1457                    };
1458
1459                    CatalogState::add_to_audit_log(
1460                        &state.system_configuration,
1461                        oracle_write_ts,
1462                        session,
1463                        tx,
1464                        audit_events,
1465                        EventType::Drop,
1466                        ObjectType::Schema,
1467                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1468                            id: schema_id.to_string(),
1469                            name: schema.name.schema.to_string(),
1470                            database_name: database_id
1471                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1472                        }),
1473                    )?;
1474                }
1475
1476                // Drop any databases.
1477                tx.remove_databases(&delta.databases)?;
1478
1479                for database_id in delta.databases {
1480                    let database = state.get_database(&database_id).clone();
1481
1482                    CatalogState::add_to_audit_log(
1483                        &state.system_configuration,
1484                        oracle_write_ts,
1485                        session,
1486                        tx,
1487                        audit_events,
1488                        EventType::Drop,
1489                        ObjectType::Database,
1490                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1491                            id: database_id.to_string(),
1492                            name: database.name.clone(),
1493                        }),
1494                    )?;
1495                }
1496
1497                // Drop any roles.
1498                tx.remove_user_roles(&delta.roles)?;
1499
1500                for role_id in delta.roles {
1501                    let role = state
1502                        .roles_by_id
1503                        .get(&role_id)
1504                        .expect("catalog out of sync");
1505
1506                    CatalogState::add_to_audit_log(
1507                        &state.system_configuration,
1508                        oracle_write_ts,
1509                        session,
1510                        tx,
1511                        audit_events,
1512                        EventType::Drop,
1513                        ObjectType::Role,
1514                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1515                            id: role.id.to_string(),
1516                            name: role.name.clone(),
1517                        }),
1518                    )?;
1519                    info!("drop role {}", role.name());
1520                }
1521
1522                // Drop any network policies.
1523                tx.remove_network_policies(&delta.network_policies)?;
1524
1525                for network_policy_id in delta.network_policies {
1526                    let policy = state
1527                        .network_policies_by_id
1528                        .get(&network_policy_id)
1529                        .expect("catalog out of sync");
1530
1531                    CatalogState::add_to_audit_log(
1532                        &state.system_configuration,
1533                        oracle_write_ts,
1534                        session,
1535                        tx,
1536                        audit_events,
1537                        EventType::Drop,
1538                        ObjectType::NetworkPolicy,
1539                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1540                            id: policy.id.to_string(),
1541                            name: policy.name.clone(),
1542                        }),
1543                    )?;
1544                    info!("drop network policy {}", policy.name.clone());
1545                }
1546
1547                // Drop any replicas.
1548                let replicas = delta.replicas.keys().copied().collect();
1549                tx.remove_cluster_replicas(&replicas)?;
1550
1551                for (replica_id, (cluster_id, reason)) in delta.replicas {
1552                    let cluster = state.get_cluster(cluster_id);
1553                    let replica = cluster.replica(replica_id).expect("Must exist");
1554
1555                    let (reason, scheduling_policies) = reason.into_audit_log();
1556                    let details =
1557                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1558                            cluster_id: cluster_id.to_string(),
1559                            cluster_name: cluster.name.clone(),
1560                            replica_id: Some(replica_id.to_string()),
1561                            replica_name: replica.name.clone(),
1562                            reason,
1563                            scheduling_policies,
1564                        });
1565                    CatalogState::add_to_audit_log(
1566                        &state.system_configuration,
1567                        oracle_write_ts,
1568                        session,
1569                        tx,
1570                        audit_events,
1571                        EventType::Drop,
1572                        ObjectType::ClusterReplica,
1573                        details,
1574                    )?;
1575                }
1576
1577                // Drop any clusters.
1578                tx.remove_clusters(&delta.clusters)?;
1579
1580                for cluster_id in delta.clusters {
1581                    let cluster = state.get_cluster(cluster_id);
1582
1583                    CatalogState::add_to_audit_log(
1584                        &state.system_configuration,
1585                        oracle_write_ts,
1586                        session,
1587                        tx,
1588                        audit_events,
1589                        EventType::Drop,
1590                        ObjectType::Cluster,
1591                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1592                            id: cluster.id.to_string(),
1593                            name: cluster.name.clone(),
1594                        }),
1595                    )?;
1596                }
1597            }
1598            Op::GrantRole {
1599                role_id,
1600                member_id,
1601                grantor_id,
1602            } => {
1603                state.ensure_not_reserved_role(&member_id)?;
1604                state.ensure_grantable_role(&role_id)?;
1605                if state.collect_role_membership(&role_id).contains(&member_id) {
1606                    let group_role = state.get_role(&role_id);
1607                    let member_role = state.get_role(&member_id);
1608                    return Err(AdapterError::Catalog(Error::new(
1609                        ErrorKind::CircularRoleMembership {
1610                            role_name: group_role.name().to_string(),
1611                            member_name: member_role.name().to_string(),
1612                        },
1613                    )));
1614                }
1615                let mut member_role = state.get_role(&member_id).clone();
1616                member_role.membership.map.insert(role_id, grantor_id);
1617                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1618
1619                CatalogState::add_to_audit_log(
1620                    &state.system_configuration,
1621                    oracle_write_ts,
1622                    session,
1623                    tx,
1624                    audit_events,
1625                    EventType::Grant,
1626                    ObjectType::Role,
1627                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1628                        role_id: role_id.to_string(),
1629                        member_id: member_id.to_string(),
1630                        grantor_id: grantor_id.to_string(),
1631                        executed_by: session
1632                            .map(|session| session.authenticated_role_id())
1633                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1634                            .to_string(),
1635                    }),
1636                )?;
1637            }
1638            Op::RevokeRole {
1639                role_id,
1640                member_id,
1641                grantor_id,
1642            } => {
1643                state.ensure_not_reserved_role(&member_id)?;
1644                state.ensure_grantable_role(&role_id)?;
1645                let mut member_role = state.get_role(&member_id).clone();
1646                member_role.membership.map.remove(&role_id);
1647                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1648
1649                CatalogState::add_to_audit_log(
1650                    &state.system_configuration,
1651                    oracle_write_ts,
1652                    session,
1653                    tx,
1654                    audit_events,
1655                    EventType::Revoke,
1656                    ObjectType::Role,
1657                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1658                        role_id: role_id.to_string(),
1659                        member_id: member_id.to_string(),
1660                        grantor_id: grantor_id.to_string(),
1661                        executed_by: session
1662                            .map(|session| session.authenticated_role_id())
1663                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1664                            .to_string(),
1665                    }),
1666                )?;
1667            }
1668            Op::UpdatePrivilege {
1669                target_id,
1670                privilege,
1671                variant,
1672            } => {
1673                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1674                    UpdatePrivilegeVariant::Grant => {
1675                        privileges.grant(privilege);
1676                    }
1677                    UpdatePrivilegeVariant::Revoke => {
1678                        privileges.revoke(&privilege);
1679                    }
1680                };
1681                match &target_id {
1682                    SystemObjectId::Object(object_id) => match object_id {
1683                        ObjectId::Cluster(id) => {
1684                            let mut cluster = state.get_cluster(*id).clone();
1685                            update_privilege_fn(&mut cluster.privileges);
1686                            tx.update_cluster(*id, cluster.into())?;
1687                        }
1688                        ObjectId::Database(id) => {
1689                            let mut database = state.get_database(id).clone();
1690                            update_privilege_fn(&mut database.privileges);
1691                            tx.update_database(*id, database.into())?;
1692                        }
1693                        ObjectId::NetworkPolicy(id) => {
1694                            let mut policy = state.get_network_policy(id).clone();
1695                            update_privilege_fn(&mut policy.privileges);
1696                            tx.update_network_policy(*id, policy.into())?;
1697                        }
1698                        ObjectId::Schema((database_spec, schema_spec)) => {
1699                            let schema_id = schema_spec.clone().into();
1700                            let mut schema = state
1701                                .get_schema(
1702                                    database_spec,
1703                                    schema_spec,
1704                                    session
1705                                        .map(|session| session.conn_id())
1706                                        .unwrap_or(&SYSTEM_CONN_ID),
1707                                )
1708                                .clone();
1709                            update_privilege_fn(&mut schema.privileges);
1710                            tx.update_schema(schema_id, schema.into())?;
1711                        }
1712                        ObjectId::Item(id) => {
1713                            let entry = state.get_entry(id);
1714                            let mut new_entry = entry.clone();
1715                            update_privilege_fn(&mut new_entry.privileges);
1716                            if !new_entry.item().is_temporary() {
1717                                tx.update_item(*id, new_entry.into())?;
1718                            } else {
1719                                temporary_item_updates
1720                                    .push((entry.clone().into(), StateDiff::Retraction));
1721                                temporary_item_updates
1722                                    .push((new_entry.into(), StateDiff::Addition));
1723                            }
1724                        }
1725                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1726                    },
1727                    SystemObjectId::System => {
1728                        let mut system_privileges = state.system_privileges.clone();
1729                        update_privilege_fn(&mut system_privileges);
1730                        let new_privilege =
1731                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1732                        tx.set_system_privilege(
1733                            privilege.grantee,
1734                            privilege.grantor,
1735                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
1736                        )?;
1737                    }
1738                }
1739                let object_type = state.get_system_object_type(&target_id);
1740                let object_id_str = match &target_id {
1741                    SystemObjectId::System => "SYSTEM".to_string(),
1742                    SystemObjectId::Object(id) => id.to_string(),
1743                };
1744                CatalogState::add_to_audit_log(
1745                    &state.system_configuration,
1746                    oracle_write_ts,
1747                    session,
1748                    tx,
1749                    audit_events,
1750                    variant.into(),
1751                    system_object_type_to_audit_object_type(&object_type),
1752                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1753                        object_id: object_id_str,
1754                        grantee_id: privilege.grantee.to_string(),
1755                        grantor_id: privilege.grantor.to_string(),
1756                        privileges: privilege.acl_mode.to_string(),
1757                    }),
1758                )?;
1759            }
1760            Op::UpdateDefaultPrivilege {
1761                privilege_object,
1762                privilege_acl_item,
1763                variant,
1764            } => {
1765                let mut default_privileges = state.default_privileges.clone();
1766                match variant {
1767                    UpdatePrivilegeVariant::Grant => default_privileges
1768                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
1769                    UpdatePrivilegeVariant::Revoke => {
1770                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
1771                    }
1772                }
1773                let new_acl_mode = default_privileges
1774                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1775                tx.set_default_privilege(
1776                    privilege_object.role_id,
1777                    privilege_object.database_id,
1778                    privilege_object.schema_id,
1779                    privilege_object.object_type,
1780                    privilege_acl_item.grantee,
1781                    new_acl_mode.cloned(),
1782                )?;
1783                CatalogState::add_to_audit_log(
1784                    &state.system_configuration,
1785                    oracle_write_ts,
1786                    session,
1787                    tx,
1788                    audit_events,
1789                    variant.into(),
1790                    object_type_to_audit_object_type(privilege_object.object_type),
1791                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1792                        role_id: privilege_object.role_id.to_string(),
1793                        database_id: privilege_object.database_id.map(|id| id.to_string()),
1794                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1795                        grantee_id: privilege_acl_item.grantee.to_string(),
1796                        privileges: privilege_acl_item.acl_mode.to_string(),
1797                    }),
1798                )?;
1799            }
1800            Op::RenameCluster {
1801                id,
1802                name,
1803                to_name,
1804                check_reserved_names,
1805            } => {
1806                if id.is_system() {
1807                    return Err(AdapterError::Catalog(Error::new(
1808                        ErrorKind::ReadOnlyCluster(name.clone()),
1809                    )));
1810                }
1811                if check_reserved_names && is_reserved_name(&to_name) {
1812                    return Err(AdapterError::Catalog(Error::new(
1813                        ErrorKind::ReservedClusterName(to_name),
1814                    )));
1815                }
1816                tx.rename_cluster(id, &name, &to_name)?;
1817                CatalogState::add_to_audit_log(
1818                    &state.system_configuration,
1819                    oracle_write_ts,
1820                    session,
1821                    tx,
1822                    audit_events,
1823                    EventType::Alter,
1824                    ObjectType::Cluster,
1825                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1826                        id: id.to_string(),
1827                        old_name: name.clone(),
1828                        new_name: to_name.clone(),
1829                    }),
1830                )?;
1831                info!("rename cluster {name} to {to_name}");
1832            }
1833            Op::RenameClusterReplica {
1834                cluster_id,
1835                replica_id,
1836                name,
1837                to_name,
1838            } => {
1839                if is_reserved_name(&to_name) {
1840                    return Err(AdapterError::Catalog(Error::new(
1841                        ErrorKind::ReservedReplicaName(to_name),
1842                    )));
1843                }
1844                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1845                CatalogState::add_to_audit_log(
1846                    &state.system_configuration,
1847                    oracle_write_ts,
1848                    session,
1849                    tx,
1850                    audit_events,
1851                    EventType::Alter,
1852                    ObjectType::ClusterReplica,
1853                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1854                        cluster_id: cluster_id.to_string(),
1855                        replica_id: replica_id.to_string(),
1856                        old_name: name.replica.as_str().to_string(),
1857                        new_name: to_name.clone(),
1858                    }),
1859                )?;
1860                info!("rename cluster replica {name} to {to_name}");
1861            }
1862            Op::RenameItem {
1863                id,
1864                to_name,
1865                current_full_name,
1866            } => {
1867                let mut updates = Vec::new();
1868
1869                let entry = state.get_entry(&id);
1870                if let CatalogItem::Type(_) = entry.item() {
1871                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1872                        current_full_name.to_string(),
1873                    ))));
1874                }
1875
1876                if entry.id().is_system() {
1877                    let name = state
1878                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1879                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1880                        name.to_string(),
1881                    ))));
1882                }
1883
1884                let mut to_full_name = current_full_name.clone();
1885                to_full_name.item.clone_from(&to_name);
1886
1887                let mut to_qualified_name = entry.name().clone();
1888                to_qualified_name.item.clone_from(&to_name);
1889
1890                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1891                    id: id.to_string(),
1892                    old_name: Self::full_name_detail(&current_full_name),
1893                    new_name: Self::full_name_detail(&to_full_name),
1894                });
1895                if Self::should_audit_log_item(entry.item()) {
1896                    CatalogState::add_to_audit_log(
1897                        &state.system_configuration,
1898                        oracle_write_ts,
1899                        session,
1900                        tx,
1901                        audit_events,
1902                        EventType::Alter,
1903                        catalog_type_to_audit_object_type(entry.item().typ()),
1904                        details,
1905                    )?;
1906                }
1907
1908                // Rename item itself.
1909                let mut new_entry = entry.clone();
1910                new_entry.name.item.clone_from(&to_name);
1911                new_entry.item = entry
1912                    .item()
1913                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
1914                    .map_err(|e| {
1915                        Error::new(ErrorKind::from(AmbiguousRename {
1916                            depender: state
1917                                .resolve_full_name(entry.name(), entry.conn_id())
1918                                .to_string(),
1919                            dependee: state
1920                                .resolve_full_name(entry.name(), entry.conn_id())
1921                                .to_string(),
1922                            message: e,
1923                        }))
1924                    })?;
1925
1926                for id in entry.referenced_by() {
1927                    let dependent_item = state.get_entry(id);
1928                    let mut to_entry = dependent_item.clone();
1929                    to_entry.item = dependent_item
1930                        .item()
1931                        .rename_item_refs(
1932                            current_full_name.clone(),
1933                            to_full_name.item.clone(),
1934                            false,
1935                        )
1936                        .map_err(|e| {
1937                            Error::new(ErrorKind::from(AmbiguousRename {
1938                                depender: state
1939                                    .resolve_full_name(
1940                                        dependent_item.name(),
1941                                        dependent_item.conn_id(),
1942                                    )
1943                                    .to_string(),
1944                                dependee: state
1945                                    .resolve_full_name(entry.name(), entry.conn_id())
1946                                    .to_string(),
1947                                message: e,
1948                            }))
1949                        })?;
1950
1951                    if !to_entry.item().is_temporary() {
1952                        tx.update_item(*id, to_entry.into())?;
1953                    } else {
1954                        temporary_item_updates
1955                            .push((dependent_item.clone().into(), StateDiff::Retraction));
1956                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
1957                    }
1958                    updates.push(*id);
1959                }
1960                if !new_entry.item().is_temporary() {
1961                    tx.update_item(id, new_entry.into())?;
1962                } else {
1963                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
1964                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
1965                }
1966
1967                updates.push(id);
1968                for id in updates {
1969                    Self::log_update(state, &id);
1970                }
1971            }
1972            Op::RenameSchema {
1973                database_spec,
1974                schema_spec,
1975                new_name,
1976                check_reserved_names,
1977            } => {
1978                if check_reserved_names && is_reserved_name(&new_name) {
1979                    return Err(AdapterError::Catalog(Error::new(
1980                        ErrorKind::ReservedSchemaName(new_name),
1981                    )));
1982                }
1983
1984                let conn_id = session
1985                    .map(|session| session.conn_id())
1986                    .unwrap_or(&SYSTEM_CONN_ID);
1987
1988                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
1989                let cur_name = schema.name().schema.clone();
1990
1991                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
1992                    return Err(AdapterError::Catalog(Error::new(
1993                        ErrorKind::AmbientSchemaRename(cur_name),
1994                    )));
1995                };
1996                let database = state.get_database(&database_id);
1997                let database_name = &database.name;
1998
1999                let mut updates = Vec::new();
2000                let mut items_to_update = BTreeMap::new();
2001
2002                let mut update_item = |id| {
2003                    if items_to_update.contains_key(id) {
2004                        return Ok(());
2005                    }
2006
2007                    let entry = state.get_entry(id);
2008
2009                    // Update our item.
2010                    let mut new_entry = entry.clone();
2011                    new_entry.item = entry
2012                        .item
2013                        .rename_schema_refs(database_name, &cur_name, &new_name)
2014                        .map_err(|(s, _i)| {
2015                            Error::new(ErrorKind::from(AmbiguousRename {
2016                                depender: state
2017                                    .resolve_full_name(entry.name(), entry.conn_id())
2018                                    .to_string(),
2019                                dependee: format!("{database_name}.{cur_name}"),
2020                                message: format!("ambiguous reference to schema named {s}"),
2021                            }))
2022                        })?;
2023
2024                    // Queue updates for Catalog storage and Builtin Tables.
2025                    if !new_entry.item().is_temporary() {
2026                        items_to_update.insert(*id, new_entry.into());
2027                    } else {
2028                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2029                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2030                    }
2031                    updates.push(id);
2032
2033                    Ok::<_, AdapterError>(())
2034                };
2035
2036                // Update all of the items in the schema.
2037                for (_name, item_id) in &schema.items {
2038                    // Update the item itself.
2039                    update_item(item_id)?;
2040
2041                    // Update everything that depends on this item.
2042                    for id in state.get_entry(item_id).referenced_by() {
2043                        update_item(id)?;
2044                    }
2045                }
2046                // Note: When updating the transaction it's very important that we update the
2047                // items as a whole group, otherwise we exhibit quadratic behavior.
2048                tx.update_items(items_to_update)?;
2049
2050                // Renaming temporary schemas is not supported.
2051                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2052                    let schema_name = schema.name().schema.clone();
2053                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2054                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2055                    )));
2056                };
2057
2058                // Add an entry to the audit log.
2059                let database_name = database_spec
2060                    .id()
2061                    .map(|id| state.get_database(&id).name.clone());
2062                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2063                    id: schema_id.to_string(),
2064                    old_name: schema.name().schema.clone(),
2065                    new_name: new_name.clone(),
2066                    database_name,
2067                });
2068                CatalogState::add_to_audit_log(
2069                    &state.system_configuration,
2070                    oracle_write_ts,
2071                    session,
2072                    tx,
2073                    audit_events,
2074                    EventType::Alter,
2075                    mz_audit_log::ObjectType::Schema,
2076                    details,
2077                )?;
2078
2079                // Update the schema itself.
2080                let mut new_schema = schema.clone();
2081                new_schema.name.schema.clone_from(&new_name);
2082                tx.update_schema(schema_id, new_schema.into())?;
2083
2084                for id in updates {
2085                    Self::log_update(state, id);
2086                }
2087            }
2088            Op::UpdateOwner { id, new_owner } => {
2089                let conn_id = session
2090                    .map(|session| session.conn_id())
2091                    .unwrap_or(&SYSTEM_CONN_ID);
2092                let old_owner = state
2093                    .get_owner_id(&id, conn_id)
2094                    .expect("cannot update the owner of an object without an owner");
2095                match &id {
2096                    ObjectId::Cluster(id) => {
2097                        let mut cluster = state.get_cluster(*id).clone();
2098                        if id.is_system() {
2099                            return Err(AdapterError::Catalog(Error::new(
2100                                ErrorKind::ReadOnlyCluster(cluster.name),
2101                            )));
2102                        }
2103                        Self::update_privilege_owners(
2104                            &mut cluster.privileges,
2105                            cluster.owner_id,
2106                            new_owner,
2107                        );
2108                        cluster.owner_id = new_owner;
2109                        tx.update_cluster(*id, cluster.into())?;
2110                    }
2111                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2112                        let cluster = state.get_cluster(*cluster_id);
2113                        let mut replica = cluster
2114                            .replica(*replica_id)
2115                            .expect("catalog out of sync")
2116                            .clone();
2117                        if replica_id.is_system() {
2118                            return Err(AdapterError::Catalog(Error::new(
2119                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2120                            )));
2121                        }
2122                        replica.owner_id = new_owner;
2123                        tx.update_cluster_replica(*replica_id, replica.into())?;
2124                    }
2125                    ObjectId::Database(id) => {
2126                        let mut database = state.get_database(id).clone();
2127                        if id.is_system() {
2128                            return Err(AdapterError::Catalog(Error::new(
2129                                ErrorKind::ReadOnlyDatabase(database.name),
2130                            )));
2131                        }
2132                        Self::update_privilege_owners(
2133                            &mut database.privileges,
2134                            database.owner_id,
2135                            new_owner,
2136                        );
2137                        database.owner_id = new_owner;
2138                        tx.update_database(*id, database.clone().into())?;
2139                    }
2140                    ObjectId::Schema((database_spec, schema_spec)) => {
2141                        let schema_id: SchemaId = schema_spec.clone().into();
2142                        let mut schema = state
2143                            .get_schema(database_spec, schema_spec, conn_id)
2144                            .clone();
2145                        if schema_id.is_system() {
2146                            let name = schema.name();
2147                            let full_name = state.resolve_full_schema_name(name);
2148                            return Err(AdapterError::Catalog(Error::new(
2149                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2150                            )));
2151                        }
2152                        Self::update_privilege_owners(
2153                            &mut schema.privileges,
2154                            schema.owner_id,
2155                            new_owner,
2156                        );
2157                        schema.owner_id = new_owner;
2158                        tx.update_schema(schema_id, schema.into())?;
2159                    }
2160                    ObjectId::Item(id) => {
2161                        let entry = state.get_entry(id);
2162                        let mut new_entry = entry.clone();
2163                        if id.is_system() {
2164                            let full_name = state.resolve_full_name(
2165                                new_entry.name(),
2166                                session.map(|session| session.conn_id()),
2167                            );
2168                            return Err(AdapterError::Catalog(Error::new(
2169                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2170                            )));
2171                        }
2172                        Self::update_privilege_owners(
2173                            &mut new_entry.privileges,
2174                            new_entry.owner_id,
2175                            new_owner,
2176                        );
2177                        new_entry.owner_id = new_owner;
2178                        if !new_entry.item().is_temporary() {
2179                            tx.update_item(*id, new_entry.into())?;
2180                        } else {
2181                            temporary_item_updates
2182                                .push((entry.clone().into(), StateDiff::Retraction));
2183                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2184                        }
2185                    }
2186                    ObjectId::NetworkPolicy(id) => {
2187                        let mut policy = state.get_network_policy(id).clone();
2188                        if id.is_system() {
2189                            return Err(AdapterError::Catalog(Error::new(
2190                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2191                            )));
2192                        }
2193                        Self::update_privilege_owners(
2194                            &mut policy.privileges,
2195                            policy.owner_id,
2196                            new_owner,
2197                        );
2198                        policy.owner_id = new_owner;
2199                        tx.update_network_policy(*id, policy.into())?;
2200                    }
2201                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2202                }
2203                let object_type = state.get_object_type(&id);
2204                CatalogState::add_to_audit_log(
2205                    &state.system_configuration,
2206                    oracle_write_ts,
2207                    session,
2208                    tx,
2209                    audit_events,
2210                    EventType::Alter,
2211                    object_type_to_audit_object_type(object_type),
2212                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2213                        object_id: id.to_string(),
2214                        old_owner_id: old_owner.to_string(),
2215                        new_owner_id: new_owner.to_string(),
2216                    }),
2217                )?;
2218            }
2219            Op::UpdateClusterConfig { id, name, config } => {
2220                let mut cluster = state.get_cluster(id).clone();
2221                cluster.config = config;
2222                tx.update_cluster(id, cluster.into())?;
2223                info!("update cluster {}", name);
2224
2225                CatalogState::add_to_audit_log(
2226                    &state.system_configuration,
2227                    oracle_write_ts,
2228                    session,
2229                    tx,
2230                    audit_events,
2231                    EventType::Alter,
2232                    ObjectType::Cluster,
2233                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2234                        id: id.to_string(),
2235                        name,
2236                    }),
2237                )?;
2238            }
2239            Op::UpdateClusterReplicaConfig {
2240                replica_id,
2241                cluster_id,
2242                config,
2243            } => {
2244                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2245                info!("update replica {}", replica.name);
2246                tx.update_cluster_replica(
2247                    replica_id,
2248                    mz_catalog::durable::ClusterReplica {
2249                        cluster_id,
2250                        replica_id,
2251                        name: replica.name.clone(),
2252                        config: config.clone().into(),
2253                        owner_id: replica.owner_id,
2254                    },
2255                )?;
2256            }
2257            Op::UpdateItem { id, name, to_item } => {
2258                let mut entry = state.get_entry(&id).clone();
2259                entry.name = name.clone();
2260                entry.item = to_item.clone();
2261                tx.update_item(id, entry.into())?;
2262
2263                if Self::should_audit_log_item(&to_item) {
2264                    let mut full_name = Self::full_name_detail(
2265                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2266                    );
2267                    full_name.item = name.item;
2268
2269                    CatalogState::add_to_audit_log(
2270                        &state.system_configuration,
2271                        oracle_write_ts,
2272                        session,
2273                        tx,
2274                        audit_events,
2275                        EventType::Alter,
2276                        catalog_type_to_audit_object_type(to_item.typ()),
2277                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2278                            id: id.to_string(),
2279                            name: full_name,
2280                        }),
2281                    )?;
2282                }
2283
2284                Self::log_update(state, &id);
2285            }
2286            Op::UpdateSystemConfiguration { name, value } => {
2287                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2288                tx.upsert_system_config(&name, parsed_value.clone())?;
2289                // This mirrors some "system vars" into the catalog storage
2290                // "config" collection so that we can toggle the flag with
2291                // Launch Darkly, but use it in boot before Launch Darkly is
2292                // available.
2293                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2294                    let with_0dt_deployment_max_wait =
2295                        Duration::parse(VarInput::Flat(&parsed_value))
2296                            .expect("parsing succeeded above");
2297                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2298                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2299                    let with_0dt_deployment_ddl_check_interval =
2300                        Duration::parse(VarInput::Flat(&parsed_value))
2301                            .expect("parsing succeeded above");
2302                    tx.set_0dt_deployment_ddl_check_interval(
2303                        with_0dt_deployment_ddl_check_interval,
2304                    )?;
2305                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2306                    let panic_after_timeout =
2307                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2308                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2309                }
2310
2311                CatalogState::add_to_audit_log(
2312                    &state.system_configuration,
2313                    oracle_write_ts,
2314                    session,
2315                    tx,
2316                    audit_events,
2317                    EventType::Alter,
2318                    ObjectType::System,
2319                    EventDetails::SetV1(mz_audit_log::SetV1 {
2320                        name,
2321                        value: Some(value.borrow().to_vec().join(", ")),
2322                    }),
2323                )?;
2324            }
2325            Op::ResetSystemConfiguration { name } => {
2326                tx.remove_system_config(&name);
2327                // This mirrors some "system vars" into the catalog storage
2328                // "config" collection so that we can toggle the flag with
2329                // Launch Darkly, but use it in boot before Launch Darkly is
2330                // available.
2331                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2332                    tx.reset_0dt_deployment_max_wait()?;
2333                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2334                    tx.reset_0dt_deployment_ddl_check_interval()?;
2335                }
2336
2337                CatalogState::add_to_audit_log(
2338                    &state.system_configuration,
2339                    oracle_write_ts,
2340                    session,
2341                    tx,
2342                    audit_events,
2343                    EventType::Alter,
2344                    ObjectType::System,
2345                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2346                )?;
2347            }
2348            Op::ResetAllSystemConfiguration => {
2349                tx.clear_system_configs();
2350                tx.reset_0dt_deployment_max_wait()?;
2351                tx.reset_0dt_deployment_ddl_check_interval()?;
2352
2353                CatalogState::add_to_audit_log(
2354                    &state.system_configuration,
2355                    oracle_write_ts,
2356                    session,
2357                    tx,
2358                    audit_events,
2359                    EventType::Alter,
2360                    ObjectType::System,
2361                    EventDetails::ResetAllV1,
2362                )?;
2363            }
2364            Op::WeirdStorageUsageUpdates {
2365                object_id,
2366                size_bytes,
2367                collection_timestamp,
2368            } => {
2369                let id = tx.allocate_storage_usage_ids()?;
2370                let metric =
2371                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2372                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2373                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2374                weird_builtin_table_update = Some(builtin_table_update);
2375            }
2376        };
2377        Ok((weird_builtin_table_update, temporary_item_updates))
2378    }
2379
2380    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2381        let entry = state.get_entry(id);
2382        info!(
2383            "update {} {} ({})",
2384            entry.item_type(),
2385            state.resolve_full_name(entry.name(), entry.conn_id()),
2386            id
2387        );
2388    }
2389
2390    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2391    /// implementation:
2392    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2393    fn update_privilege_owners(
2394        privileges: &mut PrivilegeMap,
2395        old_owner: RoleId,
2396        new_owner: RoleId,
2397    ) {
2398        // TODO(jkosh44) Would be nice not to clone every privilege.
2399        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2400
2401        let mut new_present = false;
2402        for privilege in flat_privileges.iter_mut() {
2403            // Old owner's granted privilege are updated to be granted by the new
2404            // owner.
2405            if privilege.grantor == old_owner {
2406                privilege.grantor = new_owner;
2407            } else if privilege.grantor == new_owner {
2408                new_present = true;
2409            }
2410            // Old owner's privileges is given to the new owner.
2411            if privilege.grantee == old_owner {
2412                privilege.grantee = new_owner;
2413            } else if privilege.grantee == new_owner {
2414                new_present = true;
2415            }
2416        }
2417
2418        // If the old privilege list contained references to the new owner, we may
2419        // have created duplicate entries. Here we try and consolidate them. This
2420        // is inspired by PostgreSQL's algorithm but not identical.
2421        if new_present {
2422            // Group privileges by (grantee, grantor).
2423            let privilege_map: BTreeMap<_, Vec<_>> =
2424                flat_privileges
2425                    .into_iter()
2426                    .fold(BTreeMap::new(), |mut accum, privilege| {
2427                        accum
2428                            .entry((privilege.grantee, privilege.grantor))
2429                            .or_default()
2430                            .push(privilege);
2431                        accum
2432                    });
2433
2434            // Consolidate and update all privileges.
2435            flat_privileges = privilege_map
2436                .into_iter()
2437                .map(|((grantee, grantor), values)|
2438                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2439                    values.into_iter().fold(
2440                        MzAclItem::empty(grantee, grantor),
2441                        |mut accum, mz_aclitem| {
2442                            accum.acl_mode =
2443                                accum.acl_mode.union(mz_aclitem.acl_mode);
2444                            accum
2445                        },
2446                    ))
2447                .collect();
2448        }
2449
2450        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2451    }
2452}
2453
2454/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2455///
2456/// Note: Previously we used to omit a single `Op::DropObject` for every object
2457/// we needed to drop. But removing a batch of objects from a durable Catalog
2458/// Transaction is O(n) where `n` is the number of objects that exist in the
2459/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
2460/// `DROP ... CASCADE` statement.
2461#[derive(Debug, Default)]
2462pub(crate) struct ObjectsToDrop {
2463    pub comments: BTreeSet<CommentObjectId>,
2464    pub databases: BTreeSet<DatabaseId>,
2465    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2466    pub clusters: BTreeSet<ClusterId>,
2467    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2468    pub roles: BTreeSet<RoleId>,
2469    pub items: Vec<CatalogItemId>,
2470    pub network_policies: BTreeSet<NetworkPolicyId>,
2471}
2472
2473impl ObjectsToDrop {
2474    pub fn generate(
2475        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2476        state: &CatalogState,
2477        session: Option<&ConnMeta>,
2478    ) -> Result<Self, AdapterError> {
2479        let mut delta = ObjectsToDrop::default();
2480
2481        for drop_object_info in drop_object_infos {
2482            delta.add_item(drop_object_info, state, session)?;
2483        }
2484
2485        Ok(delta)
2486    }
2487
2488    fn add_item(
2489        &mut self,
2490        drop_object_info: DropObjectInfo,
2491        state: &CatalogState,
2492        session: Option<&ConnMeta>,
2493    ) -> Result<(), AdapterError> {
2494        self.comments
2495            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2496
2497        match drop_object_info {
2498            DropObjectInfo::Database(database_id) => {
2499                let database = &state.database_by_id[&database_id];
2500                if database_id.is_system() {
2501                    return Err(AdapterError::Catalog(Error::new(
2502                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2503                    )));
2504                }
2505
2506                self.databases.insert(database_id);
2507            }
2508            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2509                let schema = state.get_schema(
2510                    &database_spec,
2511                    &schema_spec,
2512                    session
2513                        .map(|session| session.conn_id())
2514                        .unwrap_or(&SYSTEM_CONN_ID),
2515                );
2516                let schema_id: SchemaId = schema_spec.into();
2517                if schema_id.is_system() {
2518                    let name = schema.name();
2519                    let full_name = state.resolve_full_schema_name(name);
2520                    return Err(AdapterError::Catalog(Error::new(
2521                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2522                    )));
2523                }
2524
2525                self.schemas.insert(schema_spec, database_spec);
2526            }
2527            DropObjectInfo::Role(role_id) => {
2528                let name = state.get_role(&role_id).name().to_string();
2529                if role_id.is_system() || role_id.is_predefined() {
2530                    return Err(AdapterError::Catalog(Error::new(
2531                        ErrorKind::ReservedRoleName(name.clone()),
2532                    )));
2533                }
2534                state.ensure_not_reserved_role(&role_id)?;
2535
2536                self.roles.insert(role_id);
2537            }
2538            DropObjectInfo::Cluster(cluster_id) => {
2539                let cluster = state.get_cluster(cluster_id);
2540                let name = &cluster.name;
2541                if cluster_id.is_system() {
2542                    return Err(AdapterError::Catalog(Error::new(
2543                        ErrorKind::ReadOnlyCluster(name.clone()),
2544                    )));
2545                }
2546
2547                self.clusters.insert(cluster_id);
2548            }
2549            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2550                let cluster = state.get_cluster(cluster_id);
2551                let replica = cluster.replica(replica_id).expect("Must exist");
2552
2553                self.replicas
2554                    .insert(replica.replica_id, (cluster.id, reason));
2555            }
2556            DropObjectInfo::Item(item_id) => {
2557                let entry = state.get_entry(&item_id);
2558                if item_id.is_system() {
2559                    let name = entry.name();
2560                    let full_name =
2561                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
2562                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2563                        full_name.to_string(),
2564                    ))));
2565                }
2566
2567                self.items.push(item_id);
2568            }
2569            DropObjectInfo::NetworkPolicy(network_policy_id) => {
2570                let policy = state.get_network_policy(&network_policy_id);
2571                let name = &policy.name;
2572                if network_policy_id.is_system() {
2573                    return Err(AdapterError::Catalog(Error::new(
2574                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2575                    )));
2576                }
2577
2578                self.network_policies.insert(network_policy_id);
2579            }
2580        }
2581
2582        Ok(())
2583    }
2584}
2585
2586#[cfg(test)]
2587mod tests {
2588    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
2589    use mz_repr::role_id::RoleId;
2590
2591    use crate::catalog::Catalog;
2592
2593    #[mz_ore::test]
2594    fn test_update_privilege_owners() {
2595        let old_owner = RoleId::User(1);
2596        let new_owner = RoleId::User(2);
2597        let other_role = RoleId::User(3);
2598
2599        // older owner exists as grantor.
2600        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2601            MzAclItem {
2602                grantee: other_role,
2603                grantor: old_owner,
2604                acl_mode: AclMode::UPDATE,
2605            },
2606            MzAclItem {
2607                grantee: other_role,
2608                grantor: new_owner,
2609                acl_mode: AclMode::SELECT,
2610            },
2611        ]);
2612        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2613        assert_eq!(1, privileges.all_values().count());
2614        assert_eq!(
2615            vec![MzAclItem {
2616                grantee: other_role,
2617                grantor: new_owner,
2618                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2619            }],
2620            privileges.all_values_owned().collect::<Vec<_>>()
2621        );
2622
2623        // older owner exists as grantee.
2624        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2625            MzAclItem {
2626                grantee: old_owner,
2627                grantor: other_role,
2628                acl_mode: AclMode::UPDATE,
2629            },
2630            MzAclItem {
2631                grantee: new_owner,
2632                grantor: other_role,
2633                acl_mode: AclMode::SELECT,
2634            },
2635        ]);
2636        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2637        assert_eq!(1, privileges.all_values().count());
2638        assert_eq!(
2639            vec![MzAclItem {
2640                grantee: new_owner,
2641                grantor: other_role,
2642                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2643            }],
2644            privileges.all_values_owned().collect::<Vec<_>>()
2645        );
2646
2647        // older owner exists as grantee and grantor.
2648        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2649            MzAclItem {
2650                grantee: old_owner,
2651                grantor: old_owner,
2652                acl_mode: AclMode::UPDATE,
2653            },
2654            MzAclItem {
2655                grantee: new_owner,
2656                grantor: new_owner,
2657                acl_mode: AclMode::SELECT,
2658            },
2659        ]);
2660        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2661        assert_eq!(1, privileges.all_values().count());
2662        assert_eq!(
2663            vec![MzAclItem {
2664                grantee: new_owner,
2665                grantor: new_owner,
2666                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2667            }],
2668            privileges.all_values_owned().collect::<Vec<_>>()
2669        );
2670    }
2671}