mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use std::time::Duration;
16
17use itertools::Itertools;
18use mz_adapter_types::compaction::CompactionWindow;
19use mz_adapter_types::connection::ConnectionId;
20use mz_adapter_types::dyncfgs::{
21    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
22    WITH_0DT_DEPLOYMENT_MAX_WAIT,
23};
24use mz_audit_log::{
25    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
26    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
27};
28use mz_catalog::SYSTEM_CONN_ID;
29use mz_catalog::builtin::BuiltinLog;
30use mz_catalog::durable::{NetworkPolicy, Transaction};
31use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
34    StateUpdateKind, TemporaryItem,
35};
36use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
37use mz_controller_types::{ClusterId, ReplicaId};
38use mz_ore::collections::HashSet;
39use mz_ore::instrument;
40use mz_ore::now::EpochMillis;
41use mz_persist_types::ShardId;
42use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
43use mz_repr::network_policy_id::NetworkPolicyId;
44use mz_repr::role_id::RoleId;
45use mz_repr::{CatalogItemId, ColumnName, Diff, GlobalId, SqlColumnType, strconv};
46use mz_sql::ast::RawDataType;
47use mz_sql::catalog::{
48    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
49    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, PasswordAction,
50    RoleAttributesRaw, RoleMembership, RoleVars,
51};
52use mz_sql::names::{
53    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
54    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
55};
56use mz_sql::plan::{NetworkPolicyRule, PlanError};
57use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
58use mz_sql::session::vars::OwnedVarInput;
59use mz_sql::session::vars::{Value as VarValue, VarInput};
60use mz_sql::{DEFAULT_SCHEMA, rbac};
61use mz_sql_parser::ast::{QualifiedReplica, Value};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{info, trace};
64
65use crate::AdapterError;
66use crate::catalog::{
67    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
68    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
69    is_reserved_role_name, object_type_to_audit_object_type,
70    system_object_type_to_audit_object_type,
71};
72use crate::coord::ConnMeta;
73use crate::coord::cluster_scheduling::SchedulingDecision;
74use crate::util::ResultExt;
75
76#[derive(Debug, Clone)]
77pub enum Op {
78    AlterRetainHistory {
79        id: CatalogItemId,
80        value: Option<Value>,
81        window: CompactionWindow,
82    },
83    AlterRole {
84        id: RoleId,
85        name: String,
86        attributes: RoleAttributesRaw,
87        nopassword: bool,
88        vars: RoleVars,
89    },
90    AlterNetworkPolicy {
91        id: NetworkPolicyId,
92        rules: Vec<NetworkPolicyRule>,
93        name: String,
94        owner_id: RoleId,
95    },
96    AlterAddColumn {
97        id: CatalogItemId,
98        new_global_id: GlobalId,
99        name: ColumnName,
100        typ: SqlColumnType,
101        sql: RawDataType,
102    },
103    CreateDatabase {
104        name: String,
105        owner_id: RoleId,
106    },
107    CreateSchema {
108        database_id: ResolvedDatabaseSpecifier,
109        schema_name: String,
110        owner_id: RoleId,
111    },
112    CreateRole {
113        name: String,
114        attributes: RoleAttributesRaw,
115    },
116    CreateCluster {
117        id: ClusterId,
118        name: String,
119        introspection_sources: Vec<&'static BuiltinLog>,
120        owner_id: RoleId,
121        config: ClusterConfig,
122    },
123    CreateClusterReplica {
124        cluster_id: ClusterId,
125        name: String,
126        config: ReplicaConfig,
127        owner_id: RoleId,
128        reason: ReplicaCreateDropReason,
129    },
130    CreateItem {
131        id: CatalogItemId,
132        name: QualifiedItemName,
133        item: CatalogItem,
134        owner_id: RoleId,
135    },
136    CreateNetworkPolicy {
137        rules: Vec<NetworkPolicyRule>,
138        name: String,
139        owner_id: RoleId,
140    },
141    Comment {
142        object_id: CommentObjectId,
143        sub_component: Option<usize>,
144        comment: Option<String>,
145    },
146    DropObjects(Vec<DropObjectInfo>),
147    GrantRole {
148        role_id: RoleId,
149        member_id: RoleId,
150        grantor_id: RoleId,
151    },
152    RenameCluster {
153        id: ClusterId,
154        name: String,
155        to_name: String,
156        check_reserved_names: bool,
157    },
158    RenameClusterReplica {
159        cluster_id: ClusterId,
160        replica_id: ReplicaId,
161        name: QualifiedReplica,
162        to_name: String,
163    },
164    RenameItem {
165        id: CatalogItemId,
166        current_full_name: FullItemName,
167        to_name: String,
168    },
169    RenameSchema {
170        database_spec: ResolvedDatabaseSpecifier,
171        schema_spec: SchemaSpecifier,
172        new_name: String,
173        check_reserved_names: bool,
174    },
175    UpdateOwner {
176        id: ObjectId,
177        new_owner: RoleId,
178    },
179    UpdatePrivilege {
180        target_id: SystemObjectId,
181        privilege: MzAclItem,
182        variant: UpdatePrivilegeVariant,
183    },
184    UpdateDefaultPrivilege {
185        privilege_object: DefaultPrivilegeObject,
186        privilege_acl_item: DefaultPrivilegeAclItem,
187        variant: UpdatePrivilegeVariant,
188    },
189    RevokeRole {
190        role_id: RoleId,
191        member_id: RoleId,
192        grantor_id: RoleId,
193    },
194    UpdateClusterConfig {
195        id: ClusterId,
196        name: String,
197        config: ClusterConfig,
198    },
199    UpdateClusterReplicaConfig {
200        cluster_id: ClusterId,
201        replica_id: ReplicaId,
202        config: ReplicaConfig,
203    },
204    UpdateItem {
205        id: CatalogItemId,
206        name: QualifiedItemName,
207        to_item: CatalogItem,
208    },
209    UpdateSourceReferences {
210        source_id: CatalogItemId,
211        references: SourceReferences,
212    },
213    UpdateSystemConfiguration {
214        name: String,
215        value: OwnedVarInput,
216    },
217    ResetSystemConfiguration {
218        name: String,
219    },
220    ResetAllSystemConfiguration,
221    /// Performs updates to the storage usage table, which probably should be a builtin source.
222    ///
223    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
224    /// might not work. If a process crashes after collecting storage usage events
225    /// but before updating the builtin table, then another listening catalog
226    /// will never know to update the builtin table.
227    WeirdStorageUsageUpdates {
228        object_id: Option<String>,
229        size_bytes: u64,
230        collection_timestamp: EpochMillis,
231    },
232    /// Performs a dry run of the commit, but errors with
233    /// [`AdapterError::TransactionDryRun`].
234    ///
235    /// When using this value, it should be included only as the last element of
236    /// the transaction and should not be the only value in the transaction.
237    TransactionDryRun,
238}
239
240/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
241/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
242/// the `Op::DropObjects`.
243#[derive(Debug, Clone)]
244pub enum DropObjectInfo {
245    Cluster(ClusterId),
246    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
247    Database(DatabaseId),
248    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
249    Role(RoleId),
250    Item(CatalogItemId),
251    NetworkPolicy(NetworkPolicyId),
252}
253
254impl DropObjectInfo {
255    /// Creates a `DropObjectInfo` from an `ObjectId`.
256    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
257    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
258        match id {
259            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
260            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
261                cluster_id,
262                replica_id,
263                ReplicaCreateDropReason::Manual,
264            )),
265            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
266            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
267            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
268            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
269            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
270        }
271    }
272
273    /// Creates an `ObjectId` from a `DropObjectInfo`.
274    /// Loses the `ReplicaCreateDropReason` if there is one!
275    fn to_object_id(&self) -> ObjectId {
276        match &self {
277            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
278            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
279                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
280            }
281            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
282            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
283            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
284            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
285            DropObjectInfo::NetworkPolicy(network_policy_id) => {
286                ObjectId::NetworkPolicy(network_policy_id.clone())
287            }
288        }
289    }
290}
291
292/// The reason for creating or dropping a replica.
293#[derive(Debug, Clone)]
294pub enum ReplicaCreateDropReason {
295    /// The user initiated the replica create or drop, e.g., by
296    /// - creating/dropping a cluster,
297    /// - ALTERing various options on a managed cluster,
298    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
299    Manual,
300    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
301    /// materialized view is needing a refresh on a SCHEDULE ON REFRESH cluster.
302    ClusterScheduling(Vec<SchedulingDecision>),
303}
304
305impl ReplicaCreateDropReason {
306    pub fn into_audit_log(
307        self,
308    ) -> (
309        CreateOrDropClusterReplicaReasonV1,
310        Option<SchedulingDecisionsWithReasonsV2>,
311    ) {
312        let (reason, scheduling_policies) = match self {
313            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
314            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
315                CreateOrDropClusterReplicaReasonV1::Schedule,
316                Some(scheduling_decisions),
317            ),
318        };
319        (
320            reason,
321            scheduling_policies
322                .as_ref()
323                .map(SchedulingDecision::reasons_to_audit_log_reasons),
324        )
325    }
326}
327
328pub struct TransactionResult {
329    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
330    pub audit_events: Vec<VersionedEvent>,
331}
332
333impl Catalog {
334    fn should_audit_log_item(item: &CatalogItem) -> bool {
335        !item.is_temporary()
336    }
337
338    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
339    /// within a connection id.
340    fn temporary_ids(
341        &self,
342        ops: &[Op],
343        temporary_drops: BTreeSet<(&ConnectionId, String)>,
344    ) -> Result<BTreeSet<CatalogItemId>, Error> {
345        let mut creating = BTreeSet::new();
346        let mut temporary_ids = BTreeSet::new();
347        for op in ops.iter() {
348            if let Op::CreateItem {
349                id,
350                name,
351                item,
352                owner_id: _,
353            } = op
354            {
355                if let Some(conn_id) = item.conn_id() {
356                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
357                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
358                        || creating.contains(&(conn_id, &name.item))
359                    {
360                        return Err(
361                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
362                        );
363                    } else {
364                        creating.insert((conn_id, &name.item));
365                        temporary_ids.insert(id.clone());
366                    }
367                }
368            }
369        }
370        Ok(temporary_ids)
371    }
372
373    #[instrument(name = "catalog::transact")]
374    pub async fn transact(
375        &mut self,
376        // n.b. this is an option to prevent us from needing to build out a
377        // dummy impl of `StorageController` for tests.
378        storage_collections: Option<
379            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
380        >,
381        oracle_write_ts: mz_repr::Timestamp,
382        session: Option<&ConnMeta>,
383        ops: Vec<Op>,
384    ) -> Result<TransactionResult, AdapterError> {
385        trace!("transact: {:?}", ops);
386        fail::fail_point!("catalog_transact", |arg| {
387            Err(AdapterError::Unstructured(anyhow::anyhow!(
388                "failpoint: {arg:?}"
389            )))
390        });
391
392        let drop_ids: BTreeSet<CatalogItemId> = ops
393            .iter()
394            .filter_map(|op| match op {
395                Op::DropObjects(drop_object_infos) => {
396                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
397                    let item_ids = ids.filter_map(|id| match id {
398                        ObjectId::Item(id) => Some(id),
399                        _ => None,
400                    });
401                    Some(item_ids)
402                }
403                _ => None,
404            })
405            .flatten()
406            .collect();
407        let temporary_drops = drop_ids
408            .iter()
409            .filter_map(|id| {
410                let entry = self.get_entry(id);
411                match entry.item().conn_id() {
412                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
413                    None => None,
414                }
415            })
416            .collect();
417        let dropped_global_ids = drop_ids
418            .iter()
419            .flat_map(|item_id| self.get_global_ids(item_id))
420            .collect();
421
422        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
423        let mut builtin_table_updates = vec![];
424        let mut audit_events = vec![];
425        let mut storage = self.storage().await;
426        let mut tx = storage
427            .transaction()
428            .await
429            .unwrap_or_terminate("starting catalog transaction");
430
431        let new_state = Self::transact_inner(
432            storage_collections,
433            oracle_write_ts,
434            session,
435            ops,
436            temporary_ids,
437            &mut builtin_table_updates,
438            &mut audit_events,
439            &mut tx,
440            &self.state,
441        )
442        .await?;
443
444        // The user closure was successful, apply the updates. Terminate the
445        // process if this fails, because we have to restart envd due to
446        // indeterminate catalog state, which we only reconcile during catalog
447        // init.
448        tx.commit(oracle_write_ts)
449            .await
450            .unwrap_or_terminate("catalog storage transaction commit must succeed");
451
452        // Dropping here keeps the mutable borrow on self, preventing us accidentally
453        // mutating anything until after f is executed.
454        drop(storage);
455        if let Some(new_state) = new_state {
456            self.transient_revision += 1;
457            self.state = new_state;
458        }
459
460        // Drop in-memory planning metadata.
461        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
462        if self.state.system_config().enable_mz_notices() {
463            // Generate retractions for the Builtin tables.
464            self.state().pack_optimizer_notices(
465                &mut builtin_table_updates,
466                dropped_notices.iter(),
467                Diff::MINUS_ONE,
468            );
469        }
470
471        Ok(TransactionResult {
472            builtin_table_updates,
473            audit_events,
474        })
475    }
476
477    /// Performs the transaction described by `ops` and returns the new state of the catalog, if
478    /// it has changed. If `ops` don't result in a change in the state this method returns `None`.
479    ///
480    /// # Panics
481    /// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
482    ///   final element.
483    /// - If the only element of `ops` is [`Op::TransactionDryRun`].
484    #[instrument(name = "catalog::transact_inner")]
485    async fn transact_inner(
486        storage_collections: Option<
487            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
488        >,
489        oracle_write_ts: mz_repr::Timestamp,
490        session: Option<&ConnMeta>,
491        mut ops: Vec<Op>,
492        temporary_ids: BTreeSet<CatalogItemId>,
493        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
494        audit_events: &mut Vec<VersionedEvent>,
495        tx: &mut Transaction<'_>,
496        state: &CatalogState,
497    ) -> Result<Option<CatalogState>, AdapterError> {
498        let mut state = Cow::Borrowed(state);
499
500        let dry_run_ops = match ops.last() {
501            Some(Op::TransactionDryRun) => {
502                // Remove dry run marker.
503                ops.pop();
504                assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
505                ops.clone()
506            }
507            Some(_) => vec![],
508            None => return Ok(None),
509        };
510
511        let mut storage_collections_to_create = BTreeSet::new();
512        let mut storage_collections_to_drop = BTreeSet::new();
513        let mut storage_collections_to_register = BTreeMap::new();
514
515        for op in ops {
516            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
517                oracle_write_ts,
518                session,
519                op,
520                &temporary_ids,
521                audit_events,
522                tx,
523                &*state,
524                &mut storage_collections_to_create,
525                &mut storage_collections_to_drop,
526                &mut storage_collections_to_register,
527            )
528            .await?;
529
530            // Certain builtin tables are not derived from the durable catalog state, so they need
531            // to be updated ad-hoc based on the current transaction. This is weird and will not
532            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
533            // this instance crashes after committing the durable catalog but before applying the
534            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
535            // world. Currently, this works fine and there are no correctness issues because
536            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
537            // state of all builtin tables to the correct values.
538            if let Some(builtin_table_update) = weird_builtin_table_update {
539                builtin_table_updates.push(builtin_table_update);
540            }
541
542            // Temporary items are not stored in the durable catalog, so they need to be handled
543            // separately for updating state and builtin tables.
544            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
545            // in a multi-subscriber catalog world.
546            let op_id = tx.op_id().into();
547            let temporary_item_updates =
548                temporary_item_updates
549                    .into_iter()
550                    .map(|(item, diff)| StateUpdate {
551                        kind: StateUpdateKind::TemporaryItem(item),
552                        ts: op_id,
553                        diff,
554                    });
555
556            let mut updates: Vec<_> = tx.get_and_commit_op_updates();
557            updates.extend(temporary_item_updates);
558            if !updates.is_empty() {
559                let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
560                let op_builtin_table_updates = state
561                    .to_mut()
562                    .resolve_builtin_table_updates(op_builtin_table_updates);
563                builtin_table_updates.extend(op_builtin_table_updates);
564            }
565        }
566
567        if dry_run_ops.is_empty() {
568            // `storage_collections` should only be `None` for tests.
569            if let Some(c) = storage_collections {
570                c.prepare_state(
571                    tx,
572                    storage_collections_to_create,
573                    storage_collections_to_drop,
574                    storage_collections_to_register,
575                )
576                .await?;
577            }
578
579            let updates = tx.get_and_commit_op_updates();
580            if !updates.is_empty() {
581                let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
582                let op_builtin_table_updates = state
583                    .to_mut()
584                    .resolve_builtin_table_updates(op_builtin_table_updates);
585                builtin_table_updates.extend(op_builtin_table_updates);
586            }
587
588            match state {
589                Cow::Owned(state) => Ok(Some(state)),
590                Cow::Borrowed(_) => Ok(None),
591            }
592        } else {
593            Err(AdapterError::TransactionDryRun {
594                new_ops: dry_run_ops,
595                new_state: state.into_owned(),
596            })
597        }
598    }
599
600    /// Performs the transaction operation described by `op`. This function prepares the changes in
601    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
602    /// changes.
603    ///
604    /// Optionally returns a builtin table update for any builtin table updates than cannot be
605    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
606    /// scenarios and ideally in the future don't exist.
607    #[instrument]
608    async fn transact_op(
609        oracle_write_ts: mz_repr::Timestamp,
610        session: Option<&ConnMeta>,
611        op: Op,
612        temporary_ids: &BTreeSet<CatalogItemId>,
613        audit_events: &mut Vec<VersionedEvent>,
614        tx: &mut Transaction<'_>,
615        state: &CatalogState,
616        storage_collections_to_create: &mut BTreeSet<GlobalId>,
617        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
618        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
619    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
620        let mut weird_builtin_table_update = None;
621        let mut temporary_item_updates = Vec::new();
622
623        match op {
624            Op::TransactionDryRun => {
625                unreachable!("TransactionDryRun can only be used a final element of ops")
626            }
627            Op::AlterRetainHistory { id, value, window } => {
628                let entry = state.get_entry(&id);
629                if id.is_system() {
630                    let name = entry.name();
631                    let full_name =
632                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
633                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
634                        full_name.to_string(),
635                    ))));
636                }
637
638                let mut new_entry = entry.clone();
639                let previous = new_entry
640                    .item
641                    .update_retain_history(value.clone(), window)
642                    .map_err(|_| {
643                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
644                            "planner should have rejected invalid alter retain history item type"
645                                .to_string(),
646                        )))
647                    })?;
648
649                if Self::should_audit_log_item(new_entry.item()) {
650                    let details =
651                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
652                            id: id.to_string(),
653                            old_history: previous.map(|previous| previous.to_string()),
654                            new_history: value.map(|v| v.to_string()),
655                        });
656                    CatalogState::add_to_audit_log(
657                        &state.system_configuration,
658                        oracle_write_ts,
659                        session,
660                        tx,
661                        audit_events,
662                        EventType::Alter,
663                        catalog_type_to_audit_object_type(new_entry.item().typ()),
664                        details,
665                    )?;
666                }
667
668                tx.update_item(id, new_entry.into())?;
669
670                Self::log_update(state, &id);
671            }
672            Op::AlterRole {
673                id,
674                name,
675                attributes,
676                nopassword,
677                vars,
678            } => {
679                state.ensure_not_reserved_role(&id)?;
680
681                let mut existing_role = state.get_role(&id).clone();
682                let password = attributes.password.clone();
683                existing_role.attributes = attributes.into();
684                existing_role.vars = vars;
685                let password_action = if nopassword {
686                    PasswordAction::Clear
687                } else if let Some(password) = password {
688                    PasswordAction::Set(password)
689                } else {
690                    PasswordAction::NoChange
691                };
692                tx.update_role(id, existing_role.into(), password_action)?;
693
694                CatalogState::add_to_audit_log(
695                    &state.system_configuration,
696                    oracle_write_ts,
697                    session,
698                    tx,
699                    audit_events,
700                    EventType::Alter,
701                    ObjectType::Role,
702                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
703                        id: id.to_string(),
704                        name: name.clone(),
705                    }),
706                )?;
707
708                info!("update role {name} ({id})");
709            }
710            Op::AlterNetworkPolicy {
711                id,
712                rules,
713                name,
714                owner_id: _owner_id,
715            } => {
716                let existing_policy = state.get_network_policy(&id).clone();
717                let mut policy: NetworkPolicy = existing_policy.into();
718                policy.rules = rules;
719                if is_reserved_name(&name) {
720                    return Err(AdapterError::Catalog(Error::new(
721                        ErrorKind::ReservedNetworkPolicyName(name),
722                    )));
723                }
724                tx.update_network_policy(id, policy.clone())?;
725
726                CatalogState::add_to_audit_log(
727                    &state.system_configuration,
728                    oracle_write_ts,
729                    session,
730                    tx,
731                    audit_events,
732                    EventType::Alter,
733                    ObjectType::NetworkPolicy,
734                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
735                        id: id.to_string(),
736                        name: name.clone(),
737                    }),
738                )?;
739
740                info!("update network policy {name} ({id})");
741            }
742            Op::AlterAddColumn {
743                id,
744                new_global_id,
745                name,
746                typ,
747                sql,
748            } => {
749                let mut new_entry = state.get_entry(&id).clone();
750                let version = new_entry.item.add_column(name, typ, sql)?;
751                // All versions of a table share the same shard, so it shouldn't matter what
752                // GlobalId we use here.
753                let shard_id = state
754                    .storage_metadata()
755                    .get_collection_shard(new_entry.latest_global_id())?;
756
757                // TODO(alter_table): Support adding columns to sources.
758                let CatalogItem::Table(table) = &mut new_entry.item else {
759                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
760                };
761                table.collections.insert(version, new_global_id);
762
763                tx.update_item(id, new_entry.into())?;
764                storage_collections_to_register.insert(new_global_id, shard_id);
765            }
766            Op::CreateDatabase { name, owner_id } => {
767                let database_owner_privileges = vec![rbac::owner_privilege(
768                    mz_sql::catalog::ObjectType::Database,
769                    owner_id,
770                )];
771                let database_default_privileges = state
772                    .default_privileges
773                    .get_applicable_privileges(
774                        owner_id,
775                        None,
776                        None,
777                        mz_sql::catalog::ObjectType::Database,
778                    )
779                    .map(|item| item.mz_acl_item(owner_id));
780                let database_privileges: Vec<_> = merge_mz_acl_items(
781                    database_owner_privileges
782                        .into_iter()
783                        .chain(database_default_privileges),
784                )
785                .collect();
786
787                let schema_owner_privileges = vec![rbac::owner_privilege(
788                    mz_sql::catalog::ObjectType::Schema,
789                    owner_id,
790                )];
791                let schema_default_privileges = state
792                    .default_privileges
793                    .get_applicable_privileges(
794                        owner_id,
795                        None,
796                        None,
797                        mz_sql::catalog::ObjectType::Schema,
798                    )
799                    .map(|item| item.mz_acl_item(owner_id))
800                    // Special default privilege on public schemas.
801                    .chain(std::iter::once(MzAclItem {
802                        grantee: RoleId::Public,
803                        grantor: owner_id,
804                        acl_mode: AclMode::USAGE,
805                    }));
806                let schema_privileges: Vec<_> = merge_mz_acl_items(
807                    schema_owner_privileges
808                        .into_iter()
809                        .chain(schema_default_privileges),
810                )
811                .collect();
812
813                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
814                let (database_id, _) = tx.insert_user_database(
815                    &name,
816                    owner_id,
817                    database_privileges.clone(),
818                    &temporary_oids,
819                )?;
820                let (schema_id, _) = tx.insert_user_schema(
821                    database_id,
822                    DEFAULT_SCHEMA,
823                    owner_id,
824                    schema_privileges.clone(),
825                    &temporary_oids,
826                )?;
827                CatalogState::add_to_audit_log(
828                    &state.system_configuration,
829                    oracle_write_ts,
830                    session,
831                    tx,
832                    audit_events,
833                    EventType::Create,
834                    ObjectType::Database,
835                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
836                        id: database_id.to_string(),
837                        name: name.clone(),
838                    }),
839                )?;
840                info!("create database {}", name);
841
842                CatalogState::add_to_audit_log(
843                    &state.system_configuration,
844                    oracle_write_ts,
845                    session,
846                    tx,
847                    audit_events,
848                    EventType::Create,
849                    ObjectType::Schema,
850                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
851                        id: schema_id.to_string(),
852                        name: DEFAULT_SCHEMA.to_string(),
853                        database_name: Some(name),
854                    }),
855                )?;
856            }
857            Op::CreateSchema {
858                database_id,
859                schema_name,
860                owner_id,
861            } => {
862                if is_reserved_name(&schema_name) {
863                    return Err(AdapterError::Catalog(Error::new(
864                        ErrorKind::ReservedSchemaName(schema_name),
865                    )));
866                }
867                let database_id = match database_id {
868                    ResolvedDatabaseSpecifier::Id(id) => id,
869                    ResolvedDatabaseSpecifier::Ambient => {
870                        return Err(AdapterError::Catalog(Error::new(
871                            ErrorKind::ReadOnlySystemSchema(schema_name),
872                        )));
873                    }
874                };
875                let owner_privileges = vec![rbac::owner_privilege(
876                    mz_sql::catalog::ObjectType::Schema,
877                    owner_id,
878                )];
879                let default_privileges = state
880                    .default_privileges
881                    .get_applicable_privileges(
882                        owner_id,
883                        Some(database_id),
884                        None,
885                        mz_sql::catalog::ObjectType::Schema,
886                    )
887                    .map(|item| item.mz_acl_item(owner_id));
888                let privileges: Vec<_> =
889                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
890                        .collect();
891                let (schema_id, _) = tx.insert_user_schema(
892                    database_id,
893                    &schema_name,
894                    owner_id,
895                    privileges.clone(),
896                    &state.get_temporary_oids().collect(),
897                )?;
898                CatalogState::add_to_audit_log(
899                    &state.system_configuration,
900                    oracle_write_ts,
901                    session,
902                    tx,
903                    audit_events,
904                    EventType::Create,
905                    ObjectType::Schema,
906                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
907                        id: schema_id.to_string(),
908                        name: schema_name.clone(),
909                        database_name: Some(state.database_by_id[&database_id].name.clone()),
910                    }),
911                )?;
912            }
913            Op::CreateRole { name, attributes } => {
914                if is_reserved_role_name(&name) {
915                    return Err(AdapterError::Catalog(Error::new(
916                        ErrorKind::ReservedRoleName(name),
917                    )));
918                }
919                let membership = RoleMembership::new();
920                let vars = RoleVars::default();
921                let (id, _) = tx.insert_user_role(
922                    name.clone(),
923                    attributes.clone(),
924                    membership.clone(),
925                    vars.clone(),
926                    &state.get_temporary_oids().collect(),
927                )?;
928                CatalogState::add_to_audit_log(
929                    &state.system_configuration,
930                    oracle_write_ts,
931                    session,
932                    tx,
933                    audit_events,
934                    EventType::Create,
935                    ObjectType::Role,
936                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
937                        id: id.to_string(),
938                        name: name.clone(),
939                    }),
940                )?;
941                info!("create role {}", name);
942            }
943            Op::CreateCluster {
944                id,
945                name,
946                introspection_sources,
947                owner_id,
948                config,
949            } => {
950                if is_reserved_name(&name) {
951                    return Err(AdapterError::Catalog(Error::new(
952                        ErrorKind::ReservedClusterName(name),
953                    )));
954                }
955                let owner_privileges = vec![rbac::owner_privilege(
956                    mz_sql::catalog::ObjectType::Cluster,
957                    owner_id,
958                )];
959                let default_privileges = state
960                    .default_privileges
961                    .get_applicable_privileges(
962                        owner_id,
963                        None,
964                        None,
965                        mz_sql::catalog::ObjectType::Cluster,
966                    )
967                    .map(|item| item.mz_acl_item(owner_id));
968                let privileges: Vec<_> =
969                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
970                        .collect();
971                let introspection_source_ids: Vec<_> = introspection_sources
972                    .iter()
973                    .map(|introspection_source| {
974                        Transaction::allocate_introspection_source_index_id(
975                            &id,
976                            introspection_source.variant,
977                        )
978                    })
979                    .collect();
980
981                let introspection_sources = introspection_sources
982                    .into_iter()
983                    .zip_eq(introspection_source_ids)
984                    .map(|(log, (item_id, gid))| (log, item_id, gid))
985                    .collect();
986
987                tx.insert_user_cluster(
988                    id,
989                    &name,
990                    introspection_sources,
991                    owner_id,
992                    privileges.clone(),
993                    config.clone().into(),
994                    &state.get_temporary_oids().collect(),
995                )?;
996                CatalogState::add_to_audit_log(
997                    &state.system_configuration,
998                    oracle_write_ts,
999                    session,
1000                    tx,
1001                    audit_events,
1002                    EventType::Create,
1003                    ObjectType::Cluster,
1004                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1005                        id: id.to_string(),
1006                        name: name.clone(),
1007                    }),
1008                )?;
1009                info!("create cluster {}", name);
1010            }
1011            Op::CreateClusterReplica {
1012                cluster_id,
1013                name,
1014                config,
1015                owner_id,
1016                reason,
1017            } => {
1018                if is_reserved_name(&name) {
1019                    return Err(AdapterError::Catalog(Error::new(
1020                        ErrorKind::ReservedReplicaName(name),
1021                    )));
1022                }
1023                let cluster = state.get_cluster(cluster_id);
1024                let id =
1025                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1026                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1027                    size,
1028                    billed_as,
1029                    internal,
1030                    ..
1031                }) = &config.location
1032                {
1033                    let (reason, scheduling_policies) = reason.into_audit_log();
1034                    let details = EventDetails::CreateClusterReplicaV4(
1035                        mz_audit_log::CreateClusterReplicaV4 {
1036                            cluster_id: cluster_id.to_string(),
1037                            cluster_name: cluster.name.clone(),
1038                            replica_id: Some(id.to_string()),
1039                            replica_name: name.clone(),
1040                            logical_size: size.clone(),
1041                            billed_as: billed_as.clone(),
1042                            internal: *internal,
1043                            reason,
1044                            scheduling_policies,
1045                        },
1046                    );
1047                    CatalogState::add_to_audit_log(
1048                        &state.system_configuration,
1049                        oracle_write_ts,
1050                        session,
1051                        tx,
1052                        audit_events,
1053                        EventType::Create,
1054                        ObjectType::ClusterReplica,
1055                        details,
1056                    )?;
1057                }
1058            }
1059            Op::CreateItem {
1060                id,
1061                name,
1062                item,
1063                owner_id,
1064            } => {
1065                state.check_unstable_dependencies(&item)?;
1066
1067                match &item {
1068                    CatalogItem::Table(table) => {
1069                        let gids: Vec<_> = table.global_ids().collect();
1070                        assert_eq!(gids.len(), 1);
1071                        storage_collections_to_create.extend(gids);
1072                    }
1073                    CatalogItem::Source(source) => {
1074                        storage_collections_to_create.insert(source.global_id());
1075                    }
1076                    CatalogItem::MaterializedView(mv) => {
1077                        storage_collections_to_create.insert(mv.global_id());
1078                    }
1079                    CatalogItem::ContinualTask(ct) => {
1080                        storage_collections_to_create.insert(ct.global_id());
1081                    }
1082                    CatalogItem::Sink(sink) => {
1083                        storage_collections_to_create.insert(sink.global_id());
1084                    }
1085                    CatalogItem::Log(_)
1086                    | CatalogItem::View(_)
1087                    | CatalogItem::Index(_)
1088                    | CatalogItem::Type(_)
1089                    | CatalogItem::Func(_)
1090                    | CatalogItem::Secret(_)
1091                    | CatalogItem::Connection(_) => (),
1092                }
1093
1094                let system_user = session.map_or(false, |s| s.user().is_system_user());
1095                if !system_user {
1096                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1097                        let cluster_name = state.clusters_by_id[&id].name.clone();
1098                        return Err(AdapterError::Catalog(Error::new(
1099                            ErrorKind::ReadOnlyCluster(cluster_name),
1100                        )));
1101                    }
1102                }
1103
1104                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1105                let default_privileges = state
1106                    .default_privileges
1107                    .get_applicable_privileges(
1108                        owner_id,
1109                        name.qualifiers.database_spec.id(),
1110                        Some(name.qualifiers.schema_spec.into()),
1111                        item.typ().into(),
1112                    )
1113                    .map(|item| item.mz_acl_item(owner_id));
1114                // mz_support can read all progress sources.
1115                let progress_source_privilege = if item.is_progress_source() {
1116                    Some(MzAclItem {
1117                        grantee: MZ_SUPPORT_ROLE_ID,
1118                        grantor: owner_id,
1119                        acl_mode: AclMode::SELECT,
1120                    })
1121                } else {
1122                    None
1123                };
1124                let privileges: Vec<_> = merge_mz_acl_items(
1125                    owner_privileges
1126                        .into_iter()
1127                        .chain(default_privileges)
1128                        .chain(progress_source_privilege),
1129                )
1130                .collect();
1131
1132                let temporary_oids = state.get_temporary_oids().collect();
1133
1134                if item.is_temporary() {
1135                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1136                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1137                    {
1138                        return Err(AdapterError::Catalog(Error::new(
1139                            ErrorKind::InvalidTemporarySchema,
1140                        )));
1141                    }
1142                    let oid = tx.allocate_oid(&temporary_oids)?;
1143                    let item = TemporaryItem {
1144                        id,
1145                        oid,
1146                        name: name.clone(),
1147                        item: item.clone(),
1148                        owner_id,
1149                        privileges: PrivilegeMap::from_mz_acl_items(privileges),
1150                    };
1151                    temporary_item_updates.push((item, StateDiff::Addition));
1152                } else {
1153                    if let Some(temp_id) =
1154                        item.uses()
1155                            .iter()
1156                            .find(|id| match state.try_get_entry(*id) {
1157                                Some(entry) => entry.item().is_temporary(),
1158                                None => temporary_ids.contains(id),
1159                            })
1160                    {
1161                        let temp_item = state.get_entry(temp_id);
1162                        return Err(AdapterError::Catalog(Error::new(
1163                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1164                        )));
1165                    }
1166                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1167                        && !system_user
1168                    {
1169                        let schema_name = state
1170                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1171                            .schema;
1172                        return Err(AdapterError::Catalog(Error::new(
1173                            ErrorKind::ReadOnlySystemSchema(schema_name),
1174                        )));
1175                    }
1176                    let schema_id = name.qualifiers.schema_spec.clone().into();
1177                    let item_type = item.typ();
1178                    let (create_sql, global_id, versions) = item.to_serialized();
1179                    tx.insert_user_item(
1180                        id,
1181                        global_id,
1182                        schema_id,
1183                        &name.item,
1184                        create_sql,
1185                        owner_id,
1186                        privileges.clone(),
1187                        &temporary_oids,
1188                        versions,
1189                    )?;
1190                    info!(
1191                        "create {} {} ({})",
1192                        item_type,
1193                        state.resolve_full_name(&name, None),
1194                        id
1195                    );
1196                }
1197
1198                if Self::should_audit_log_item(&item) {
1199                    let name = Self::full_name_detail(
1200                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1201                    );
1202                    let details = match &item {
1203                        CatalogItem::Source(s) => {
1204                            let cluster_id = match s.data_source {
1205                                // Ingestion exports don't have their own cluster, but
1206                                // run on their ingestion's cluster.
1207                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1208                                    match state.get_entry(&ingestion_id).cluster_id() {
1209                                        Some(cluster_id) => Some(cluster_id.to_string()),
1210                                        None => None,
1211                                    }
1212                                }
1213                                _ => match item.cluster_id() {
1214                                    Some(cluster_id) => Some(cluster_id.to_string()),
1215                                    None => None,
1216                                },
1217                            };
1218
1219                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1220                                id: id.to_string(),
1221                                cluster_id,
1222                                name,
1223                                external_type: s.source_type().to_string(),
1224                            })
1225                        }
1226                        CatalogItem::Sink(s) => {
1227                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1228                                id: id.to_string(),
1229                                cluster_id: Some(s.cluster_id.to_string()),
1230                                name,
1231                                external_type: s.sink_type().to_string(),
1232                            })
1233                        }
1234                        CatalogItem::Index(i) => {
1235                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1236                                id: id.to_string(),
1237                                name,
1238                                cluster_id: i.cluster_id.to_string(),
1239                            })
1240                        }
1241                        CatalogItem::MaterializedView(mv) => {
1242                            EventDetails::CreateMaterializedViewV1(
1243                                mz_audit_log::CreateMaterializedViewV1 {
1244                                    id: id.to_string(),
1245                                    name,
1246                                    cluster_id: mv.cluster_id.to_string(),
1247                                },
1248                            )
1249                        }
1250                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1251                            id: id.to_string(),
1252                            name,
1253                        }),
1254                    };
1255                    CatalogState::add_to_audit_log(
1256                        &state.system_configuration,
1257                        oracle_write_ts,
1258                        session,
1259                        tx,
1260                        audit_events,
1261                        EventType::Create,
1262                        catalog_type_to_audit_object_type(item.typ()),
1263                        details,
1264                    )?;
1265                }
1266            }
1267            Op::CreateNetworkPolicy {
1268                rules,
1269                name,
1270                owner_id,
1271            } => {
1272                if state.network_policies_by_name.contains_key(&name) {
1273                    return Err(AdapterError::PlanError(PlanError::Catalog(
1274                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1275                    )));
1276                }
1277                if is_reserved_name(&name) {
1278                    return Err(AdapterError::Catalog(Error::new(
1279                        ErrorKind::ReservedNetworkPolicyName(name),
1280                    )));
1281                }
1282
1283                let owner_privileges = vec![rbac::owner_privilege(
1284                    mz_sql::catalog::ObjectType::NetworkPolicy,
1285                    owner_id,
1286                )];
1287                let default_privileges = state
1288                    .default_privileges
1289                    .get_applicable_privileges(
1290                        owner_id,
1291                        None,
1292                        None,
1293                        mz_sql::catalog::ObjectType::NetworkPolicy,
1294                    )
1295                    .map(|item| item.mz_acl_item(owner_id));
1296                let privileges: Vec<_> =
1297                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1298                        .collect();
1299
1300                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1301                let id = tx.insert_user_network_policy(
1302                    name.clone(),
1303                    rules,
1304                    privileges,
1305                    owner_id,
1306                    &temporary_oids,
1307                )?;
1308
1309                CatalogState::add_to_audit_log(
1310                    &state.system_configuration,
1311                    oracle_write_ts,
1312                    session,
1313                    tx,
1314                    audit_events,
1315                    EventType::Create,
1316                    ObjectType::NetworkPolicy,
1317                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1318                        id: id.to_string(),
1319                        name: name.clone(),
1320                    }),
1321                )?;
1322
1323                info!("created network policy {name} ({id})");
1324            }
1325            Op::Comment {
1326                object_id,
1327                sub_component,
1328                comment,
1329            } => {
1330                tx.update_comment(object_id, sub_component, comment)?;
1331                let entry = state.get_comment_id_entry(&object_id);
1332                let should_log = entry
1333                    .map(|entry| Self::should_audit_log_item(entry.item()))
1334                    // Things that aren't catalog entries can't be temp, so should be logged.
1335                    .unwrap_or(true);
1336                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1337                // comments won't be logged for now.
1338                if let (Some(conn_id), true) =
1339                    (session.map(|session| session.conn_id()), should_log)
1340                {
1341                    CatalogState::add_to_audit_log(
1342                        &state.system_configuration,
1343                        oracle_write_ts,
1344                        session,
1345                        tx,
1346                        audit_events,
1347                        EventType::Comment,
1348                        comment_id_to_audit_object_type(object_id),
1349                        EventDetails::IdNameV1(IdNameV1 {
1350                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1351                            id: format!("{object_id:?}"),
1352                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1353                        }),
1354                    )?;
1355                }
1356            }
1357            Op::UpdateSourceReferences {
1358                source_id,
1359                references,
1360            } => {
1361                tx.update_source_references(
1362                    source_id,
1363                    references
1364                        .references
1365                        .into_iter()
1366                        .map(|reference| reference.into())
1367                        .collect(),
1368                    references.updated_at,
1369                )?;
1370            }
1371            Op::DropObjects(drop_object_infos) => {
1372                // Generate all of the objects that need to get dropped.
1373                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1374
1375                // Drop any associated comments.
1376                tx.drop_comments(&delta.comments)?;
1377
1378                // Drop any items.
1379                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1380                    delta
1381                        .items
1382                        .iter()
1383                        .map(|id| id)
1384                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1385                tx.remove_items(&durable_items_to_drop)?;
1386                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1387                    let entry = state.get_entry(&id);
1388                    (entry.clone().into(), StateDiff::Retraction)
1389                }));
1390
1391                for item_id in delta.items {
1392                    let entry = state.get_entry(&item_id);
1393
1394                    if entry.item().is_storage_collection() {
1395                        storage_collections_to_drop.extend(entry.global_ids());
1396                    }
1397
1398                    if state.source_references.contains_key(&item_id) {
1399                        tx.remove_source_references(item_id)?;
1400                    }
1401
1402                    if Self::should_audit_log_item(entry.item()) {
1403                        CatalogState::add_to_audit_log(
1404                            &state.system_configuration,
1405                            oracle_write_ts,
1406                            session,
1407                            tx,
1408                            audit_events,
1409                            EventType::Drop,
1410                            catalog_type_to_audit_object_type(entry.item().typ()),
1411                            EventDetails::IdFullNameV1(IdFullNameV1 {
1412                                id: item_id.to_string(),
1413                                name: Self::full_name_detail(&state.resolve_full_name(
1414                                    entry.name(),
1415                                    session.map(|session| session.conn_id()),
1416                                )),
1417                            }),
1418                        )?;
1419                    }
1420                    info!(
1421                        "drop {} {} ({})",
1422                        entry.item_type(),
1423                        state.resolve_full_name(entry.name(), entry.conn_id()),
1424                        item_id
1425                    );
1426                }
1427
1428                // Drop any schemas.
1429                let schemas = delta
1430                    .schemas
1431                    .iter()
1432                    .map(|(schema_spec, database_spec)| {
1433                        (SchemaId::from(schema_spec), *database_spec)
1434                    })
1435                    .collect();
1436                tx.remove_schemas(&schemas)?;
1437
1438                for (schema_spec, database_spec) in delta.schemas {
1439                    let schema = state.get_schema(
1440                        &database_spec,
1441                        &schema_spec,
1442                        session
1443                            .map(|session| session.conn_id())
1444                            .unwrap_or(&SYSTEM_CONN_ID),
1445                    );
1446
1447                    let schema_id = SchemaId::from(schema_spec);
1448                    let database_id = match database_spec {
1449                        ResolvedDatabaseSpecifier::Ambient => None,
1450                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1451                    };
1452
1453                    CatalogState::add_to_audit_log(
1454                        &state.system_configuration,
1455                        oracle_write_ts,
1456                        session,
1457                        tx,
1458                        audit_events,
1459                        EventType::Drop,
1460                        ObjectType::Schema,
1461                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1462                            id: schema_id.to_string(),
1463                            name: schema.name.schema.to_string(),
1464                            database_name: database_id
1465                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1466                        }),
1467                    )?;
1468                }
1469
1470                // Drop any databases.
1471                tx.remove_databases(&delta.databases)?;
1472
1473                for database_id in delta.databases {
1474                    let database = state.get_database(&database_id).clone();
1475
1476                    CatalogState::add_to_audit_log(
1477                        &state.system_configuration,
1478                        oracle_write_ts,
1479                        session,
1480                        tx,
1481                        audit_events,
1482                        EventType::Drop,
1483                        ObjectType::Database,
1484                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1485                            id: database_id.to_string(),
1486                            name: database.name.clone(),
1487                        }),
1488                    )?;
1489                }
1490
1491                // Drop any roles.
1492                tx.remove_user_roles(&delta.roles)?;
1493
1494                for role_id in delta.roles {
1495                    let role = state
1496                        .roles_by_id
1497                        .get(&role_id)
1498                        .expect("catalog out of sync");
1499
1500                    CatalogState::add_to_audit_log(
1501                        &state.system_configuration,
1502                        oracle_write_ts,
1503                        session,
1504                        tx,
1505                        audit_events,
1506                        EventType::Drop,
1507                        ObjectType::Role,
1508                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1509                            id: role.id.to_string(),
1510                            name: role.name.clone(),
1511                        }),
1512                    )?;
1513                    info!("drop role {}", role.name());
1514                }
1515
1516                // Drop any network policies.
1517                tx.remove_network_policies(&delta.network_policies)?;
1518
1519                for network_policy_id in delta.network_policies {
1520                    let policy = state
1521                        .network_policies_by_id
1522                        .get(&network_policy_id)
1523                        .expect("catalog out of sync");
1524
1525                    CatalogState::add_to_audit_log(
1526                        &state.system_configuration,
1527                        oracle_write_ts,
1528                        session,
1529                        tx,
1530                        audit_events,
1531                        EventType::Drop,
1532                        ObjectType::NetworkPolicy,
1533                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1534                            id: policy.id.to_string(),
1535                            name: policy.name.clone(),
1536                        }),
1537                    )?;
1538                    info!("drop network policy {}", policy.name.clone());
1539                }
1540
1541                // Drop any replicas.
1542                let replicas = delta.replicas.keys().copied().collect();
1543                tx.remove_cluster_replicas(&replicas)?;
1544
1545                for (replica_id, (cluster_id, reason)) in delta.replicas {
1546                    let cluster = state.get_cluster(cluster_id);
1547                    let replica = cluster.replica(replica_id).expect("Must exist");
1548
1549                    let (reason, scheduling_policies) = reason.into_audit_log();
1550                    let details =
1551                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1552                            cluster_id: cluster_id.to_string(),
1553                            cluster_name: cluster.name.clone(),
1554                            replica_id: Some(replica_id.to_string()),
1555                            replica_name: replica.name.clone(),
1556                            reason,
1557                            scheduling_policies,
1558                        });
1559                    CatalogState::add_to_audit_log(
1560                        &state.system_configuration,
1561                        oracle_write_ts,
1562                        session,
1563                        tx,
1564                        audit_events,
1565                        EventType::Drop,
1566                        ObjectType::ClusterReplica,
1567                        details,
1568                    )?;
1569                }
1570
1571                // Drop any clusters.
1572                tx.remove_clusters(&delta.clusters)?;
1573
1574                for cluster_id in delta.clusters {
1575                    let cluster = state.get_cluster(cluster_id);
1576
1577                    CatalogState::add_to_audit_log(
1578                        &state.system_configuration,
1579                        oracle_write_ts,
1580                        session,
1581                        tx,
1582                        audit_events,
1583                        EventType::Drop,
1584                        ObjectType::Cluster,
1585                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1586                            id: cluster.id.to_string(),
1587                            name: cluster.name.clone(),
1588                        }),
1589                    )?;
1590                }
1591            }
1592            Op::GrantRole {
1593                role_id,
1594                member_id,
1595                grantor_id,
1596            } => {
1597                state.ensure_not_reserved_role(&member_id)?;
1598                state.ensure_grantable_role(&role_id)?;
1599                if state.collect_role_membership(&role_id).contains(&member_id) {
1600                    let group_role = state.get_role(&role_id);
1601                    let member_role = state.get_role(&member_id);
1602                    return Err(AdapterError::Catalog(Error::new(
1603                        ErrorKind::CircularRoleMembership {
1604                            role_name: group_role.name().to_string(),
1605                            member_name: member_role.name().to_string(),
1606                        },
1607                    )));
1608                }
1609                let mut member_role = state.get_role(&member_id).clone();
1610                member_role.membership.map.insert(role_id, grantor_id);
1611                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1612
1613                CatalogState::add_to_audit_log(
1614                    &state.system_configuration,
1615                    oracle_write_ts,
1616                    session,
1617                    tx,
1618                    audit_events,
1619                    EventType::Grant,
1620                    ObjectType::Role,
1621                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1622                        role_id: role_id.to_string(),
1623                        member_id: member_id.to_string(),
1624                        grantor_id: grantor_id.to_string(),
1625                        executed_by: session
1626                            .map(|session| session.authenticated_role_id())
1627                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1628                            .to_string(),
1629                    }),
1630                )?;
1631            }
1632            Op::RevokeRole {
1633                role_id,
1634                member_id,
1635                grantor_id,
1636            } => {
1637                state.ensure_not_reserved_role(&member_id)?;
1638                state.ensure_grantable_role(&role_id)?;
1639                let mut member_role = state.get_role(&member_id).clone();
1640                member_role.membership.map.remove(&role_id);
1641                tx.update_role(member_id, member_role.into(), PasswordAction::NoChange)?;
1642
1643                CatalogState::add_to_audit_log(
1644                    &state.system_configuration,
1645                    oracle_write_ts,
1646                    session,
1647                    tx,
1648                    audit_events,
1649                    EventType::Revoke,
1650                    ObjectType::Role,
1651                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1652                        role_id: role_id.to_string(),
1653                        member_id: member_id.to_string(),
1654                        grantor_id: grantor_id.to_string(),
1655                        executed_by: session
1656                            .map(|session| session.authenticated_role_id())
1657                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1658                            .to_string(),
1659                    }),
1660                )?;
1661            }
1662            Op::UpdatePrivilege {
1663                target_id,
1664                privilege,
1665                variant,
1666            } => {
1667                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1668                    UpdatePrivilegeVariant::Grant => {
1669                        privileges.grant(privilege);
1670                    }
1671                    UpdatePrivilegeVariant::Revoke => {
1672                        privileges.revoke(&privilege);
1673                    }
1674                };
1675                match &target_id {
1676                    SystemObjectId::Object(object_id) => match object_id {
1677                        ObjectId::Cluster(id) => {
1678                            let mut cluster = state.get_cluster(*id).clone();
1679                            update_privilege_fn(&mut cluster.privileges);
1680                            tx.update_cluster(*id, cluster.into())?;
1681                        }
1682                        ObjectId::Database(id) => {
1683                            let mut database = state.get_database(id).clone();
1684                            update_privilege_fn(&mut database.privileges);
1685                            tx.update_database(*id, database.into())?;
1686                        }
1687                        ObjectId::NetworkPolicy(id) => {
1688                            let mut policy = state.get_network_policy(id).clone();
1689                            update_privilege_fn(&mut policy.privileges);
1690                            tx.update_network_policy(*id, policy.into())?;
1691                        }
1692                        ObjectId::Schema((database_spec, schema_spec)) => {
1693                            let schema_id = schema_spec.clone().into();
1694                            let mut schema = state
1695                                .get_schema(
1696                                    database_spec,
1697                                    schema_spec,
1698                                    session
1699                                        .map(|session| session.conn_id())
1700                                        .unwrap_or(&SYSTEM_CONN_ID),
1701                                )
1702                                .clone();
1703                            update_privilege_fn(&mut schema.privileges);
1704                            tx.update_schema(schema_id, schema.into())?;
1705                        }
1706                        ObjectId::Item(id) => {
1707                            let entry = state.get_entry(id);
1708                            let mut new_entry = entry.clone();
1709                            update_privilege_fn(&mut new_entry.privileges);
1710                            if !new_entry.item().is_temporary() {
1711                                tx.update_item(*id, new_entry.into())?;
1712                            } else {
1713                                temporary_item_updates
1714                                    .push((entry.clone().into(), StateDiff::Retraction));
1715                                temporary_item_updates
1716                                    .push((new_entry.into(), StateDiff::Addition));
1717                            }
1718                        }
1719                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1720                    },
1721                    SystemObjectId::System => {
1722                        let mut system_privileges = state.system_privileges.clone();
1723                        update_privilege_fn(&mut system_privileges);
1724                        let new_privilege =
1725                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1726                        tx.set_system_privilege(
1727                            privilege.grantee,
1728                            privilege.grantor,
1729                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
1730                        )?;
1731                    }
1732                }
1733                let object_type = state.get_system_object_type(&target_id);
1734                let object_id_str = match &target_id {
1735                    SystemObjectId::System => "SYSTEM".to_string(),
1736                    SystemObjectId::Object(id) => id.to_string(),
1737                };
1738                CatalogState::add_to_audit_log(
1739                    &state.system_configuration,
1740                    oracle_write_ts,
1741                    session,
1742                    tx,
1743                    audit_events,
1744                    variant.into(),
1745                    system_object_type_to_audit_object_type(&object_type),
1746                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1747                        object_id: object_id_str,
1748                        grantee_id: privilege.grantee.to_string(),
1749                        grantor_id: privilege.grantor.to_string(),
1750                        privileges: privilege.acl_mode.to_string(),
1751                    }),
1752                )?;
1753            }
1754            Op::UpdateDefaultPrivilege {
1755                privilege_object,
1756                privilege_acl_item,
1757                variant,
1758            } => {
1759                let mut default_privileges = state.default_privileges.clone();
1760                match variant {
1761                    UpdatePrivilegeVariant::Grant => default_privileges
1762                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
1763                    UpdatePrivilegeVariant::Revoke => {
1764                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
1765                    }
1766                }
1767                let new_acl_mode = default_privileges
1768                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1769                tx.set_default_privilege(
1770                    privilege_object.role_id,
1771                    privilege_object.database_id,
1772                    privilege_object.schema_id,
1773                    privilege_object.object_type,
1774                    privilege_acl_item.grantee,
1775                    new_acl_mode.cloned(),
1776                )?;
1777                CatalogState::add_to_audit_log(
1778                    &state.system_configuration,
1779                    oracle_write_ts,
1780                    session,
1781                    tx,
1782                    audit_events,
1783                    variant.into(),
1784                    object_type_to_audit_object_type(privilege_object.object_type),
1785                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1786                        role_id: privilege_object.role_id.to_string(),
1787                        database_id: privilege_object.database_id.map(|id| id.to_string()),
1788                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1789                        grantee_id: privilege_acl_item.grantee.to_string(),
1790                        privileges: privilege_acl_item.acl_mode.to_string(),
1791                    }),
1792                )?;
1793            }
1794            Op::RenameCluster {
1795                id,
1796                name,
1797                to_name,
1798                check_reserved_names,
1799            } => {
1800                if id.is_system() {
1801                    return Err(AdapterError::Catalog(Error::new(
1802                        ErrorKind::ReadOnlyCluster(name.clone()),
1803                    )));
1804                }
1805                if check_reserved_names && is_reserved_name(&to_name) {
1806                    return Err(AdapterError::Catalog(Error::new(
1807                        ErrorKind::ReservedClusterName(to_name),
1808                    )));
1809                }
1810                tx.rename_cluster(id, &name, &to_name)?;
1811                CatalogState::add_to_audit_log(
1812                    &state.system_configuration,
1813                    oracle_write_ts,
1814                    session,
1815                    tx,
1816                    audit_events,
1817                    EventType::Alter,
1818                    ObjectType::Cluster,
1819                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1820                        id: id.to_string(),
1821                        old_name: name.clone(),
1822                        new_name: to_name.clone(),
1823                    }),
1824                )?;
1825                info!("rename cluster {name} to {to_name}");
1826            }
1827            Op::RenameClusterReplica {
1828                cluster_id,
1829                replica_id,
1830                name,
1831                to_name,
1832            } => {
1833                if is_reserved_name(&to_name) {
1834                    return Err(AdapterError::Catalog(Error::new(
1835                        ErrorKind::ReservedReplicaName(to_name),
1836                    )));
1837                }
1838                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1839                CatalogState::add_to_audit_log(
1840                    &state.system_configuration,
1841                    oracle_write_ts,
1842                    session,
1843                    tx,
1844                    audit_events,
1845                    EventType::Alter,
1846                    ObjectType::ClusterReplica,
1847                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1848                        cluster_id: cluster_id.to_string(),
1849                        replica_id: replica_id.to_string(),
1850                        old_name: name.replica.as_str().to_string(),
1851                        new_name: to_name.clone(),
1852                    }),
1853                )?;
1854                info!("rename cluster replica {name} to {to_name}");
1855            }
1856            Op::RenameItem {
1857                id,
1858                to_name,
1859                current_full_name,
1860            } => {
1861                let mut updates = Vec::new();
1862
1863                let entry = state.get_entry(&id);
1864                if let CatalogItem::Type(_) = entry.item() {
1865                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1866                        current_full_name.to_string(),
1867                    ))));
1868                }
1869
1870                if entry.id().is_system() {
1871                    let name = state
1872                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1873                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1874                        name.to_string(),
1875                    ))));
1876                }
1877
1878                let mut to_full_name = current_full_name.clone();
1879                to_full_name.item.clone_from(&to_name);
1880
1881                let mut to_qualified_name = entry.name().clone();
1882                to_qualified_name.item.clone_from(&to_name);
1883
1884                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1885                    id: id.to_string(),
1886                    old_name: Self::full_name_detail(&current_full_name),
1887                    new_name: Self::full_name_detail(&to_full_name),
1888                });
1889                if Self::should_audit_log_item(entry.item()) {
1890                    CatalogState::add_to_audit_log(
1891                        &state.system_configuration,
1892                        oracle_write_ts,
1893                        session,
1894                        tx,
1895                        audit_events,
1896                        EventType::Alter,
1897                        catalog_type_to_audit_object_type(entry.item().typ()),
1898                        details,
1899                    )?;
1900                }
1901
1902                // Rename item itself.
1903                let mut new_entry = entry.clone();
1904                new_entry.name.item.clone_from(&to_name);
1905                new_entry.item = entry
1906                    .item()
1907                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
1908                    .map_err(|e| {
1909                        Error::new(ErrorKind::from(AmbiguousRename {
1910                            depender: state
1911                                .resolve_full_name(entry.name(), entry.conn_id())
1912                                .to_string(),
1913                            dependee: state
1914                                .resolve_full_name(entry.name(), entry.conn_id())
1915                                .to_string(),
1916                            message: e,
1917                        }))
1918                    })?;
1919
1920                for id in entry.referenced_by() {
1921                    let dependent_item = state.get_entry(id);
1922                    let mut to_entry = dependent_item.clone();
1923                    to_entry.item = dependent_item
1924                        .item()
1925                        .rename_item_refs(
1926                            current_full_name.clone(),
1927                            to_full_name.item.clone(),
1928                            false,
1929                        )
1930                        .map_err(|e| {
1931                            Error::new(ErrorKind::from(AmbiguousRename {
1932                                depender: state
1933                                    .resolve_full_name(
1934                                        dependent_item.name(),
1935                                        dependent_item.conn_id(),
1936                                    )
1937                                    .to_string(),
1938                                dependee: state
1939                                    .resolve_full_name(entry.name(), entry.conn_id())
1940                                    .to_string(),
1941                                message: e,
1942                            }))
1943                        })?;
1944
1945                    if !to_entry.item().is_temporary() {
1946                        tx.update_item(*id, to_entry.into())?;
1947                    } else {
1948                        temporary_item_updates
1949                            .push((dependent_item.clone().into(), StateDiff::Retraction));
1950                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
1951                    }
1952                    updates.push(*id);
1953                }
1954                if !new_entry.item().is_temporary() {
1955                    tx.update_item(id, new_entry.into())?;
1956                } else {
1957                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
1958                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
1959                }
1960
1961                updates.push(id);
1962                for id in updates {
1963                    Self::log_update(state, &id);
1964                }
1965            }
1966            Op::RenameSchema {
1967                database_spec,
1968                schema_spec,
1969                new_name,
1970                check_reserved_names,
1971            } => {
1972                if check_reserved_names && is_reserved_name(&new_name) {
1973                    return Err(AdapterError::Catalog(Error::new(
1974                        ErrorKind::ReservedSchemaName(new_name),
1975                    )));
1976                }
1977
1978                let conn_id = session
1979                    .map(|session| session.conn_id())
1980                    .unwrap_or(&SYSTEM_CONN_ID);
1981
1982                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
1983                let cur_name = schema.name().schema.clone();
1984
1985                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
1986                    return Err(AdapterError::Catalog(Error::new(
1987                        ErrorKind::AmbientSchemaRename(cur_name),
1988                    )));
1989                };
1990                let database = state.get_database(&database_id);
1991                let database_name = &database.name;
1992
1993                let mut updates = Vec::new();
1994                let mut items_to_update = BTreeMap::new();
1995
1996                let mut update_item = |id| {
1997                    if items_to_update.contains_key(id) {
1998                        return Ok(());
1999                    }
2000
2001                    let entry = state.get_entry(id);
2002
2003                    // Update our item.
2004                    let mut new_entry = entry.clone();
2005                    new_entry.item = entry
2006                        .item
2007                        .rename_schema_refs(database_name, &cur_name, &new_name)
2008                        .map_err(|(s, _i)| {
2009                            Error::new(ErrorKind::from(AmbiguousRename {
2010                                depender: state
2011                                    .resolve_full_name(entry.name(), entry.conn_id())
2012                                    .to_string(),
2013                                dependee: format!("{database_name}.{cur_name}"),
2014                                message: format!("ambiguous reference to schema named {s}"),
2015                            }))
2016                        })?;
2017
2018                    // Queue updates for Catalog storage and Builtin Tables.
2019                    if !new_entry.item().is_temporary() {
2020                        items_to_update.insert(*id, new_entry.into());
2021                    } else {
2022                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2023                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2024                    }
2025                    updates.push(id);
2026
2027                    Ok::<_, AdapterError>(())
2028                };
2029
2030                // Update all of the items in the schema.
2031                for (_name, item_id) in &schema.items {
2032                    // Update the item itself.
2033                    update_item(item_id)?;
2034
2035                    // Update everything that depends on this item.
2036                    for id in state.get_entry(item_id).referenced_by() {
2037                        update_item(id)?;
2038                    }
2039                }
2040                // Note: When updating the transaction it's very important that we update the
2041                // items as a whole group, otherwise we exhibit quadratic behavior.
2042                tx.update_items(items_to_update)?;
2043
2044                // Renaming temporary schemas is not supported.
2045                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2046                    let schema_name = schema.name().schema.clone();
2047                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2048                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2049                    )));
2050                };
2051
2052                // Add an entry to the audit log.
2053                let database_name = database_spec
2054                    .id()
2055                    .map(|id| state.get_database(&id).name.clone());
2056                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2057                    id: schema_id.to_string(),
2058                    old_name: schema.name().schema.clone(),
2059                    new_name: new_name.clone(),
2060                    database_name,
2061                });
2062                CatalogState::add_to_audit_log(
2063                    &state.system_configuration,
2064                    oracle_write_ts,
2065                    session,
2066                    tx,
2067                    audit_events,
2068                    EventType::Alter,
2069                    mz_audit_log::ObjectType::Schema,
2070                    details,
2071                )?;
2072
2073                // Update the schema itself.
2074                let mut new_schema = schema.clone();
2075                new_schema.name.schema.clone_from(&new_name);
2076                tx.update_schema(schema_id, new_schema.into())?;
2077
2078                for id in updates {
2079                    Self::log_update(state, id);
2080                }
2081            }
2082            Op::UpdateOwner { id, new_owner } => {
2083                let conn_id = session
2084                    .map(|session| session.conn_id())
2085                    .unwrap_or(&SYSTEM_CONN_ID);
2086                let old_owner = state
2087                    .get_owner_id(&id, conn_id)
2088                    .expect("cannot update the owner of an object without an owner");
2089                match &id {
2090                    ObjectId::Cluster(id) => {
2091                        let mut cluster = state.get_cluster(*id).clone();
2092                        if id.is_system() {
2093                            return Err(AdapterError::Catalog(Error::new(
2094                                ErrorKind::ReadOnlyCluster(cluster.name),
2095                            )));
2096                        }
2097                        Self::update_privilege_owners(
2098                            &mut cluster.privileges,
2099                            cluster.owner_id,
2100                            new_owner,
2101                        );
2102                        cluster.owner_id = new_owner;
2103                        tx.update_cluster(*id, cluster.into())?;
2104                    }
2105                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2106                        let cluster = state.get_cluster(*cluster_id);
2107                        let mut replica = cluster
2108                            .replica(*replica_id)
2109                            .expect("catalog out of sync")
2110                            .clone();
2111                        if replica_id.is_system() {
2112                            return Err(AdapterError::Catalog(Error::new(
2113                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2114                            )));
2115                        }
2116                        replica.owner_id = new_owner;
2117                        tx.update_cluster_replica(*replica_id, replica.into())?;
2118                    }
2119                    ObjectId::Database(id) => {
2120                        let mut database = state.get_database(id).clone();
2121                        if id.is_system() {
2122                            return Err(AdapterError::Catalog(Error::new(
2123                                ErrorKind::ReadOnlyDatabase(database.name),
2124                            )));
2125                        }
2126                        Self::update_privilege_owners(
2127                            &mut database.privileges,
2128                            database.owner_id,
2129                            new_owner,
2130                        );
2131                        database.owner_id = new_owner;
2132                        tx.update_database(*id, database.clone().into())?;
2133                    }
2134                    ObjectId::Schema((database_spec, schema_spec)) => {
2135                        let schema_id: SchemaId = schema_spec.clone().into();
2136                        let mut schema = state
2137                            .get_schema(database_spec, schema_spec, conn_id)
2138                            .clone();
2139                        if schema_id.is_system() {
2140                            let name = schema.name();
2141                            let full_name = state.resolve_full_schema_name(name);
2142                            return Err(AdapterError::Catalog(Error::new(
2143                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2144                            )));
2145                        }
2146                        Self::update_privilege_owners(
2147                            &mut schema.privileges,
2148                            schema.owner_id,
2149                            new_owner,
2150                        );
2151                        schema.owner_id = new_owner;
2152                        tx.update_schema(schema_id, schema.into())?;
2153                    }
2154                    ObjectId::Item(id) => {
2155                        let entry = state.get_entry(id);
2156                        let mut new_entry = entry.clone();
2157                        if id.is_system() {
2158                            let full_name = state.resolve_full_name(
2159                                new_entry.name(),
2160                                session.map(|session| session.conn_id()),
2161                            );
2162                            return Err(AdapterError::Catalog(Error::new(
2163                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2164                            )));
2165                        }
2166                        Self::update_privilege_owners(
2167                            &mut new_entry.privileges,
2168                            new_entry.owner_id,
2169                            new_owner,
2170                        );
2171                        new_entry.owner_id = new_owner;
2172                        if !new_entry.item().is_temporary() {
2173                            tx.update_item(*id, new_entry.into())?;
2174                        } else {
2175                            temporary_item_updates
2176                                .push((entry.clone().into(), StateDiff::Retraction));
2177                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2178                        }
2179                    }
2180                    ObjectId::NetworkPolicy(id) => {
2181                        let mut policy = state.get_network_policy(id).clone();
2182                        if id.is_system() {
2183                            return Err(AdapterError::Catalog(Error::new(
2184                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2185                            )));
2186                        }
2187                        Self::update_privilege_owners(
2188                            &mut policy.privileges,
2189                            policy.owner_id,
2190                            new_owner,
2191                        );
2192                        policy.owner_id = new_owner;
2193                        tx.update_network_policy(*id, policy.into())?;
2194                    }
2195                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2196                }
2197                let object_type = state.get_object_type(&id);
2198                CatalogState::add_to_audit_log(
2199                    &state.system_configuration,
2200                    oracle_write_ts,
2201                    session,
2202                    tx,
2203                    audit_events,
2204                    EventType::Alter,
2205                    object_type_to_audit_object_type(object_type),
2206                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2207                        object_id: id.to_string(),
2208                        old_owner_id: old_owner.to_string(),
2209                        new_owner_id: new_owner.to_string(),
2210                    }),
2211                )?;
2212            }
2213            Op::UpdateClusterConfig { id, name, config } => {
2214                let mut cluster = state.get_cluster(id).clone();
2215                cluster.config = config;
2216                tx.update_cluster(id, cluster.into())?;
2217                info!("update cluster {}", name);
2218
2219                CatalogState::add_to_audit_log(
2220                    &state.system_configuration,
2221                    oracle_write_ts,
2222                    session,
2223                    tx,
2224                    audit_events,
2225                    EventType::Alter,
2226                    ObjectType::Cluster,
2227                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2228                        id: id.to_string(),
2229                        name,
2230                    }),
2231                )?;
2232            }
2233            Op::UpdateClusterReplicaConfig {
2234                replica_id,
2235                cluster_id,
2236                config,
2237            } => {
2238                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2239                info!("update replica {}", replica.name);
2240                tx.update_cluster_replica(
2241                    replica_id,
2242                    mz_catalog::durable::ClusterReplica {
2243                        cluster_id,
2244                        replica_id,
2245                        name: replica.name.clone(),
2246                        config: config.clone().into(),
2247                        owner_id: replica.owner_id,
2248                    },
2249                )?;
2250            }
2251            Op::UpdateItem { id, name, to_item } => {
2252                let mut entry = state.get_entry(&id).clone();
2253                entry.name = name.clone();
2254                entry.item = to_item.clone();
2255                tx.update_item(id, entry.into())?;
2256
2257                if Self::should_audit_log_item(&to_item) {
2258                    let mut full_name = Self::full_name_detail(
2259                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2260                    );
2261                    full_name.item = name.item;
2262
2263                    CatalogState::add_to_audit_log(
2264                        &state.system_configuration,
2265                        oracle_write_ts,
2266                        session,
2267                        tx,
2268                        audit_events,
2269                        EventType::Alter,
2270                        catalog_type_to_audit_object_type(to_item.typ()),
2271                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2272                            id: id.to_string(),
2273                            name: full_name,
2274                        }),
2275                    )?;
2276                }
2277
2278                Self::log_update(state, &id);
2279            }
2280            Op::UpdateSystemConfiguration { name, value } => {
2281                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2282                tx.upsert_system_config(&name, parsed_value.clone())?;
2283                // This mirrors some "system vars" into the catalog storage
2284                // "config" collection so that we can toggle the flag with
2285                // Launch Darkly, but use it in boot before Launch Darkly is
2286                // available.
2287                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2288                    let with_0dt_deployment_max_wait =
2289                        Duration::parse(VarInput::Flat(&parsed_value))
2290                            .expect("parsing succeeded above");
2291                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2292                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2293                    let with_0dt_deployment_ddl_check_interval =
2294                        Duration::parse(VarInput::Flat(&parsed_value))
2295                            .expect("parsing succeeded above");
2296                    tx.set_0dt_deployment_ddl_check_interval(
2297                        with_0dt_deployment_ddl_check_interval,
2298                    )?;
2299                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2300                    let panic_after_timeout =
2301                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2302                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2303                }
2304
2305                CatalogState::add_to_audit_log(
2306                    &state.system_configuration,
2307                    oracle_write_ts,
2308                    session,
2309                    tx,
2310                    audit_events,
2311                    EventType::Alter,
2312                    ObjectType::System,
2313                    EventDetails::SetV1(mz_audit_log::SetV1 {
2314                        name,
2315                        value: Some(value.borrow().to_vec().join(", ")),
2316                    }),
2317                )?;
2318            }
2319            Op::ResetSystemConfiguration { name } => {
2320                tx.remove_system_config(&name);
2321                // This mirrors some "system vars" into the catalog storage
2322                // "config" collection so that we can toggle the flag with
2323                // Launch Darkly, but use it in boot before Launch Darkly is
2324                // available.
2325                if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2326                    tx.reset_0dt_deployment_max_wait()?;
2327                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2328                    tx.reset_0dt_deployment_ddl_check_interval()?;
2329                }
2330
2331                CatalogState::add_to_audit_log(
2332                    &state.system_configuration,
2333                    oracle_write_ts,
2334                    session,
2335                    tx,
2336                    audit_events,
2337                    EventType::Alter,
2338                    ObjectType::System,
2339                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2340                )?;
2341            }
2342            Op::ResetAllSystemConfiguration => {
2343                tx.clear_system_configs();
2344                tx.reset_0dt_deployment_max_wait()?;
2345                tx.reset_0dt_deployment_ddl_check_interval()?;
2346
2347                CatalogState::add_to_audit_log(
2348                    &state.system_configuration,
2349                    oracle_write_ts,
2350                    session,
2351                    tx,
2352                    audit_events,
2353                    EventType::Alter,
2354                    ObjectType::System,
2355                    EventDetails::ResetAllV1,
2356                )?;
2357            }
2358            Op::WeirdStorageUsageUpdates {
2359                object_id,
2360                size_bytes,
2361                collection_timestamp,
2362            } => {
2363                let id = tx.allocate_storage_usage_ids()?;
2364                let metric =
2365                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2366                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2367                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2368                weird_builtin_table_update = Some(builtin_table_update);
2369            }
2370        };
2371        Ok((weird_builtin_table_update, temporary_item_updates))
2372    }
2373
2374    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2375        let entry = state.get_entry(id);
2376        info!(
2377            "update {} {} ({})",
2378            entry.item_type(),
2379            state.resolve_full_name(entry.name(), entry.conn_id()),
2380            id
2381        );
2382    }
2383
2384    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2385    /// implementation:
2386    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2387    fn update_privilege_owners(
2388        privileges: &mut PrivilegeMap,
2389        old_owner: RoleId,
2390        new_owner: RoleId,
2391    ) {
2392        // TODO(jkosh44) Would be nice not to clone every privilege.
2393        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2394
2395        let mut new_present = false;
2396        for privilege in flat_privileges.iter_mut() {
2397            // Old owner's granted privilege are updated to be granted by the new
2398            // owner.
2399            if privilege.grantor == old_owner {
2400                privilege.grantor = new_owner;
2401            } else if privilege.grantor == new_owner {
2402                new_present = true;
2403            }
2404            // Old owner's privileges is given to the new owner.
2405            if privilege.grantee == old_owner {
2406                privilege.grantee = new_owner;
2407            } else if privilege.grantee == new_owner {
2408                new_present = true;
2409            }
2410        }
2411
2412        // If the old privilege list contained references to the new owner, we may
2413        // have created duplicate entries. Here we try and consolidate them. This
2414        // is inspired by PostgreSQL's algorithm but not identical.
2415        if new_present {
2416            // Group privileges by (grantee, grantor).
2417            let privilege_map: BTreeMap<_, Vec<_>> =
2418                flat_privileges
2419                    .into_iter()
2420                    .fold(BTreeMap::new(), |mut accum, privilege| {
2421                        accum
2422                            .entry((privilege.grantee, privilege.grantor))
2423                            .or_default()
2424                            .push(privilege);
2425                        accum
2426                    });
2427
2428            // Consolidate and update all privileges.
2429            flat_privileges = privilege_map
2430                .into_iter()
2431                .map(|((grantee, grantor), values)|
2432                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2433                    values.into_iter().fold(
2434                        MzAclItem::empty(grantee, grantor),
2435                        |mut accum, mz_aclitem| {
2436                            accum.acl_mode =
2437                                accum.acl_mode.union(mz_aclitem.acl_mode);
2438                            accum
2439                        },
2440                    ))
2441                .collect();
2442        }
2443
2444        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2445    }
2446}
2447
2448/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2449///
2450/// Note: Previously we used to omit a single `Op::DropObject` for every object
2451/// we needed to drop. But removing a batch of objects from a durable Catalog
2452/// Transaction is O(n) where `n` is the number of objects that exist in the
2453/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
2454/// `DROP ... CASCADE` statement.
2455#[derive(Debug, Default)]
2456pub(crate) struct ObjectsToDrop {
2457    pub comments: BTreeSet<CommentObjectId>,
2458    pub databases: BTreeSet<DatabaseId>,
2459    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2460    pub clusters: BTreeSet<ClusterId>,
2461    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2462    pub roles: BTreeSet<RoleId>,
2463    pub items: Vec<CatalogItemId>,
2464    pub network_policies: BTreeSet<NetworkPolicyId>,
2465}
2466
2467impl ObjectsToDrop {
2468    pub fn generate(
2469        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2470        state: &CatalogState,
2471        session: Option<&ConnMeta>,
2472    ) -> Result<Self, AdapterError> {
2473        let mut delta = ObjectsToDrop::default();
2474
2475        for drop_object_info in drop_object_infos {
2476            delta.add_item(drop_object_info, state, session)?;
2477        }
2478
2479        Ok(delta)
2480    }
2481
2482    fn add_item(
2483        &mut self,
2484        drop_object_info: DropObjectInfo,
2485        state: &CatalogState,
2486        session: Option<&ConnMeta>,
2487    ) -> Result<(), AdapterError> {
2488        self.comments
2489            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2490
2491        match drop_object_info {
2492            DropObjectInfo::Database(database_id) => {
2493                let database = &state.database_by_id[&database_id];
2494                if database_id.is_system() {
2495                    return Err(AdapterError::Catalog(Error::new(
2496                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2497                    )));
2498                }
2499
2500                self.databases.insert(database_id);
2501            }
2502            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2503                let schema = state.get_schema(
2504                    &database_spec,
2505                    &schema_spec,
2506                    session
2507                        .map(|session| session.conn_id())
2508                        .unwrap_or(&SYSTEM_CONN_ID),
2509                );
2510                let schema_id: SchemaId = schema_spec.into();
2511                if schema_id.is_system() {
2512                    let name = schema.name();
2513                    let full_name = state.resolve_full_schema_name(name);
2514                    return Err(AdapterError::Catalog(Error::new(
2515                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2516                    )));
2517                }
2518
2519                self.schemas.insert(schema_spec, database_spec);
2520            }
2521            DropObjectInfo::Role(role_id) => {
2522                let name = state.get_role(&role_id).name().to_string();
2523                if role_id.is_system() || role_id.is_predefined() {
2524                    return Err(AdapterError::Catalog(Error::new(
2525                        ErrorKind::ReservedRoleName(name.clone()),
2526                    )));
2527                }
2528                state.ensure_not_reserved_role(&role_id)?;
2529
2530                self.roles.insert(role_id);
2531            }
2532            DropObjectInfo::Cluster(cluster_id) => {
2533                let cluster = state.get_cluster(cluster_id);
2534                let name = &cluster.name;
2535                if cluster_id.is_system() {
2536                    return Err(AdapterError::Catalog(Error::new(
2537                        ErrorKind::ReadOnlyCluster(name.clone()),
2538                    )));
2539                }
2540
2541                self.clusters.insert(cluster_id);
2542            }
2543            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2544                let cluster = state.get_cluster(cluster_id);
2545                let replica = cluster.replica(replica_id).expect("Must exist");
2546
2547                self.replicas
2548                    .insert(replica.replica_id, (cluster.id, reason));
2549            }
2550            DropObjectInfo::Item(item_id) => {
2551                let entry = state.get_entry(&item_id);
2552                if item_id.is_system() {
2553                    let name = entry.name();
2554                    let full_name =
2555                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
2556                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2557                        full_name.to_string(),
2558                    ))));
2559                }
2560
2561                self.items.push(item_id);
2562            }
2563            DropObjectInfo::NetworkPolicy(network_policy_id) => {
2564                let policy = state.get_network_policy(&network_policy_id);
2565                let name = &policy.name;
2566                if network_policy_id.is_system() {
2567                    return Err(AdapterError::Catalog(Error::new(
2568                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2569                    )));
2570                }
2571
2572                self.network_policies.insert(network_policy_id);
2573            }
2574        }
2575
2576        Ok(())
2577    }
2578}
2579
2580#[cfg(test)]
2581mod tests {
2582    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
2583    use mz_repr::role_id::RoleId;
2584
2585    use crate::catalog::Catalog;
2586
2587    #[mz_ore::test]
2588    fn test_update_privilege_owners() {
2589        let old_owner = RoleId::User(1);
2590        let new_owner = RoleId::User(2);
2591        let other_role = RoleId::User(3);
2592
2593        // older owner exists as grantor.
2594        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2595            MzAclItem {
2596                grantee: other_role,
2597                grantor: old_owner,
2598                acl_mode: AclMode::UPDATE,
2599            },
2600            MzAclItem {
2601                grantee: other_role,
2602                grantor: new_owner,
2603                acl_mode: AclMode::SELECT,
2604            },
2605        ]);
2606        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2607        assert_eq!(1, privileges.all_values().count());
2608        assert_eq!(
2609            vec![MzAclItem {
2610                grantee: other_role,
2611                grantor: new_owner,
2612                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2613            }],
2614            privileges.all_values_owned().collect::<Vec<_>>()
2615        );
2616
2617        // older owner exists as grantee.
2618        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2619            MzAclItem {
2620                grantee: old_owner,
2621                grantor: other_role,
2622                acl_mode: AclMode::UPDATE,
2623            },
2624            MzAclItem {
2625                grantee: new_owner,
2626                grantor: other_role,
2627                acl_mode: AclMode::SELECT,
2628            },
2629        ]);
2630        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2631        assert_eq!(1, privileges.all_values().count());
2632        assert_eq!(
2633            vec![MzAclItem {
2634                grantee: new_owner,
2635                grantor: other_role,
2636                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2637            }],
2638            privileges.all_values_owned().collect::<Vec<_>>()
2639        );
2640
2641        // older owner exists as grantee and grantor.
2642        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2643            MzAclItem {
2644                grantee: old_owner,
2645                grantor: old_owner,
2646                acl_mode: AclMode::UPDATE,
2647            },
2648            MzAclItem {
2649                grantee: new_owner,
2650                grantor: new_owner,
2651                acl_mode: AclMode::SELECT,
2652            },
2653        ]);
2654        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2655        assert_eq!(1, privileges.all_values().count());
2656        assert_eq!(
2657            vec![MzAclItem {
2658                grantee: new_owner,
2659                grantor: new_owner,
2660                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2661            }],
2662            privileges.all_values_owned().collect::<Vec<_>>()
2663        );
2664    }
2665}