mz_adapter/catalog/
transact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to executing catalog transactions.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::sync::Arc;
14use std::time::Duration;
15
16use itertools::Itertools;
17use mz_adapter_types::compaction::CompactionWindow;
18use mz_adapter_types::connection::ConnectionId;
19use mz_adapter_types::dyncfgs::{
20    ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT,
21    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
22};
23use mz_audit_log::{
24    CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
25    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
26};
27use mz_catalog::SYSTEM_CONN_ID;
28use mz_catalog::builtin::BuiltinLog;
29use mz_catalog::durable::{NetworkPolicy, Transaction};
30use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
31use mz_catalog::memory::objects::{
32    CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate,
33    StateUpdateKind, TemporaryItem,
34};
35use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
36use mz_controller_types::{ClusterId, ReplicaId};
37use mz_ore::collections::HashSet;
38use mz_ore::instrument;
39use mz_ore::now::EpochMillis;
40use mz_persist_types::ShardId;
41use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap, merge_mz_acl_items};
42use mz_repr::network_policy_id::NetworkPolicyId;
43use mz_repr::role_id::RoleId;
44use mz_repr::{CatalogItemId, ColumnName, ColumnType, Diff, GlobalId, strconv};
45use mz_sql::ast::RawDataType;
46use mz_sql::catalog::{
47    CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
48    CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, RoleAttributes, RoleMembership,
49    RoleVars,
50};
51use mz_sql::names::{
52    CommentObjectId, DatabaseId, FullItemName, ObjectId, QualifiedItemName,
53    ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
54};
55use mz_sql::plan::{NetworkPolicyRule, PlanError};
56use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID};
57use mz_sql::session::vars::OwnedVarInput;
58use mz_sql::session::vars::{Value as VarValue, VarInput};
59use mz_sql::{DEFAULT_SCHEMA, rbac};
60use mz_sql_parser::ast::{QualifiedReplica, Value};
61use mz_storage_client::storage_collections::StorageCollections;
62use tracing::{info, trace};
63
64use crate::AdapterError;
65use crate::catalog::{
66    BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant,
67    catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name,
68    is_reserved_role_name, object_type_to_audit_object_type,
69    system_object_type_to_audit_object_type,
70};
71use crate::coord::ConnMeta;
72use crate::coord::cluster_scheduling::SchedulingDecision;
73use crate::util::ResultExt;
74
75#[derive(Debug, Clone)]
76pub enum Op {
77    AlterRetainHistory {
78        id: CatalogItemId,
79        value: Option<Value>,
80        window: CompactionWindow,
81    },
82    AlterRole {
83        id: RoleId,
84        name: String,
85        attributes: RoleAttributes,
86        vars: RoleVars,
87    },
88    AlterNetworkPolicy {
89        id: NetworkPolicyId,
90        rules: Vec<NetworkPolicyRule>,
91        name: String,
92        owner_id: RoleId,
93    },
94    AlterAddColumn {
95        id: CatalogItemId,
96        new_global_id: GlobalId,
97        name: ColumnName,
98        typ: ColumnType,
99        sql: RawDataType,
100    },
101    CreateDatabase {
102        name: String,
103        owner_id: RoleId,
104    },
105    CreateSchema {
106        database_id: ResolvedDatabaseSpecifier,
107        schema_name: String,
108        owner_id: RoleId,
109    },
110    CreateRole {
111        name: String,
112        attributes: RoleAttributes,
113    },
114    CreateCluster {
115        id: ClusterId,
116        name: String,
117        introspection_sources: Vec<&'static BuiltinLog>,
118        owner_id: RoleId,
119        config: ClusterConfig,
120    },
121    CreateClusterReplica {
122        cluster_id: ClusterId,
123        name: String,
124        config: ReplicaConfig,
125        owner_id: RoleId,
126        reason: ReplicaCreateDropReason,
127    },
128    CreateItem {
129        id: CatalogItemId,
130        name: QualifiedItemName,
131        item: CatalogItem,
132        owner_id: RoleId,
133    },
134    CreateNetworkPolicy {
135        rules: Vec<NetworkPolicyRule>,
136        name: String,
137        owner_id: RoleId,
138    },
139    Comment {
140        object_id: CommentObjectId,
141        sub_component: Option<usize>,
142        comment: Option<String>,
143    },
144    DropObjects(Vec<DropObjectInfo>),
145    GrantRole {
146        role_id: RoleId,
147        member_id: RoleId,
148        grantor_id: RoleId,
149    },
150    RenameCluster {
151        id: ClusterId,
152        name: String,
153        to_name: String,
154        check_reserved_names: bool,
155    },
156    RenameClusterReplica {
157        cluster_id: ClusterId,
158        replica_id: ReplicaId,
159        name: QualifiedReplica,
160        to_name: String,
161    },
162    RenameItem {
163        id: CatalogItemId,
164        current_full_name: FullItemName,
165        to_name: String,
166    },
167    RenameSchema {
168        database_spec: ResolvedDatabaseSpecifier,
169        schema_spec: SchemaSpecifier,
170        new_name: String,
171        check_reserved_names: bool,
172    },
173    UpdateOwner {
174        id: ObjectId,
175        new_owner: RoleId,
176    },
177    UpdatePrivilege {
178        target_id: SystemObjectId,
179        privilege: MzAclItem,
180        variant: UpdatePrivilegeVariant,
181    },
182    UpdateDefaultPrivilege {
183        privilege_object: DefaultPrivilegeObject,
184        privilege_acl_item: DefaultPrivilegeAclItem,
185        variant: UpdatePrivilegeVariant,
186    },
187    RevokeRole {
188        role_id: RoleId,
189        member_id: RoleId,
190        grantor_id: RoleId,
191    },
192    UpdateClusterConfig {
193        id: ClusterId,
194        name: String,
195        config: ClusterConfig,
196    },
197    UpdateClusterReplicaConfig {
198        cluster_id: ClusterId,
199        replica_id: ReplicaId,
200        config: ReplicaConfig,
201    },
202    UpdateItem {
203        id: CatalogItemId,
204        name: QualifiedItemName,
205        to_item: CatalogItem,
206    },
207    UpdateSourceReferences {
208        source_id: CatalogItemId,
209        references: SourceReferences,
210    },
211    UpdateSystemConfiguration {
212        name: String,
213        value: OwnedVarInput,
214    },
215    ResetSystemConfiguration {
216        name: String,
217    },
218    ResetAllSystemConfiguration,
219    /// Performs updates to the storage usage table, which probably should be a builtin source.
220    ///
221    /// TODO(jkosh44) In a multi-writer or high availability catalog world, this
222    /// might not work. If a process crashes after collecting storage usage events
223    /// but before updating the builtin table, then another listening catalog
224    /// will never know to update the builtin table.
225    WeirdStorageUsageUpdates {
226        object_id: Option<String>,
227        size_bytes: u64,
228        collection_timestamp: EpochMillis,
229    },
230    /// Performs a dry run of the commit, but errors with
231    /// [`AdapterError::TransactionDryRun`].
232    ///
233    /// When using this value, it should be included only as the last element of
234    /// the transaction and should not be the only value in the transaction.
235    TransactionDryRun,
236}
237
238/// Almost the same as `ObjectId`, but the `ClusterReplica` case has an extra
239/// `ReplicaCreateDropReason` field. This is forwarded to `mz_audit_events.details` when applying
240/// the `Op::DropObjects`.
241#[derive(Debug, Clone)]
242pub enum DropObjectInfo {
243    Cluster(ClusterId),
244    ClusterReplica((ClusterId, ReplicaId, ReplicaCreateDropReason)),
245    Database(DatabaseId),
246    Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)),
247    Role(RoleId),
248    Item(CatalogItemId),
249    NetworkPolicy(NetworkPolicyId),
250}
251
252impl DropObjectInfo {
253    /// Creates a `DropObjectInfo` from an `ObjectId`.
254    /// If it is a `ClusterReplica`, the reason will be set to `ReplicaCreateDropReason::Manual`.
255    pub(crate) fn manual_drop_from_object_id(id: ObjectId) -> Self {
256        match id {
257            ObjectId::Cluster(cluster_id) => DropObjectInfo::Cluster(cluster_id),
258            ObjectId::ClusterReplica((cluster_id, replica_id)) => DropObjectInfo::ClusterReplica((
259                cluster_id,
260                replica_id,
261                ReplicaCreateDropReason::Manual,
262            )),
263            ObjectId::Database(database_id) => DropObjectInfo::Database(database_id),
264            ObjectId::Schema(schema) => DropObjectInfo::Schema(schema),
265            ObjectId::Role(role_id) => DropObjectInfo::Role(role_id),
266            ObjectId::Item(item_id) => DropObjectInfo::Item(item_id),
267            ObjectId::NetworkPolicy(policy_id) => DropObjectInfo::NetworkPolicy(policy_id),
268        }
269    }
270
271    /// Creates an `ObjectId` from a `DropObjectInfo`.
272    /// Loses the `ReplicaCreateDropReason` if there is one!
273    fn to_object_id(&self) -> ObjectId {
274        match &self {
275            DropObjectInfo::Cluster(cluster_id) => ObjectId::Cluster(cluster_id.clone()),
276            DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
277                ObjectId::ClusterReplica((cluster_id.clone(), replica_id.clone()))
278            }
279            DropObjectInfo::Database(database_id) => ObjectId::Database(database_id.clone()),
280            DropObjectInfo::Schema(schema) => ObjectId::Schema(schema.clone()),
281            DropObjectInfo::Role(role_id) => ObjectId::Role(role_id.clone()),
282            DropObjectInfo::Item(item_id) => ObjectId::Item(item_id.clone()),
283            DropObjectInfo::NetworkPolicy(network_policy_id) => {
284                ObjectId::NetworkPolicy(network_policy_id.clone())
285            }
286        }
287    }
288}
289
290/// The reason for creating or dropping a replica.
291#[derive(Debug, Clone)]
292pub enum ReplicaCreateDropReason {
293    /// The user initiated the replica create or drop, e.g., by
294    /// - creating/dropping a cluster,
295    /// - ALTERing various options on a managed cluster,
296    /// - CREATE/DROP CLUSTER REPLICA on an unmanaged cluster.
297    Manual,
298    /// The automated cluster scheduling initiated the replica create or drop, e.g., a
299    /// materialized view is needing a refresh on a SCHEDULE ON REFRESH cluster.
300    ClusterScheduling(Vec<SchedulingDecision>),
301}
302
303impl ReplicaCreateDropReason {
304    pub fn into_audit_log(
305        self,
306    ) -> (
307        CreateOrDropClusterReplicaReasonV1,
308        Option<SchedulingDecisionsWithReasonsV2>,
309    ) {
310        let (reason, scheduling_policies) = match self {
311            ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
312            ReplicaCreateDropReason::ClusterScheduling(scheduling_decisions) => (
313                CreateOrDropClusterReplicaReasonV1::Schedule,
314                Some(scheduling_decisions),
315            ),
316        };
317        (
318            reason,
319            scheduling_policies
320                .as_ref()
321                .map(SchedulingDecision::reasons_to_audit_log_reasons),
322        )
323    }
324}
325
326pub struct TransactionResult {
327    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
328    pub audit_events: Vec<VersionedEvent>,
329}
330
331impl Catalog {
332    fn should_audit_log_item(item: &CatalogItem) -> bool {
333        !item.is_temporary()
334    }
335
336    /// Gets [`CatalogItemId`]s of temporary items to be created, checks for name collisions
337    /// within a connection id.
338    fn temporary_ids(
339        &self,
340        ops: &[Op],
341        temporary_drops: BTreeSet<(&ConnectionId, String)>,
342    ) -> Result<BTreeSet<CatalogItemId>, Error> {
343        let mut creating = BTreeSet::new();
344        let mut temporary_ids = BTreeSet::new();
345        for op in ops.iter() {
346            if let Op::CreateItem {
347                id,
348                name,
349                item,
350                owner_id: _,
351            } = op
352            {
353                if let Some(conn_id) = item.conn_id() {
354                    if self.item_exists_in_temp_schemas(conn_id, &name.item)
355                        && !temporary_drops.contains(&(conn_id, name.item.clone()))
356                        || creating.contains(&(conn_id, &name.item))
357                    {
358                        return Err(
359                            SqlCatalogError::ItemAlreadyExists(*id, name.item.clone()).into()
360                        );
361                    } else {
362                        creating.insert((conn_id, &name.item));
363                        temporary_ids.insert(id.clone());
364                    }
365                }
366            }
367        }
368        Ok(temporary_ids)
369    }
370
371    #[instrument(name = "catalog::transact")]
372    pub async fn transact(
373        &mut self,
374        // n.b. this is an option to prevent us from needing to build out a
375        // dummy impl of `StorageController` for tests.
376        storage_collections: Option<
377            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
378        >,
379        oracle_write_ts: mz_repr::Timestamp,
380        session: Option<&ConnMeta>,
381        ops: Vec<Op>,
382    ) -> Result<TransactionResult, AdapterError> {
383        trace!("transact: {:?}", ops);
384        fail::fail_point!("catalog_transact", |arg| {
385            Err(AdapterError::Unstructured(anyhow::anyhow!(
386                "failpoint: {arg:?}"
387            )))
388        });
389
390        let drop_ids: BTreeSet<CatalogItemId> = ops
391            .iter()
392            .filter_map(|op| match op {
393                Op::DropObjects(drop_object_infos) => {
394                    let ids = drop_object_infos.iter().map(|info| info.to_object_id());
395                    let item_ids = ids.filter_map(|id| match id {
396                        ObjectId::Item(id) => Some(id),
397                        _ => None,
398                    });
399                    Some(item_ids)
400                }
401                _ => None,
402            })
403            .flatten()
404            .collect();
405        let temporary_drops = drop_ids
406            .iter()
407            .filter_map(|id| {
408                let entry = self.get_entry(id);
409                match entry.item().conn_id() {
410                    Some(conn_id) => Some((conn_id, entry.name().item.clone())),
411                    None => None,
412                }
413            })
414            .collect();
415        let dropped_global_ids = drop_ids
416            .iter()
417            .flat_map(|item_id| self.get_global_ids(item_id))
418            .collect();
419
420        let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
421        let mut builtin_table_updates = vec![];
422        let mut audit_events = vec![];
423        let mut storage = self.storage().await;
424        let mut tx = storage
425            .transaction()
426            .await
427            .unwrap_or_terminate("starting catalog transaction");
428        // Prepare a candidate catalog state.
429        let mut state = self.state.clone();
430
431        Self::transact_inner(
432            storage_collections,
433            oracle_write_ts,
434            session,
435            ops,
436            temporary_ids,
437            &mut builtin_table_updates,
438            &mut audit_events,
439            &mut tx,
440            &mut state,
441        )
442        .await?;
443
444        // The user closure was successful, apply the updates. Terminate the
445        // process if this fails, because we have to restart envd due to
446        // indeterminate catalog state, which we only reconcile during catalog
447        // init.
448        tx.commit(oracle_write_ts)
449            .await
450            .unwrap_or_terminate("catalog storage transaction commit must succeed");
451
452        // Dropping here keeps the mutable borrow on self, preventing us accidentally
453        // mutating anything until after f is executed.
454        drop(storage);
455        self.state = state;
456        self.transient_revision += 1;
457
458        // Drop in-memory planning metadata.
459        let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
460        if self.state.system_config().enable_mz_notices() {
461            // Generate retractions for the Builtin tables.
462            self.state().pack_optimizer_notices(
463                &mut builtin_table_updates,
464                dropped_notices.iter(),
465                Diff::MINUS_ONE,
466            );
467        }
468
469        Ok(TransactionResult {
470            builtin_table_updates,
471            audit_events,
472        })
473    }
474
475    /// Performs the transaction described by `ops`.
476    ///
477    /// # Panics
478    /// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
479    ///   final element.
480    /// - If the only element of `ops` is [`Op::TransactionDryRun`].
481    #[instrument(name = "catalog::transact_inner")]
482    async fn transact_inner(
483        storage_collections: Option<
484            &mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
485        >,
486        oracle_write_ts: mz_repr::Timestamp,
487        session: Option<&ConnMeta>,
488        mut ops: Vec<Op>,
489        temporary_ids: BTreeSet<CatalogItemId>,
490        builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
491        audit_events: &mut Vec<VersionedEvent>,
492        tx: &mut Transaction<'_>,
493        state: &mut CatalogState,
494    ) -> Result<(), AdapterError> {
495        let dry_run_ops = match ops.last() {
496            Some(Op::TransactionDryRun) => {
497                // Remove dry run marker.
498                ops.pop();
499                assert!(!ops.is_empty(), "TransactionDryRun must not be the only op");
500                ops.clone()
501            }
502            Some(_) => vec![],
503            None => return Ok(()),
504        };
505
506        let mut storage_collections_to_create = BTreeSet::new();
507        let mut storage_collections_to_drop = BTreeSet::new();
508        let mut storage_collections_to_register = BTreeMap::new();
509
510        for op in ops {
511            let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
512                oracle_write_ts,
513                session,
514                op,
515                &temporary_ids,
516                audit_events,
517                tx,
518                state,
519                &mut storage_collections_to_create,
520                &mut storage_collections_to_drop,
521                &mut storage_collections_to_register,
522            )
523            .await?;
524
525            // Certain builtin tables are not derived from the durable catalog state, so they need
526            // to be updated ad-hoc based on the current transaction. This is weird and will not
527            // work if we ever want multi-writer catalogs or high availability (HA) catalogs. If
528            // this instance crashes after committing the durable catalog but before applying the
529            // weird builtin table updates, then they'll be lost forever in a multi-writer or HA
530            // world. Currently, this works fine and there are no correctness issues because
531            // whenever a Coordinator crashes, a new Coordinator starts up, and it will set the
532            // state of all builtin tables to the correct values.
533            if let Some(builtin_table_update) = weird_builtin_table_update {
534                builtin_table_updates.push(builtin_table_update);
535            }
536
537            // Temporary items are not stored in the durable catalog, so they need to be handled
538            // separately for updating state and builtin tables.
539            // TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
540            // in a multi-subscriber catalog world.
541            let op_id = tx.op_id().into();
542            let temporary_item_updates =
543                temporary_item_updates
544                    .into_iter()
545                    .map(|(item, diff)| StateUpdate {
546                        kind: StateUpdateKind::TemporaryItem(item),
547                        ts: op_id,
548                        diff,
549                    });
550
551            let mut updates: Vec<_> = tx.get_and_commit_op_updates();
552            updates.extend(temporary_item_updates);
553            let op_builtin_table_updates = state.apply_updates(updates)?;
554            let op_builtin_table_updates =
555                state.resolve_builtin_table_updates(op_builtin_table_updates);
556            builtin_table_updates.extend(op_builtin_table_updates);
557        }
558
559        if dry_run_ops.is_empty() {
560            // `storage_collections` should only be `None` for tests.
561            if let Some(c) = storage_collections {
562                c.prepare_state(
563                    tx,
564                    storage_collections_to_create,
565                    storage_collections_to_drop,
566                    storage_collections_to_register,
567                )
568                .await?;
569            }
570
571            let updates = tx.get_and_commit_op_updates();
572            let op_builtin_table_updates = state.apply_updates(updates)?;
573            let op_builtin_table_updates =
574                state.resolve_builtin_table_updates(op_builtin_table_updates);
575            builtin_table_updates.extend(op_builtin_table_updates);
576
577            Ok(())
578        } else {
579            Err(AdapterError::TransactionDryRun {
580                new_ops: dry_run_ops,
581                new_state: state.clone(),
582            })
583        }
584    }
585
586    /// Performs the transaction operation described by `op`. This function prepares the changes in
587    /// `tx`, but does not update `state`. `state` will be updated when applying the durable
588    /// changes.
589    ///
590    /// Optionally returns a builtin table update for any builtin table updates than cannot be
591    /// derived from the durable catalog state, and temporary item diffs. These are all very weird
592    /// scenarios and ideally in the future don't exist.
593    #[instrument]
594    async fn transact_op(
595        oracle_write_ts: mz_repr::Timestamp,
596        session: Option<&ConnMeta>,
597        op: Op,
598        temporary_ids: &BTreeSet<CatalogItemId>,
599        audit_events: &mut Vec<VersionedEvent>,
600        tx: &mut Transaction<'_>,
601        state: &CatalogState,
602        storage_collections_to_create: &mut BTreeSet<GlobalId>,
603        storage_collections_to_drop: &mut BTreeSet<GlobalId>,
604        storage_collections_to_register: &mut BTreeMap<GlobalId, ShardId>,
605    ) -> Result<(Option<BuiltinTableUpdate>, Vec<(TemporaryItem, StateDiff)>), AdapterError> {
606        let mut weird_builtin_table_update = None;
607        let mut temporary_item_updates = Vec::new();
608
609        match op {
610            Op::TransactionDryRun => {
611                unreachable!("TransactionDryRun can only be used a final element of ops")
612            }
613            Op::AlterRetainHistory { id, value, window } => {
614                let entry = state.get_entry(&id);
615                if id.is_system() {
616                    let name = entry.name();
617                    let full_name =
618                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
619                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
620                        full_name.to_string(),
621                    ))));
622                }
623
624                let mut new_entry = entry.clone();
625                let previous = new_entry
626                    .item
627                    .update_retain_history(value.clone(), window)
628                    .map_err(|_| {
629                        AdapterError::Catalog(Error::new(ErrorKind::Internal(
630                            "planner should have rejected invalid alter retain history item type"
631                                .to_string(),
632                        )))
633                    })?;
634
635                if Self::should_audit_log_item(new_entry.item()) {
636                    let details =
637                        EventDetails::AlterRetainHistoryV1(mz_audit_log::AlterRetainHistoryV1 {
638                            id: id.to_string(),
639                            old_history: previous.map(|previous| previous.to_string()),
640                            new_history: value.map(|v| v.to_string()),
641                        });
642                    CatalogState::add_to_audit_log(
643                        &state.system_configuration,
644                        oracle_write_ts,
645                        session,
646                        tx,
647                        audit_events,
648                        EventType::Alter,
649                        catalog_type_to_audit_object_type(new_entry.item().typ()),
650                        details,
651                    )?;
652                }
653
654                tx.update_item(id, new_entry.into())?;
655
656                Self::log_update(state, &id);
657            }
658            Op::AlterRole {
659                id,
660                name,
661                attributes,
662                vars,
663            } => {
664                state.ensure_not_reserved_role(&id)?;
665
666                let mut existing_role = state.get_role(&id).clone();
667                existing_role.attributes = attributes;
668                existing_role.vars = vars;
669                tx.update_role(id, existing_role.into())?;
670
671                CatalogState::add_to_audit_log(
672                    &state.system_configuration,
673                    oracle_write_ts,
674                    session,
675                    tx,
676                    audit_events,
677                    EventType::Alter,
678                    ObjectType::Role,
679                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
680                        id: id.to_string(),
681                        name: name.clone(),
682                    }),
683                )?;
684
685                info!("update role {name} ({id})");
686            }
687            Op::AlterNetworkPolicy {
688                id,
689                rules,
690                name,
691                owner_id: _owner_id,
692            } => {
693                let existing_policy = state.get_network_policy(&id).clone();
694                let mut policy: NetworkPolicy = existing_policy.into();
695                policy.rules = rules;
696                if is_reserved_name(&name) {
697                    return Err(AdapterError::Catalog(Error::new(
698                        ErrorKind::ReservedNetworkPolicyName(name),
699                    )));
700                }
701                tx.update_network_policy(id, policy.clone())?;
702
703                CatalogState::add_to_audit_log(
704                    &state.system_configuration,
705                    oracle_write_ts,
706                    session,
707                    tx,
708                    audit_events,
709                    EventType::Alter,
710                    ObjectType::NetworkPolicy,
711                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
712                        id: id.to_string(),
713                        name: name.clone(),
714                    }),
715                )?;
716
717                info!("update network policy {name} ({id})");
718            }
719            Op::AlterAddColumn {
720                id,
721                new_global_id,
722                name,
723                typ,
724                sql,
725            } => {
726                let mut new_entry = state.get_entry(&id).clone();
727                let version = new_entry.item.add_column(name, typ, sql)?;
728                // All versions of a table share the same shard, so it shouldn't matter what
729                // GlobalId we use here.
730                let shard_id = state
731                    .storage_metadata()
732                    .get_collection_shard(new_entry.latest_global_id())?;
733
734                // TODO(alter_table): Support adding columns to sources.
735                let CatalogItem::Table(table) = &mut new_entry.item else {
736                    return Err(AdapterError::Unsupported("adding columns to non-Table"));
737                };
738                table.collections.insert(version, new_global_id);
739
740                tx.update_item(id, new_entry.into())?;
741                storage_collections_to_register.insert(new_global_id, shard_id);
742            }
743            Op::CreateDatabase { name, owner_id } => {
744                let database_owner_privileges = vec![rbac::owner_privilege(
745                    mz_sql::catalog::ObjectType::Database,
746                    owner_id,
747                )];
748                let database_default_privileges = state
749                    .default_privileges
750                    .get_applicable_privileges(
751                        owner_id,
752                        None,
753                        None,
754                        mz_sql::catalog::ObjectType::Database,
755                    )
756                    .map(|item| item.mz_acl_item(owner_id));
757                let database_privileges: Vec<_> = merge_mz_acl_items(
758                    database_owner_privileges
759                        .into_iter()
760                        .chain(database_default_privileges),
761                )
762                .collect();
763
764                let schema_owner_privileges = vec![rbac::owner_privilege(
765                    mz_sql::catalog::ObjectType::Schema,
766                    owner_id,
767                )];
768                let schema_default_privileges = state
769                    .default_privileges
770                    .get_applicable_privileges(
771                        owner_id,
772                        None,
773                        None,
774                        mz_sql::catalog::ObjectType::Schema,
775                    )
776                    .map(|item| item.mz_acl_item(owner_id))
777                    // Special default privilege on public schemas.
778                    .chain(std::iter::once(MzAclItem {
779                        grantee: RoleId::Public,
780                        grantor: owner_id,
781                        acl_mode: AclMode::USAGE,
782                    }));
783                let schema_privileges: Vec<_> = merge_mz_acl_items(
784                    schema_owner_privileges
785                        .into_iter()
786                        .chain(schema_default_privileges),
787                )
788                .collect();
789
790                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
791                let (database_id, _) = tx.insert_user_database(
792                    &name,
793                    owner_id,
794                    database_privileges.clone(),
795                    &temporary_oids,
796                )?;
797                let (schema_id, _) = tx.insert_user_schema(
798                    database_id,
799                    DEFAULT_SCHEMA,
800                    owner_id,
801                    schema_privileges.clone(),
802                    &temporary_oids,
803                )?;
804                CatalogState::add_to_audit_log(
805                    &state.system_configuration,
806                    oracle_write_ts,
807                    session,
808                    tx,
809                    audit_events,
810                    EventType::Create,
811                    ObjectType::Database,
812                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
813                        id: database_id.to_string(),
814                        name: name.clone(),
815                    }),
816                )?;
817                info!("create database {}", name);
818
819                CatalogState::add_to_audit_log(
820                    &state.system_configuration,
821                    oracle_write_ts,
822                    session,
823                    tx,
824                    audit_events,
825                    EventType::Create,
826                    ObjectType::Schema,
827                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
828                        id: schema_id.to_string(),
829                        name: DEFAULT_SCHEMA.to_string(),
830                        database_name: Some(name),
831                    }),
832                )?;
833            }
834            Op::CreateSchema {
835                database_id,
836                schema_name,
837                owner_id,
838            } => {
839                if is_reserved_name(&schema_name) {
840                    return Err(AdapterError::Catalog(Error::new(
841                        ErrorKind::ReservedSchemaName(schema_name),
842                    )));
843                }
844                let database_id = match database_id {
845                    ResolvedDatabaseSpecifier::Id(id) => id,
846                    ResolvedDatabaseSpecifier::Ambient => {
847                        return Err(AdapterError::Catalog(Error::new(
848                            ErrorKind::ReadOnlySystemSchema(schema_name),
849                        )));
850                    }
851                };
852                let owner_privileges = vec![rbac::owner_privilege(
853                    mz_sql::catalog::ObjectType::Schema,
854                    owner_id,
855                )];
856                let default_privileges = state
857                    .default_privileges
858                    .get_applicable_privileges(
859                        owner_id,
860                        Some(database_id),
861                        None,
862                        mz_sql::catalog::ObjectType::Schema,
863                    )
864                    .map(|item| item.mz_acl_item(owner_id));
865                let privileges: Vec<_> =
866                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
867                        .collect();
868                let (schema_id, _) = tx.insert_user_schema(
869                    database_id,
870                    &schema_name,
871                    owner_id,
872                    privileges.clone(),
873                    &state.get_temporary_oids().collect(),
874                )?;
875                CatalogState::add_to_audit_log(
876                    &state.system_configuration,
877                    oracle_write_ts,
878                    session,
879                    tx,
880                    audit_events,
881                    EventType::Create,
882                    ObjectType::Schema,
883                    EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
884                        id: schema_id.to_string(),
885                        name: schema_name.clone(),
886                        database_name: Some(state.database_by_id[&database_id].name.clone()),
887                    }),
888                )?;
889            }
890            Op::CreateRole { name, attributes } => {
891                if is_reserved_role_name(&name) {
892                    return Err(AdapterError::Catalog(Error::new(
893                        ErrorKind::ReservedRoleName(name),
894                    )));
895                }
896                let membership = RoleMembership::new();
897                let vars = RoleVars::default();
898                let (id, _) = tx.insert_user_role(
899                    name.clone(),
900                    attributes.clone(),
901                    membership.clone(),
902                    vars.clone(),
903                    &state.get_temporary_oids().collect(),
904                )?;
905                CatalogState::add_to_audit_log(
906                    &state.system_configuration,
907                    oracle_write_ts,
908                    session,
909                    tx,
910                    audit_events,
911                    EventType::Create,
912                    ObjectType::Role,
913                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
914                        id: id.to_string(),
915                        name: name.clone(),
916                    }),
917                )?;
918                info!("create role {}", name);
919            }
920            Op::CreateCluster {
921                id,
922                name,
923                introspection_sources,
924                owner_id,
925                config,
926            } => {
927                if is_reserved_name(&name) {
928                    return Err(AdapterError::Catalog(Error::new(
929                        ErrorKind::ReservedClusterName(name),
930                    )));
931                }
932                let owner_privileges = vec![rbac::owner_privilege(
933                    mz_sql::catalog::ObjectType::Cluster,
934                    owner_id,
935                )];
936                let default_privileges = state
937                    .default_privileges
938                    .get_applicable_privileges(
939                        owner_id,
940                        None,
941                        None,
942                        mz_sql::catalog::ObjectType::Cluster,
943                    )
944                    .map(|item| item.mz_acl_item(owner_id));
945                let privileges: Vec<_> =
946                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
947                        .collect();
948                let introspection_source_ids: Vec<_> = introspection_sources
949                    .iter()
950                    .map(|introspection_source| {
951                        Transaction::allocate_introspection_source_index_id(
952                            &id,
953                            introspection_source.variant,
954                        )
955                    })
956                    .collect();
957
958                let introspection_sources = introspection_sources
959                    .into_iter()
960                    .zip_eq(introspection_source_ids)
961                    .map(|(log, (item_id, gid))| (log, item_id, gid))
962                    .collect();
963
964                tx.insert_user_cluster(
965                    id,
966                    &name,
967                    introspection_sources,
968                    owner_id,
969                    privileges.clone(),
970                    config.clone().into(),
971                    &state.get_temporary_oids().collect(),
972                )?;
973                CatalogState::add_to_audit_log(
974                    &state.system_configuration,
975                    oracle_write_ts,
976                    session,
977                    tx,
978                    audit_events,
979                    EventType::Create,
980                    ObjectType::Cluster,
981                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
982                        id: id.to_string(),
983                        name: name.clone(),
984                    }),
985                )?;
986                info!("create cluster {}", name);
987            }
988            Op::CreateClusterReplica {
989                cluster_id,
990                name,
991                config,
992                owner_id,
993                reason,
994            } => {
995                if is_reserved_name(&name) {
996                    return Err(AdapterError::Catalog(Error::new(
997                        ErrorKind::ReservedReplicaName(name),
998                    )));
999                }
1000                let cluster = state.get_cluster(cluster_id);
1001                let id =
1002                    tx.insert_cluster_replica(cluster_id, &name, config.clone().into(), owner_id)?;
1003                if let ReplicaLocation::Managed(ManagedReplicaLocation {
1004                    size,
1005                    disk,
1006                    billed_as,
1007                    internal,
1008                    ..
1009                }) = &config.location
1010                {
1011                    let (reason, scheduling_policies) = reason.into_audit_log();
1012                    let details = EventDetails::CreateClusterReplicaV3(
1013                        mz_audit_log::CreateClusterReplicaV3 {
1014                            cluster_id: cluster_id.to_string(),
1015                            cluster_name: cluster.name.clone(),
1016                            replica_id: Some(id.to_string()),
1017                            replica_name: name.clone(),
1018                            logical_size: size.clone(),
1019                            disk: *disk,
1020                            billed_as: billed_as.clone(),
1021                            internal: *internal,
1022                            reason,
1023                            scheduling_policies,
1024                        },
1025                    );
1026                    CatalogState::add_to_audit_log(
1027                        &state.system_configuration,
1028                        oracle_write_ts,
1029                        session,
1030                        tx,
1031                        audit_events,
1032                        EventType::Create,
1033                        ObjectType::ClusterReplica,
1034                        details,
1035                    )?;
1036                }
1037            }
1038            Op::CreateItem {
1039                id,
1040                name,
1041                item,
1042                owner_id,
1043            } => {
1044                state.check_unstable_dependencies(&item)?;
1045
1046                match &item {
1047                    CatalogItem::Table(table) => {
1048                        let gids: Vec<_> = table.global_ids().collect();
1049                        assert_eq!(gids.len(), 1);
1050                        storage_collections_to_create.extend(gids);
1051                    }
1052                    CatalogItem::Source(source) => {
1053                        storage_collections_to_create.insert(source.global_id());
1054                    }
1055                    CatalogItem::MaterializedView(mv) => {
1056                        storage_collections_to_create.insert(mv.global_id());
1057                    }
1058                    CatalogItem::ContinualTask(ct) => {
1059                        storage_collections_to_create.insert(ct.global_id());
1060                    }
1061                    CatalogItem::Sink(sink) => {
1062                        storage_collections_to_create.insert(sink.global_id());
1063                    }
1064                    CatalogItem::Log(_)
1065                    | CatalogItem::View(_)
1066                    | CatalogItem::Index(_)
1067                    | CatalogItem::Type(_)
1068                    | CatalogItem::Func(_)
1069                    | CatalogItem::Secret(_)
1070                    | CatalogItem::Connection(_) => (),
1071                }
1072
1073                let system_user = session.map_or(false, |s| s.user().is_system_user());
1074                if !system_user {
1075                    if let Some(id @ ClusterId::System(_)) = item.cluster_id() {
1076                        let cluster_name = state.clusters_by_id[&id].name.clone();
1077                        return Err(AdapterError::Catalog(Error::new(
1078                            ErrorKind::ReadOnlyCluster(cluster_name),
1079                        )));
1080                    }
1081                }
1082
1083                let owner_privileges = vec![rbac::owner_privilege(item.typ().into(), owner_id)];
1084                let default_privileges = state
1085                    .default_privileges
1086                    .get_applicable_privileges(
1087                        owner_id,
1088                        name.qualifiers.database_spec.id(),
1089                        Some(name.qualifiers.schema_spec.into()),
1090                        item.typ().into(),
1091                    )
1092                    .map(|item| item.mz_acl_item(owner_id));
1093                // mz_support can read all progress sources.
1094                let progress_source_privilege = if item.is_progress_source() {
1095                    Some(MzAclItem {
1096                        grantee: MZ_SUPPORT_ROLE_ID,
1097                        grantor: owner_id,
1098                        acl_mode: AclMode::SELECT,
1099                    })
1100                } else {
1101                    None
1102                };
1103                let privileges: Vec<_> = merge_mz_acl_items(
1104                    owner_privileges
1105                        .into_iter()
1106                        .chain(default_privileges)
1107                        .chain(progress_source_privilege),
1108                )
1109                .collect();
1110
1111                let temporary_oids = state.get_temporary_oids().collect();
1112
1113                if item.is_temporary() {
1114                    if name.qualifiers.database_spec != ResolvedDatabaseSpecifier::Ambient
1115                        || name.qualifiers.schema_spec != SchemaSpecifier::Temporary
1116                    {
1117                        return Err(AdapterError::Catalog(Error::new(
1118                            ErrorKind::InvalidTemporarySchema,
1119                        )));
1120                    }
1121                    let oid = tx.allocate_oid(&temporary_oids)?;
1122                    let item = TemporaryItem {
1123                        id,
1124                        oid,
1125                        name: name.clone(),
1126                        item: item.clone(),
1127                        owner_id,
1128                        privileges: PrivilegeMap::from_mz_acl_items(privileges),
1129                    };
1130                    temporary_item_updates.push((item, StateDiff::Addition));
1131                } else {
1132                    if let Some(temp_id) =
1133                        item.uses()
1134                            .iter()
1135                            .find(|id| match state.try_get_entry(*id) {
1136                                Some(entry) => entry.item().is_temporary(),
1137                                None => temporary_ids.contains(id),
1138                            })
1139                    {
1140                        let temp_item = state.get_entry(temp_id);
1141                        return Err(AdapterError::Catalog(Error::new(
1142                            ErrorKind::InvalidTemporaryDependency(temp_item.name().item.clone()),
1143                        )));
1144                    }
1145                    if name.qualifiers.database_spec == ResolvedDatabaseSpecifier::Ambient
1146                        && !system_user
1147                    {
1148                        let schema_name = state
1149                            .resolve_full_name(&name, session.map(|session| session.conn_id()))
1150                            .schema;
1151                        return Err(AdapterError::Catalog(Error::new(
1152                            ErrorKind::ReadOnlySystemSchema(schema_name),
1153                        )));
1154                    }
1155                    let schema_id = name.qualifiers.schema_spec.clone().into();
1156                    let item_type = item.typ();
1157                    let (create_sql, global_id, versions) = item.to_serialized();
1158                    tx.insert_user_item(
1159                        id,
1160                        global_id,
1161                        schema_id,
1162                        &name.item,
1163                        create_sql,
1164                        owner_id,
1165                        privileges.clone(),
1166                        &temporary_oids,
1167                        versions,
1168                    )?;
1169                    info!(
1170                        "create {} {} ({})",
1171                        item_type,
1172                        state.resolve_full_name(&name, None),
1173                        id
1174                    );
1175                }
1176
1177                if Self::should_audit_log_item(&item) {
1178                    let name = Self::full_name_detail(
1179                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
1180                    );
1181                    let details = match &item {
1182                        CatalogItem::Source(s) => {
1183                            let cluster_id = match s.data_source {
1184                                // Ingestion exports don't have their own cluster, but
1185                                // run on their ingestion's cluster.
1186                                DataSourceDesc::IngestionExport { ingestion_id, .. } => {
1187                                    match state.get_entry(&ingestion_id).cluster_id() {
1188                                        Some(cluster_id) => Some(cluster_id.to_string()),
1189                                        None => None,
1190                                    }
1191                                }
1192                                _ => match item.cluster_id() {
1193                                    Some(cluster_id) => Some(cluster_id.to_string()),
1194                                    None => None,
1195                                },
1196                            };
1197
1198                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1199                                id: id.to_string(),
1200                                cluster_id,
1201                                name,
1202                                external_type: s.source_type().to_string(),
1203                            })
1204                        }
1205                        CatalogItem::Sink(s) => {
1206                            EventDetails::CreateSourceSinkV4(mz_audit_log::CreateSourceSinkV4 {
1207                                id: id.to_string(),
1208                                cluster_id: Some(s.cluster_id.to_string()),
1209                                name,
1210                                external_type: s.sink_type().to_string(),
1211                            })
1212                        }
1213                        CatalogItem::Index(i) => {
1214                            EventDetails::CreateIndexV1(mz_audit_log::CreateIndexV1 {
1215                                id: id.to_string(),
1216                                name,
1217                                cluster_id: i.cluster_id.to_string(),
1218                            })
1219                        }
1220                        CatalogItem::MaterializedView(mv) => {
1221                            EventDetails::CreateMaterializedViewV1(
1222                                mz_audit_log::CreateMaterializedViewV1 {
1223                                    id: id.to_string(),
1224                                    name,
1225                                    cluster_id: mv.cluster_id.to_string(),
1226                                },
1227                            )
1228                        }
1229                        _ => EventDetails::IdFullNameV1(IdFullNameV1 {
1230                            id: id.to_string(),
1231                            name,
1232                        }),
1233                    };
1234                    CatalogState::add_to_audit_log(
1235                        &state.system_configuration,
1236                        oracle_write_ts,
1237                        session,
1238                        tx,
1239                        audit_events,
1240                        EventType::Create,
1241                        catalog_type_to_audit_object_type(item.typ()),
1242                        details,
1243                    )?;
1244                }
1245            }
1246            Op::CreateNetworkPolicy {
1247                rules,
1248                name,
1249                owner_id,
1250            } => {
1251                if state.network_policies_by_name.contains_key(&name) {
1252                    return Err(AdapterError::PlanError(PlanError::Catalog(
1253                        SqlCatalogError::NetworkPolicyAlreadyExists(name),
1254                    )));
1255                }
1256                if is_reserved_name(&name) {
1257                    return Err(AdapterError::Catalog(Error::new(
1258                        ErrorKind::ReservedNetworkPolicyName(name),
1259                    )));
1260                }
1261
1262                let owner_privileges = vec![rbac::owner_privilege(
1263                    mz_sql::catalog::ObjectType::NetworkPolicy,
1264                    owner_id,
1265                )];
1266                let default_privileges = state
1267                    .default_privileges
1268                    .get_applicable_privileges(
1269                        owner_id,
1270                        None,
1271                        None,
1272                        mz_sql::catalog::ObjectType::NetworkPolicy,
1273                    )
1274                    .map(|item| item.mz_acl_item(owner_id));
1275                let privileges: Vec<_> =
1276                    merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
1277                        .collect();
1278
1279                let temporary_oids: HashSet<_> = state.get_temporary_oids().collect();
1280                let id = tx.insert_user_network_policy(
1281                    name.clone(),
1282                    rules,
1283                    privileges,
1284                    owner_id,
1285                    &temporary_oids,
1286                )?;
1287
1288                CatalogState::add_to_audit_log(
1289                    &state.system_configuration,
1290                    oracle_write_ts,
1291                    session,
1292                    tx,
1293                    audit_events,
1294                    EventType::Create,
1295                    ObjectType::NetworkPolicy,
1296                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1297                        id: id.to_string(),
1298                        name: name.clone(),
1299                    }),
1300                )?;
1301
1302                info!("created network policy {name} ({id})");
1303            }
1304            Op::Comment {
1305                object_id,
1306                sub_component,
1307                comment,
1308            } => {
1309                tx.update_comment(object_id, sub_component, comment)?;
1310                let entry = state.get_comment_id_entry(&object_id);
1311                let should_log = entry
1312                    .map(|entry| Self::should_audit_log_item(entry.item()))
1313                    // Things that aren't catalog entries can't be temp, so should be logged.
1314                    .unwrap_or(true);
1315                // TODO: We need a conn_id to resolve schema names. This means that system-initiated
1316                // comments won't be logged for now.
1317                if let (Some(conn_id), true) =
1318                    (session.map(|session| session.conn_id()), should_log)
1319                {
1320                    CatalogState::add_to_audit_log(
1321                        &state.system_configuration,
1322                        oracle_write_ts,
1323                        session,
1324                        tx,
1325                        audit_events,
1326                        EventType::Comment,
1327                        comment_id_to_audit_object_type(object_id),
1328                        EventDetails::IdNameV1(IdNameV1 {
1329                            // CommentObjectIds don't have a great string representation, but debug will do for now.
1330                            id: format!("{object_id:?}"),
1331                            name: state.comment_id_to_audit_log_name(object_id, conn_id),
1332                        }),
1333                    )?;
1334                }
1335            }
1336            Op::UpdateSourceReferences {
1337                source_id,
1338                references,
1339            } => {
1340                tx.update_source_references(
1341                    source_id,
1342                    references
1343                        .references
1344                        .into_iter()
1345                        .map(|reference| reference.into())
1346                        .collect(),
1347                    references.updated_at,
1348                )?;
1349            }
1350            Op::DropObjects(drop_object_infos) => {
1351                // Generate all of the objects that need to get dropped.
1352                let delta = ObjectsToDrop::generate(drop_object_infos, state, session)?;
1353
1354                // Drop any associated comments.
1355                tx.drop_comments(&delta.comments)?;
1356
1357                // Drop any items.
1358                let (durable_items_to_drop, temporary_items_to_drop): (BTreeSet<_>, BTreeSet<_>) =
1359                    delta
1360                        .items
1361                        .iter()
1362                        .map(|id| id)
1363                        .partition(|id| !state.get_entry(*id).item().is_temporary());
1364                tx.remove_items(&durable_items_to_drop)?;
1365                temporary_item_updates.extend(temporary_items_to_drop.into_iter().map(|id| {
1366                    let entry = state.get_entry(&id);
1367                    (entry.clone().into(), StateDiff::Retraction)
1368                }));
1369
1370                for item_id in delta.items {
1371                    let entry = state.get_entry(&item_id);
1372
1373                    if entry.item().is_storage_collection() {
1374                        storage_collections_to_drop.extend(entry.global_ids());
1375                    }
1376
1377                    if state.source_references.contains_key(&item_id) {
1378                        tx.remove_source_references(item_id)?;
1379                    }
1380
1381                    if Self::should_audit_log_item(entry.item()) {
1382                        CatalogState::add_to_audit_log(
1383                            &state.system_configuration,
1384                            oracle_write_ts,
1385                            session,
1386                            tx,
1387                            audit_events,
1388                            EventType::Drop,
1389                            catalog_type_to_audit_object_type(entry.item().typ()),
1390                            EventDetails::IdFullNameV1(IdFullNameV1 {
1391                                id: item_id.to_string(),
1392                                name: Self::full_name_detail(&state.resolve_full_name(
1393                                    entry.name(),
1394                                    session.map(|session| session.conn_id()),
1395                                )),
1396                            }),
1397                        )?;
1398                    }
1399                    info!(
1400                        "drop {} {} ({})",
1401                        entry.item_type(),
1402                        state.resolve_full_name(entry.name(), entry.conn_id()),
1403                        item_id
1404                    );
1405                }
1406
1407                // Drop any schemas.
1408                let schemas = delta
1409                    .schemas
1410                    .iter()
1411                    .map(|(schema_spec, database_spec)| {
1412                        (SchemaId::from(schema_spec), *database_spec)
1413                    })
1414                    .collect();
1415                tx.remove_schemas(&schemas)?;
1416
1417                for (schema_spec, database_spec) in delta.schemas {
1418                    let schema = state.get_schema(
1419                        &database_spec,
1420                        &schema_spec,
1421                        session
1422                            .map(|session| session.conn_id())
1423                            .unwrap_or(&SYSTEM_CONN_ID),
1424                    );
1425
1426                    let schema_id = SchemaId::from(schema_spec);
1427                    let database_id = match database_spec {
1428                        ResolvedDatabaseSpecifier::Ambient => None,
1429                        ResolvedDatabaseSpecifier::Id(database_id) => Some(database_id),
1430                    };
1431
1432                    CatalogState::add_to_audit_log(
1433                        &state.system_configuration,
1434                        oracle_write_ts,
1435                        session,
1436                        tx,
1437                        audit_events,
1438                        EventType::Drop,
1439                        ObjectType::Schema,
1440                        EventDetails::SchemaV2(mz_audit_log::SchemaV2 {
1441                            id: schema_id.to_string(),
1442                            name: schema.name.schema.to_string(),
1443                            database_name: database_id
1444                                .map(|database_id| state.database_by_id[&database_id].name.clone()),
1445                        }),
1446                    )?;
1447                }
1448
1449                // Drop any databases.
1450                tx.remove_databases(&delta.databases)?;
1451
1452                for database_id in delta.databases {
1453                    let database = state.get_database(&database_id).clone();
1454
1455                    CatalogState::add_to_audit_log(
1456                        &state.system_configuration,
1457                        oracle_write_ts,
1458                        session,
1459                        tx,
1460                        audit_events,
1461                        EventType::Drop,
1462                        ObjectType::Database,
1463                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1464                            id: database_id.to_string(),
1465                            name: database.name.clone(),
1466                        }),
1467                    )?;
1468                }
1469
1470                // Drop any roles.
1471                tx.remove_user_roles(&delta.roles)?;
1472
1473                for role_id in delta.roles {
1474                    let role = state
1475                        .roles_by_id
1476                        .get(&role_id)
1477                        .expect("catalog out of sync");
1478
1479                    CatalogState::add_to_audit_log(
1480                        &state.system_configuration,
1481                        oracle_write_ts,
1482                        session,
1483                        tx,
1484                        audit_events,
1485                        EventType::Drop,
1486                        ObjectType::Role,
1487                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1488                            id: role.id.to_string(),
1489                            name: role.name.clone(),
1490                        }),
1491                    )?;
1492                    info!("drop role {}", role.name());
1493                }
1494
1495                // Drop any network policies.
1496                tx.remove_network_policies(&delta.network_policies)?;
1497
1498                for network_policy_id in delta.network_policies {
1499                    let policy = state
1500                        .network_policies_by_id
1501                        .get(&network_policy_id)
1502                        .expect("catalog out of sync");
1503
1504                    CatalogState::add_to_audit_log(
1505                        &state.system_configuration,
1506                        oracle_write_ts,
1507                        session,
1508                        tx,
1509                        audit_events,
1510                        EventType::Drop,
1511                        ObjectType::NetworkPolicy,
1512                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1513                            id: policy.id.to_string(),
1514                            name: policy.name.clone(),
1515                        }),
1516                    )?;
1517                    info!("drop network policy {}", policy.name.clone());
1518                }
1519
1520                // Drop any replicas.
1521                let replicas = delta.replicas.keys().copied().collect();
1522                tx.remove_cluster_replicas(&replicas)?;
1523
1524                for (replica_id, (cluster_id, reason)) in delta.replicas {
1525                    let cluster = state.get_cluster(cluster_id);
1526                    let replica = cluster.replica(replica_id).expect("Must exist");
1527
1528                    let (reason, scheduling_policies) = reason.into_audit_log();
1529                    let details =
1530                        EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1531                            cluster_id: cluster_id.to_string(),
1532                            cluster_name: cluster.name.clone(),
1533                            replica_id: Some(replica_id.to_string()),
1534                            replica_name: replica.name.clone(),
1535                            reason,
1536                            scheduling_policies,
1537                        });
1538                    CatalogState::add_to_audit_log(
1539                        &state.system_configuration,
1540                        oracle_write_ts,
1541                        session,
1542                        tx,
1543                        audit_events,
1544                        EventType::Drop,
1545                        ObjectType::ClusterReplica,
1546                        details,
1547                    )?;
1548                }
1549
1550                // Drop any clusters.
1551                tx.remove_clusters(&delta.clusters)?;
1552
1553                for cluster_id in delta.clusters {
1554                    let cluster = state.get_cluster(cluster_id);
1555
1556                    CatalogState::add_to_audit_log(
1557                        &state.system_configuration,
1558                        oracle_write_ts,
1559                        session,
1560                        tx,
1561                        audit_events,
1562                        EventType::Drop,
1563                        ObjectType::Cluster,
1564                        EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1565                            id: cluster.id.to_string(),
1566                            name: cluster.name.clone(),
1567                        }),
1568                    )?;
1569                }
1570            }
1571            Op::GrantRole {
1572                role_id,
1573                member_id,
1574                grantor_id,
1575            } => {
1576                state.ensure_not_reserved_role(&member_id)?;
1577                state.ensure_grantable_role(&role_id)?;
1578                if state.collect_role_membership(&role_id).contains(&member_id) {
1579                    let group_role = state.get_role(&role_id);
1580                    let member_role = state.get_role(&member_id);
1581                    return Err(AdapterError::Catalog(Error::new(
1582                        ErrorKind::CircularRoleMembership {
1583                            role_name: group_role.name().to_string(),
1584                            member_name: member_role.name().to_string(),
1585                        },
1586                    )));
1587                }
1588                let mut member_role = state.get_role(&member_id).clone();
1589                member_role.membership.map.insert(role_id, grantor_id);
1590                tx.update_role(member_id, member_role.into())?;
1591
1592                CatalogState::add_to_audit_log(
1593                    &state.system_configuration,
1594                    oracle_write_ts,
1595                    session,
1596                    tx,
1597                    audit_events,
1598                    EventType::Grant,
1599                    ObjectType::Role,
1600                    EventDetails::GrantRoleV2(mz_audit_log::GrantRoleV2 {
1601                        role_id: role_id.to_string(),
1602                        member_id: member_id.to_string(),
1603                        grantor_id: grantor_id.to_string(),
1604                        executed_by: session
1605                            .map(|session| session.authenticated_role_id())
1606                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1607                            .to_string(),
1608                    }),
1609                )?;
1610            }
1611            Op::RevokeRole {
1612                role_id,
1613                member_id,
1614                grantor_id,
1615            } => {
1616                state.ensure_not_reserved_role(&member_id)?;
1617                state.ensure_grantable_role(&role_id)?;
1618                let mut member_role = state.get_role(&member_id).clone();
1619                member_role.membership.map.remove(&role_id);
1620                tx.update_role(member_id, member_role.into())?;
1621
1622                CatalogState::add_to_audit_log(
1623                    &state.system_configuration,
1624                    oracle_write_ts,
1625                    session,
1626                    tx,
1627                    audit_events,
1628                    EventType::Revoke,
1629                    ObjectType::Role,
1630                    EventDetails::RevokeRoleV2(mz_audit_log::RevokeRoleV2 {
1631                        role_id: role_id.to_string(),
1632                        member_id: member_id.to_string(),
1633                        grantor_id: grantor_id.to_string(),
1634                        executed_by: session
1635                            .map(|session| session.authenticated_role_id())
1636                            .unwrap_or(&MZ_SYSTEM_ROLE_ID)
1637                            .to_string(),
1638                    }),
1639                )?;
1640            }
1641            Op::UpdatePrivilege {
1642                target_id,
1643                privilege,
1644                variant,
1645            } => {
1646                let update_privilege_fn = |privileges: &mut PrivilegeMap| match variant {
1647                    UpdatePrivilegeVariant::Grant => {
1648                        privileges.grant(privilege);
1649                    }
1650                    UpdatePrivilegeVariant::Revoke => {
1651                        privileges.revoke(&privilege);
1652                    }
1653                };
1654                match &target_id {
1655                    SystemObjectId::Object(object_id) => match object_id {
1656                        ObjectId::Cluster(id) => {
1657                            let mut cluster = state.get_cluster(*id).clone();
1658                            update_privilege_fn(&mut cluster.privileges);
1659                            tx.update_cluster(*id, cluster.into())?;
1660                        }
1661                        ObjectId::Database(id) => {
1662                            let mut database = state.get_database(id).clone();
1663                            update_privilege_fn(&mut database.privileges);
1664                            tx.update_database(*id, database.into())?;
1665                        }
1666                        ObjectId::NetworkPolicy(id) => {
1667                            let mut policy = state.get_network_policy(id).clone();
1668                            update_privilege_fn(&mut policy.privileges);
1669                            tx.update_network_policy(*id, policy.into())?;
1670                        }
1671                        ObjectId::Schema((database_spec, schema_spec)) => {
1672                            let schema_id = schema_spec.clone().into();
1673                            let mut schema = state
1674                                .get_schema(
1675                                    database_spec,
1676                                    schema_spec,
1677                                    session
1678                                        .map(|session| session.conn_id())
1679                                        .unwrap_or(&SYSTEM_CONN_ID),
1680                                )
1681                                .clone();
1682                            update_privilege_fn(&mut schema.privileges);
1683                            tx.update_schema(schema_id, schema.into())?;
1684                        }
1685                        ObjectId::Item(id) => {
1686                            let entry = state.get_entry(id);
1687                            let mut new_entry = entry.clone();
1688                            update_privilege_fn(&mut new_entry.privileges);
1689                            if !new_entry.item().is_temporary() {
1690                                tx.update_item(*id, new_entry.into())?;
1691                            } else {
1692                                temporary_item_updates
1693                                    .push((entry.clone().into(), StateDiff::Retraction));
1694                                temporary_item_updates
1695                                    .push((new_entry.into(), StateDiff::Addition));
1696                            }
1697                        }
1698                        ObjectId::Role(_) | ObjectId::ClusterReplica(_) => {}
1699                    },
1700                    SystemObjectId::System => {
1701                        let mut system_privileges = state.system_privileges.clone();
1702                        update_privilege_fn(&mut system_privileges);
1703                        let new_privilege =
1704                            system_privileges.get_acl_item(&privilege.grantee, &privilege.grantor);
1705                        tx.set_system_privilege(
1706                            privilege.grantee,
1707                            privilege.grantor,
1708                            new_privilege.map(|new_privilege| new_privilege.acl_mode),
1709                        )?;
1710                    }
1711                }
1712                let object_type = state.get_system_object_type(&target_id);
1713                let object_id_str = match &target_id {
1714                    SystemObjectId::System => "SYSTEM".to_string(),
1715                    SystemObjectId::Object(id) => id.to_string(),
1716                };
1717                CatalogState::add_to_audit_log(
1718                    &state.system_configuration,
1719                    oracle_write_ts,
1720                    session,
1721                    tx,
1722                    audit_events,
1723                    variant.into(),
1724                    system_object_type_to_audit_object_type(&object_type),
1725                    EventDetails::UpdatePrivilegeV1(mz_audit_log::UpdatePrivilegeV1 {
1726                        object_id: object_id_str,
1727                        grantee_id: privilege.grantee.to_string(),
1728                        grantor_id: privilege.grantor.to_string(),
1729                        privileges: privilege.acl_mode.to_string(),
1730                    }),
1731                )?;
1732            }
1733            Op::UpdateDefaultPrivilege {
1734                privilege_object,
1735                privilege_acl_item,
1736                variant,
1737            } => {
1738                let mut default_privileges = state.default_privileges.clone();
1739                match variant {
1740                    UpdatePrivilegeVariant::Grant => default_privileges
1741                        .grant(privilege_object.clone(), privilege_acl_item.clone()),
1742                    UpdatePrivilegeVariant::Revoke => {
1743                        default_privileges.revoke(&privilege_object, &privilege_acl_item)
1744                    }
1745                }
1746                let new_acl_mode = default_privileges
1747                    .get_privileges_for_grantee(&privilege_object, &privilege_acl_item.grantee);
1748                tx.set_default_privilege(
1749                    privilege_object.role_id,
1750                    privilege_object.database_id,
1751                    privilege_object.schema_id,
1752                    privilege_object.object_type,
1753                    privilege_acl_item.grantee,
1754                    new_acl_mode.cloned(),
1755                )?;
1756                CatalogState::add_to_audit_log(
1757                    &state.system_configuration,
1758                    oracle_write_ts,
1759                    session,
1760                    tx,
1761                    audit_events,
1762                    variant.into(),
1763                    object_type_to_audit_object_type(privilege_object.object_type),
1764                    EventDetails::AlterDefaultPrivilegeV1(mz_audit_log::AlterDefaultPrivilegeV1 {
1765                        role_id: privilege_object.role_id.to_string(),
1766                        database_id: privilege_object.database_id.map(|id| id.to_string()),
1767                        schema_id: privilege_object.schema_id.map(|id| id.to_string()),
1768                        grantee_id: privilege_acl_item.grantee.to_string(),
1769                        privileges: privilege_acl_item.acl_mode.to_string(),
1770                    }),
1771                )?;
1772            }
1773            Op::RenameCluster {
1774                id,
1775                name,
1776                to_name,
1777                check_reserved_names,
1778            } => {
1779                if id.is_system() {
1780                    return Err(AdapterError::Catalog(Error::new(
1781                        ErrorKind::ReadOnlyCluster(name.clone()),
1782                    )));
1783                }
1784                if check_reserved_names && is_reserved_name(&to_name) {
1785                    return Err(AdapterError::Catalog(Error::new(
1786                        ErrorKind::ReservedClusterName(to_name),
1787                    )));
1788                }
1789                tx.rename_cluster(id, &name, &to_name)?;
1790                CatalogState::add_to_audit_log(
1791                    &state.system_configuration,
1792                    oracle_write_ts,
1793                    session,
1794                    tx,
1795                    audit_events,
1796                    EventType::Alter,
1797                    ObjectType::Cluster,
1798                    EventDetails::RenameClusterV1(mz_audit_log::RenameClusterV1 {
1799                        id: id.to_string(),
1800                        old_name: name.clone(),
1801                        new_name: to_name.clone(),
1802                    }),
1803                )?;
1804                info!("rename cluster {name} to {to_name}");
1805            }
1806            Op::RenameClusterReplica {
1807                cluster_id,
1808                replica_id,
1809                name,
1810                to_name,
1811            } => {
1812                if is_reserved_name(&to_name) {
1813                    return Err(AdapterError::Catalog(Error::new(
1814                        ErrorKind::ReservedReplicaName(to_name),
1815                    )));
1816                }
1817                tx.rename_cluster_replica(replica_id, &name, &to_name)?;
1818                CatalogState::add_to_audit_log(
1819                    &state.system_configuration,
1820                    oracle_write_ts,
1821                    session,
1822                    tx,
1823                    audit_events,
1824                    EventType::Alter,
1825                    ObjectType::ClusterReplica,
1826                    EventDetails::RenameClusterReplicaV1(mz_audit_log::RenameClusterReplicaV1 {
1827                        cluster_id: cluster_id.to_string(),
1828                        replica_id: replica_id.to_string(),
1829                        old_name: name.replica.as_str().to_string(),
1830                        new_name: to_name.clone(),
1831                    }),
1832                )?;
1833                info!("rename cluster replica {name} to {to_name}");
1834            }
1835            Op::RenameItem {
1836                id,
1837                to_name,
1838                current_full_name,
1839            } => {
1840                let mut updates = Vec::new();
1841
1842                let entry = state.get_entry(&id);
1843                if let CatalogItem::Type(_) = entry.item() {
1844                    return Err(AdapterError::Catalog(Error::new(ErrorKind::TypeRename(
1845                        current_full_name.to_string(),
1846                    ))));
1847                }
1848
1849                if entry.id().is_system() {
1850                    let name = state
1851                        .resolve_full_name(entry.name(), session.map(|session| session.conn_id()));
1852                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
1853                        name.to_string(),
1854                    ))));
1855                }
1856
1857                let mut to_full_name = current_full_name.clone();
1858                to_full_name.item.clone_from(&to_name);
1859
1860                let mut to_qualified_name = entry.name().clone();
1861                to_qualified_name.item.clone_from(&to_name);
1862
1863                let details = EventDetails::RenameItemV1(mz_audit_log::RenameItemV1 {
1864                    id: id.to_string(),
1865                    old_name: Self::full_name_detail(&current_full_name),
1866                    new_name: Self::full_name_detail(&to_full_name),
1867                });
1868                if Self::should_audit_log_item(entry.item()) {
1869                    CatalogState::add_to_audit_log(
1870                        &state.system_configuration,
1871                        oracle_write_ts,
1872                        session,
1873                        tx,
1874                        audit_events,
1875                        EventType::Alter,
1876                        catalog_type_to_audit_object_type(entry.item().typ()),
1877                        details,
1878                    )?;
1879                }
1880
1881                // Rename item itself.
1882                let mut new_entry = entry.clone();
1883                new_entry.name.item.clone_from(&to_name);
1884                new_entry.item = entry
1885                    .item()
1886                    .rename_item_refs(current_full_name.clone(), to_full_name.item.clone(), true)
1887                    .map_err(|e| {
1888                        Error::new(ErrorKind::from(AmbiguousRename {
1889                            depender: state
1890                                .resolve_full_name(entry.name(), entry.conn_id())
1891                                .to_string(),
1892                            dependee: state
1893                                .resolve_full_name(entry.name(), entry.conn_id())
1894                                .to_string(),
1895                            message: e,
1896                        }))
1897                    })?;
1898
1899                for id in entry.referenced_by() {
1900                    let dependent_item = state.get_entry(id);
1901                    let mut to_entry = dependent_item.clone();
1902                    to_entry.item = dependent_item
1903                        .item()
1904                        .rename_item_refs(
1905                            current_full_name.clone(),
1906                            to_full_name.item.clone(),
1907                            false,
1908                        )
1909                        .map_err(|e| {
1910                            Error::new(ErrorKind::from(AmbiguousRename {
1911                                depender: state
1912                                    .resolve_full_name(
1913                                        dependent_item.name(),
1914                                        dependent_item.conn_id(),
1915                                    )
1916                                    .to_string(),
1917                                dependee: state
1918                                    .resolve_full_name(entry.name(), entry.conn_id())
1919                                    .to_string(),
1920                                message: e,
1921                            }))
1922                        })?;
1923
1924                    if !to_entry.item().is_temporary() {
1925                        tx.update_item(*id, to_entry.into())?;
1926                    } else {
1927                        temporary_item_updates
1928                            .push((dependent_item.clone().into(), StateDiff::Retraction));
1929                        temporary_item_updates.push((to_entry.into(), StateDiff::Addition));
1930                    }
1931                    updates.push(*id);
1932                }
1933                if !new_entry.item().is_temporary() {
1934                    tx.update_item(id, new_entry.into())?;
1935                } else {
1936                    temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
1937                    temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
1938                }
1939
1940                updates.push(id);
1941                for id in updates {
1942                    Self::log_update(state, &id);
1943                }
1944            }
1945            Op::RenameSchema {
1946                database_spec,
1947                schema_spec,
1948                new_name,
1949                check_reserved_names,
1950            } => {
1951                if check_reserved_names && is_reserved_name(&new_name) {
1952                    return Err(AdapterError::Catalog(Error::new(
1953                        ErrorKind::ReservedSchemaName(new_name),
1954                    )));
1955                }
1956
1957                let conn_id = session
1958                    .map(|session| session.conn_id())
1959                    .unwrap_or(&SYSTEM_CONN_ID);
1960
1961                let schema = state.get_schema(&database_spec, &schema_spec, conn_id);
1962                let cur_name = schema.name().schema.clone();
1963
1964                let ResolvedDatabaseSpecifier::Id(database_id) = database_spec else {
1965                    return Err(AdapterError::Catalog(Error::new(
1966                        ErrorKind::AmbientSchemaRename(cur_name),
1967                    )));
1968                };
1969                let database = state.get_database(&database_id);
1970                let database_name = &database.name;
1971
1972                let mut updates = Vec::new();
1973                let mut items_to_update = BTreeMap::new();
1974
1975                let mut update_item = |id| {
1976                    if items_to_update.contains_key(id) {
1977                        return Ok(());
1978                    }
1979
1980                    let entry = state.get_entry(id);
1981
1982                    // Update our item.
1983                    let mut new_entry = entry.clone();
1984                    new_entry.item = entry
1985                        .item
1986                        .rename_schema_refs(database_name, &cur_name, &new_name)
1987                        .map_err(|(s, _i)| {
1988                            Error::new(ErrorKind::from(AmbiguousRename {
1989                                depender: state
1990                                    .resolve_full_name(entry.name(), entry.conn_id())
1991                                    .to_string(),
1992                                dependee: format!("{database_name}.{cur_name}"),
1993                                message: format!("ambiguous reference to schema named {s}"),
1994                            }))
1995                        })?;
1996
1997                    // Queue updates for Catalog storage and Builtin Tables.
1998                    if !new_entry.item().is_temporary() {
1999                        items_to_update.insert(*id, new_entry.into());
2000                    } else {
2001                        temporary_item_updates.push((entry.clone().into(), StateDiff::Retraction));
2002                        temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2003                    }
2004                    updates.push(id);
2005
2006                    Ok::<_, AdapterError>(())
2007                };
2008
2009                // Update all of the items in the schema.
2010                for (_name, item_id) in &schema.items {
2011                    // Update the item itself.
2012                    update_item(item_id)?;
2013
2014                    // Update everything that depends on this item.
2015                    for id in state.get_entry(item_id).referenced_by() {
2016                        update_item(id)?;
2017                    }
2018                }
2019                // Note: When updating the transaction it's very important that we update the
2020                // items as a whole group, otherwise we exhibit quadratic behavior.
2021                tx.update_items(items_to_update)?;
2022
2023                // Renaming temporary schemas is not supported.
2024                let SchemaSpecifier::Id(schema_id) = *schema.id() else {
2025                    let schema_name = schema.name().schema.clone();
2026                    return Err(AdapterError::Catalog(crate::catalog::Error::new(
2027                        crate::catalog::ErrorKind::ReadOnlySystemSchema(schema_name),
2028                    )));
2029                };
2030
2031                // Add an entry to the audit log.
2032                let database_name = database_spec
2033                    .id()
2034                    .map(|id| state.get_database(&id).name.clone());
2035                let details = EventDetails::RenameSchemaV1(mz_audit_log::RenameSchemaV1 {
2036                    id: schema_id.to_string(),
2037                    old_name: schema.name().schema.clone(),
2038                    new_name: new_name.clone(),
2039                    database_name,
2040                });
2041                CatalogState::add_to_audit_log(
2042                    &state.system_configuration,
2043                    oracle_write_ts,
2044                    session,
2045                    tx,
2046                    audit_events,
2047                    EventType::Alter,
2048                    mz_audit_log::ObjectType::Schema,
2049                    details,
2050                )?;
2051
2052                // Update the schema itself.
2053                let mut new_schema = schema.clone();
2054                new_schema.name.schema.clone_from(&new_name);
2055                tx.update_schema(schema_id, new_schema.into())?;
2056
2057                for id in updates {
2058                    Self::log_update(state, id);
2059                }
2060            }
2061            Op::UpdateOwner { id, new_owner } => {
2062                let conn_id = session
2063                    .map(|session| session.conn_id())
2064                    .unwrap_or(&SYSTEM_CONN_ID);
2065                let old_owner = state
2066                    .get_owner_id(&id, conn_id)
2067                    .expect("cannot update the owner of an object without an owner");
2068                match &id {
2069                    ObjectId::Cluster(id) => {
2070                        let mut cluster = state.get_cluster(*id).clone();
2071                        if id.is_system() {
2072                            return Err(AdapterError::Catalog(Error::new(
2073                                ErrorKind::ReadOnlyCluster(cluster.name),
2074                            )));
2075                        }
2076                        Self::update_privilege_owners(
2077                            &mut cluster.privileges,
2078                            cluster.owner_id,
2079                            new_owner,
2080                        );
2081                        cluster.owner_id = new_owner;
2082                        tx.update_cluster(*id, cluster.into())?;
2083                    }
2084                    ObjectId::ClusterReplica((cluster_id, replica_id)) => {
2085                        let cluster = state.get_cluster(*cluster_id);
2086                        let mut replica = cluster
2087                            .replica(*replica_id)
2088                            .expect("catalog out of sync")
2089                            .clone();
2090                        if replica_id.is_system() {
2091                            return Err(AdapterError::Catalog(Error::new(
2092                                ErrorKind::ReadOnlyClusterReplica(replica.name),
2093                            )));
2094                        }
2095                        replica.owner_id = new_owner;
2096                        tx.update_cluster_replica(*replica_id, replica.into())?;
2097                    }
2098                    ObjectId::Database(id) => {
2099                        let mut database = state.get_database(id).clone();
2100                        if id.is_system() {
2101                            return Err(AdapterError::Catalog(Error::new(
2102                                ErrorKind::ReadOnlyDatabase(database.name),
2103                            )));
2104                        }
2105                        Self::update_privilege_owners(
2106                            &mut database.privileges,
2107                            database.owner_id,
2108                            new_owner,
2109                        );
2110                        database.owner_id = new_owner;
2111                        tx.update_database(*id, database.clone().into())?;
2112                    }
2113                    ObjectId::Schema((database_spec, schema_spec)) => {
2114                        let schema_id: SchemaId = schema_spec.clone().into();
2115                        let mut schema = state
2116                            .get_schema(database_spec, schema_spec, conn_id)
2117                            .clone();
2118                        if schema_id.is_system() {
2119                            let name = schema.name();
2120                            let full_name = state.resolve_full_schema_name(name);
2121                            return Err(AdapterError::Catalog(Error::new(
2122                                ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2123                            )));
2124                        }
2125                        Self::update_privilege_owners(
2126                            &mut schema.privileges,
2127                            schema.owner_id,
2128                            new_owner,
2129                        );
2130                        schema.owner_id = new_owner;
2131                        tx.update_schema(schema_id, schema.into())?;
2132                    }
2133                    ObjectId::Item(id) => {
2134                        let entry = state.get_entry(id);
2135                        let mut new_entry = entry.clone();
2136                        if id.is_system() {
2137                            let full_name = state.resolve_full_name(
2138                                new_entry.name(),
2139                                session.map(|session| session.conn_id()),
2140                            );
2141                            return Err(AdapterError::Catalog(Error::new(
2142                                ErrorKind::ReadOnlyItem(full_name.to_string()),
2143                            )));
2144                        }
2145                        Self::update_privilege_owners(
2146                            &mut new_entry.privileges,
2147                            new_entry.owner_id,
2148                            new_owner,
2149                        );
2150                        new_entry.owner_id = new_owner;
2151                        if !new_entry.item().is_temporary() {
2152                            tx.update_item(*id, new_entry.into())?;
2153                        } else {
2154                            temporary_item_updates
2155                                .push((entry.clone().into(), StateDiff::Retraction));
2156                            temporary_item_updates.push((new_entry.into(), StateDiff::Addition));
2157                        }
2158                    }
2159                    ObjectId::NetworkPolicy(id) => {
2160                        let mut policy = state.get_network_policy(id).clone();
2161                        if id.is_system() {
2162                            return Err(AdapterError::Catalog(Error::new(
2163                                ErrorKind::ReadOnlyNetworkPolicy(policy.name),
2164                            )));
2165                        }
2166                        Self::update_privilege_owners(
2167                            &mut policy.privileges,
2168                            policy.owner_id,
2169                            new_owner,
2170                        );
2171                        policy.owner_id = new_owner;
2172                        tx.update_network_policy(*id, policy.into())?;
2173                    }
2174                    ObjectId::Role(_) => unreachable!("roles have no owner"),
2175                }
2176                let object_type = state.get_object_type(&id);
2177                CatalogState::add_to_audit_log(
2178                    &state.system_configuration,
2179                    oracle_write_ts,
2180                    session,
2181                    tx,
2182                    audit_events,
2183                    EventType::Alter,
2184                    object_type_to_audit_object_type(object_type),
2185                    EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
2186                        object_id: id.to_string(),
2187                        old_owner_id: old_owner.to_string(),
2188                        new_owner_id: new_owner.to_string(),
2189                    }),
2190                )?;
2191            }
2192            Op::UpdateClusterConfig { id, name, config } => {
2193                let mut cluster = state.get_cluster(id).clone();
2194                cluster.config = config;
2195                tx.update_cluster(id, cluster.into())?;
2196                info!("update cluster {}", name);
2197
2198                CatalogState::add_to_audit_log(
2199                    &state.system_configuration,
2200                    oracle_write_ts,
2201                    session,
2202                    tx,
2203                    audit_events,
2204                    EventType::Alter,
2205                    ObjectType::Cluster,
2206                    EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
2207                        id: id.to_string(),
2208                        name,
2209                    }),
2210                )?;
2211            }
2212            Op::UpdateClusterReplicaConfig {
2213                replica_id,
2214                cluster_id,
2215                config,
2216            } => {
2217                let replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
2218                info!("update replica {}", replica.name);
2219                tx.update_cluster_replica(
2220                    replica_id,
2221                    mz_catalog::durable::ClusterReplica {
2222                        cluster_id,
2223                        replica_id,
2224                        name: replica.name.clone(),
2225                        config: config.clone().into(),
2226                        owner_id: replica.owner_id,
2227                    },
2228                )?;
2229            }
2230            Op::UpdateItem { id, name, to_item } => {
2231                let mut entry = state.get_entry(&id).clone();
2232                entry.name = name.clone();
2233                entry.item = to_item.clone();
2234                tx.update_item(id, entry.into())?;
2235
2236                if Self::should_audit_log_item(&to_item) {
2237                    let mut full_name = Self::full_name_detail(
2238                        &state.resolve_full_name(&name, session.map(|session| session.conn_id())),
2239                    );
2240                    full_name.item = name.item;
2241
2242                    CatalogState::add_to_audit_log(
2243                        &state.system_configuration,
2244                        oracle_write_ts,
2245                        session,
2246                        tx,
2247                        audit_events,
2248                        EventType::Alter,
2249                        catalog_type_to_audit_object_type(to_item.typ()),
2250                        EventDetails::UpdateItemV1(mz_audit_log::UpdateItemV1 {
2251                            id: id.to_string(),
2252                            name: full_name,
2253                        }),
2254                    )?;
2255                }
2256
2257                Self::log_update(state, &id);
2258            }
2259            Op::UpdateSystemConfiguration { name, value } => {
2260                let parsed_value = state.parse_system_configuration(&name, value.borrow())?;
2261                tx.upsert_system_config(&name, parsed_value.clone())?;
2262                // This mirrors the `enable_0dt_deployment` "system var" into the catalog
2263                // storage "config" collection so that we can toggle the flag with
2264                // Launch Darkly, but use it in boot before Launch Darkly is available.
2265                if name == ENABLE_0DT_DEPLOYMENT.name() {
2266                    let enable_0dt_deployment =
2267                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2268                    tx.set_enable_0dt_deployment(enable_0dt_deployment)?;
2269                } else if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2270                    let with_0dt_deployment_max_wait =
2271                        Duration::parse(VarInput::Flat(&parsed_value))
2272                            .expect("parsing succeeded above");
2273                    tx.set_0dt_deployment_max_wait(with_0dt_deployment_max_wait)?;
2274                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2275                    let with_0dt_deployment_ddl_check_interval =
2276                        Duration::parse(VarInput::Flat(&parsed_value))
2277                            .expect("parsing succeeded above");
2278                    tx.set_0dt_deployment_ddl_check_interval(
2279                        with_0dt_deployment_ddl_check_interval,
2280                    )?;
2281                } else if name == ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name() {
2282                    let panic_after_timeout =
2283                        strconv::parse_bool(&parsed_value).expect("parsing succeeded above");
2284                    tx.set_enable_0dt_deployment_panic_after_timeout(panic_after_timeout)?;
2285                }
2286
2287                CatalogState::add_to_audit_log(
2288                    &state.system_configuration,
2289                    oracle_write_ts,
2290                    session,
2291                    tx,
2292                    audit_events,
2293                    EventType::Alter,
2294                    ObjectType::System,
2295                    EventDetails::SetV1(mz_audit_log::SetV1 {
2296                        name,
2297                        value: Some(value.borrow().to_vec().join(", ")),
2298                    }),
2299                )?;
2300            }
2301            Op::ResetSystemConfiguration { name } => {
2302                tx.remove_system_config(&name);
2303                // This mirrors the `enable_0dt_deployment` "system var" into the catalog
2304                // storage "config" collection so that we can toggle the flag with
2305                // Launch Darkly, but use it in boot before Launch Darkly is available.
2306                if name == ENABLE_0DT_DEPLOYMENT.name() {
2307                    tx.reset_enable_0dt_deployment()?;
2308                } else if name == WITH_0DT_DEPLOYMENT_MAX_WAIT.name() {
2309                    tx.reset_0dt_deployment_max_wait()?;
2310                } else if name == WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name() {
2311                    tx.reset_0dt_deployment_ddl_check_interval()?;
2312                }
2313
2314                CatalogState::add_to_audit_log(
2315                    &state.system_configuration,
2316                    oracle_write_ts,
2317                    session,
2318                    tx,
2319                    audit_events,
2320                    EventType::Alter,
2321                    ObjectType::System,
2322                    EventDetails::SetV1(mz_audit_log::SetV1 { name, value: None }),
2323                )?;
2324            }
2325            Op::ResetAllSystemConfiguration => {
2326                tx.clear_system_configs();
2327                tx.reset_enable_0dt_deployment()?;
2328                tx.reset_0dt_deployment_max_wait()?;
2329                tx.reset_0dt_deployment_ddl_check_interval()?;
2330
2331                CatalogState::add_to_audit_log(
2332                    &state.system_configuration,
2333                    oracle_write_ts,
2334                    session,
2335                    tx,
2336                    audit_events,
2337                    EventType::Alter,
2338                    ObjectType::System,
2339                    EventDetails::ResetAllV1,
2340                )?;
2341            }
2342            Op::WeirdStorageUsageUpdates {
2343                object_id,
2344                size_bytes,
2345                collection_timestamp,
2346            } => {
2347                let id = tx.allocate_storage_usage_ids()?;
2348                let metric =
2349                    VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2350                let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
2351                let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
2352                weird_builtin_table_update = Some(builtin_table_update);
2353            }
2354        };
2355        Ok((weird_builtin_table_update, temporary_item_updates))
2356    }
2357
2358    fn log_update(state: &CatalogState, id: &CatalogItemId) {
2359        let entry = state.get_entry(id);
2360        info!(
2361            "update {} {} ({})",
2362            entry.item_type(),
2363            state.resolve_full_name(entry.name(), entry.conn_id()),
2364            id
2365        );
2366    }
2367
2368    /// Update privileges to reflect the new owner. Based off of PostgreSQL's
2369    /// implementation:
2370    /// <https://github.com/postgres/postgres/blob/43a33ef54e503b61f269d088f2623ba3b9484ad7/src/backend/utils/adt/acl.c#L1078-L1177>
2371    fn update_privilege_owners(
2372        privileges: &mut PrivilegeMap,
2373        old_owner: RoleId,
2374        new_owner: RoleId,
2375    ) {
2376        // TODO(jkosh44) Would be nice not to clone every privilege.
2377        let mut flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2378
2379        let mut new_present = false;
2380        for privilege in flat_privileges.iter_mut() {
2381            // Old owner's granted privilege are updated to be granted by the new
2382            // owner.
2383            if privilege.grantor == old_owner {
2384                privilege.grantor = new_owner;
2385            } else if privilege.grantor == new_owner {
2386                new_present = true;
2387            }
2388            // Old owner's privileges is given to the new owner.
2389            if privilege.grantee == old_owner {
2390                privilege.grantee = new_owner;
2391            } else if privilege.grantee == new_owner {
2392                new_present = true;
2393            }
2394        }
2395
2396        // If the old privilege list contained references to the new owner, we may
2397        // have created duplicate entries. Here we try and consolidate them. This
2398        // is inspired by PostgreSQL's algorithm but not identical.
2399        if new_present {
2400            // Group privileges by (grantee, grantor).
2401            let privilege_map: BTreeMap<_, Vec<_>> =
2402                flat_privileges
2403                    .into_iter()
2404                    .fold(BTreeMap::new(), |mut accum, privilege| {
2405                        accum
2406                            .entry((privilege.grantee, privilege.grantor))
2407                            .or_default()
2408                            .push(privilege);
2409                        accum
2410                    });
2411
2412            // Consolidate and update all privileges.
2413            flat_privileges = privilege_map
2414                .into_iter()
2415                .map(|((grantee, grantor), values)|
2416                    // Combine the acl_mode of all mz_aclitems with the same grantee and grantor.
2417                    values.into_iter().fold(
2418                        MzAclItem::empty(grantee, grantor),
2419                        |mut accum, mz_aclitem| {
2420                            accum.acl_mode =
2421                                accum.acl_mode.union(mz_aclitem.acl_mode);
2422                            accum
2423                        },
2424                    ))
2425                .collect();
2426        }
2427
2428        *privileges = PrivilegeMap::from_mz_acl_items(flat_privileges);
2429    }
2430}
2431
2432/// All of the objects that need to be removed in response to an [`Op::DropObjects`].
2433///
2434/// Note: Previously we used to omit a single `Op::DropObject` for every object
2435/// we needed to drop. But removing a batch of objects from a durable Catalog
2436/// Transaction is O(n) where `n` is the number of objects that exist in the
2437/// Catalog. This resulted in an unacceptable `O(m * n)` performance for a
2438/// `DROP ... CASCADE` statement.
2439#[derive(Debug, Default)]
2440pub(crate) struct ObjectsToDrop {
2441    pub comments: BTreeSet<CommentObjectId>,
2442    pub databases: BTreeSet<DatabaseId>,
2443    pub schemas: BTreeMap<SchemaSpecifier, ResolvedDatabaseSpecifier>,
2444    pub clusters: BTreeSet<ClusterId>,
2445    pub replicas: BTreeMap<ReplicaId, (ClusterId, ReplicaCreateDropReason)>,
2446    pub roles: BTreeSet<RoleId>,
2447    pub items: Vec<CatalogItemId>,
2448    pub network_policies: BTreeSet<NetworkPolicyId>,
2449}
2450
2451impl ObjectsToDrop {
2452    pub fn generate(
2453        drop_object_infos: impl IntoIterator<Item = DropObjectInfo>,
2454        state: &CatalogState,
2455        session: Option<&ConnMeta>,
2456    ) -> Result<Self, AdapterError> {
2457        let mut delta = ObjectsToDrop::default();
2458
2459        for drop_object_info in drop_object_infos {
2460            delta.add_item(drop_object_info, state, session)?;
2461        }
2462
2463        Ok(delta)
2464    }
2465
2466    fn add_item(
2467        &mut self,
2468        drop_object_info: DropObjectInfo,
2469        state: &CatalogState,
2470        session: Option<&ConnMeta>,
2471    ) -> Result<(), AdapterError> {
2472        self.comments
2473            .insert(state.get_comment_id(drop_object_info.to_object_id()));
2474
2475        match drop_object_info {
2476            DropObjectInfo::Database(database_id) => {
2477                let database = &state.database_by_id[&database_id];
2478                if database_id.is_system() {
2479                    return Err(AdapterError::Catalog(Error::new(
2480                        ErrorKind::ReadOnlyDatabase(database.name().to_string()),
2481                    )));
2482                }
2483
2484                self.databases.insert(database_id);
2485            }
2486            DropObjectInfo::Schema((database_spec, schema_spec)) => {
2487                let schema = state.get_schema(
2488                    &database_spec,
2489                    &schema_spec,
2490                    session
2491                        .map(|session| session.conn_id())
2492                        .unwrap_or(&SYSTEM_CONN_ID),
2493                );
2494                let schema_id: SchemaId = schema_spec.into();
2495                if schema_id.is_system() {
2496                    let name = schema.name();
2497                    let full_name = state.resolve_full_schema_name(name);
2498                    return Err(AdapterError::Catalog(Error::new(
2499                        ErrorKind::ReadOnlySystemSchema(full_name.to_string()),
2500                    )));
2501                }
2502
2503                self.schemas.insert(schema_spec, database_spec);
2504            }
2505            DropObjectInfo::Role(role_id) => {
2506                let name = state.get_role(&role_id).name().to_string();
2507                if role_id.is_system() || role_id.is_predefined() {
2508                    return Err(AdapterError::Catalog(Error::new(
2509                        ErrorKind::ReservedRoleName(name.clone()),
2510                    )));
2511                }
2512                state.ensure_not_reserved_role(&role_id)?;
2513
2514                self.roles.insert(role_id);
2515            }
2516            DropObjectInfo::Cluster(cluster_id) => {
2517                let cluster = state.get_cluster(cluster_id);
2518                let name = &cluster.name;
2519                if cluster_id.is_system() {
2520                    return Err(AdapterError::Catalog(Error::new(
2521                        ErrorKind::ReadOnlyCluster(name.clone()),
2522                    )));
2523                }
2524
2525                self.clusters.insert(cluster_id);
2526            }
2527            DropObjectInfo::ClusterReplica((cluster_id, replica_id, reason)) => {
2528                let cluster = state.get_cluster(cluster_id);
2529                let replica = cluster.replica(replica_id).expect("Must exist");
2530
2531                self.replicas
2532                    .insert(replica.replica_id, (cluster.id, reason));
2533            }
2534            DropObjectInfo::Item(item_id) => {
2535                let entry = state.get_entry(&item_id);
2536                if item_id.is_system() {
2537                    let name = entry.name();
2538                    let full_name =
2539                        state.resolve_full_name(name, session.map(|session| session.conn_id()));
2540                    return Err(AdapterError::Catalog(Error::new(ErrorKind::ReadOnlyItem(
2541                        full_name.to_string(),
2542                    ))));
2543                }
2544
2545                self.items.push(item_id);
2546            }
2547            DropObjectInfo::NetworkPolicy(network_policy_id) => {
2548                let policy = state.get_network_policy(&network_policy_id);
2549                let name = &policy.name;
2550                if network_policy_id.is_system() {
2551                    return Err(AdapterError::Catalog(Error::new(
2552                        ErrorKind::ReadOnlyNetworkPolicy(name.clone()),
2553                    )));
2554                }
2555
2556                self.network_policies.insert(network_policy_id);
2557            }
2558        }
2559
2560        Ok(())
2561    }
2562}
2563
2564#[cfg(test)]
2565mod tests {
2566    use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
2567    use mz_repr::role_id::RoleId;
2568
2569    use crate::catalog::Catalog;
2570
2571    #[mz_ore::test]
2572    fn test_update_privilege_owners() {
2573        let old_owner = RoleId::User(1);
2574        let new_owner = RoleId::User(2);
2575        let other_role = RoleId::User(3);
2576
2577        // older owner exists as grantor.
2578        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2579            MzAclItem {
2580                grantee: other_role,
2581                grantor: old_owner,
2582                acl_mode: AclMode::UPDATE,
2583            },
2584            MzAclItem {
2585                grantee: other_role,
2586                grantor: new_owner,
2587                acl_mode: AclMode::SELECT,
2588            },
2589        ]);
2590        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2591        assert_eq!(1, privileges.all_values().count());
2592        assert_eq!(
2593            vec![MzAclItem {
2594                grantee: other_role,
2595                grantor: new_owner,
2596                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2597            }],
2598            privileges.all_values_owned().collect::<Vec<_>>()
2599        );
2600
2601        // older owner exists as grantee.
2602        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2603            MzAclItem {
2604                grantee: old_owner,
2605                grantor: other_role,
2606                acl_mode: AclMode::UPDATE,
2607            },
2608            MzAclItem {
2609                grantee: new_owner,
2610                grantor: other_role,
2611                acl_mode: AclMode::SELECT,
2612            },
2613        ]);
2614        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2615        assert_eq!(1, privileges.all_values().count());
2616        assert_eq!(
2617            vec![MzAclItem {
2618                grantee: new_owner,
2619                grantor: other_role,
2620                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2621            }],
2622            privileges.all_values_owned().collect::<Vec<_>>()
2623        );
2624
2625        // older owner exists as grantee and grantor.
2626        let mut privileges = PrivilegeMap::from_mz_acl_items(vec![
2627            MzAclItem {
2628                grantee: old_owner,
2629                grantor: old_owner,
2630                acl_mode: AclMode::UPDATE,
2631            },
2632            MzAclItem {
2633                grantee: new_owner,
2634                grantor: new_owner,
2635                acl_mode: AclMode::SELECT,
2636            },
2637        ]);
2638        Catalog::update_privilege_owners(&mut privileges, old_owner, new_owner);
2639        assert_eq!(1, privileges.all_values().count());
2640        assert_eq!(
2641            vec![MzAclItem {
2642                grantee: new_owner,
2643                grantor: new_owner,
2644                acl_mode: AclMode::SELECT.union(AclMode::UPDATE)
2645            }],
2646            privileges.all_values_owned().collect::<Vec<_>>()
2647        );
2648    }
2649}