Skip to main content

mz_adapter/coord/sequencer/
inner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::task::{self, JoinHandle, spawn};
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{assert_none, instrument};
36use mz_repr::adt::jsonb::Jsonb;
37use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
38use mz_repr::explain::ExprHumanizer;
39use mz_repr::explain::json::json_string;
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
43    RowIterator, Timestamp,
44};
45use mz_sql::ast::{
46    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
47    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
48    SqlServerConfigOptionName,
49};
50use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
51use mz_sql::catalog::{
52    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
53    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
54    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
55};
56use mz_sql::names::{
57    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
58    SchemaSpecifier, SystemObjectId,
59};
60use mz_sql::plan::{
61    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
62    StatementContext,
63};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
68use mz_sql::plan::{
69    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84    WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use smallvec::SmallVec;
93use timely::progress::Antichain;
94use tokio::sync::{oneshot, watch};
95use tracing::{Instrument, Span, info, warn};
96
97use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
98use crate::command::{ExecuteResponse, Response};
99use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
100use crate::coord::sequencer::emit_optimizer_notices;
101use crate::coord::{
102    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
103    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
104    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
105    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
106    WatchSetResponse, validate_ip_with_policy_rules,
107};
108use crate::error::AdapterError;
109use crate::notice::{AdapterNotice, DroppedInUseIndex};
110use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
111use crate::optimize::{self, Optimize};
112use crate::session::{
113    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
114    WriteLocks, WriteOp,
115};
116use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
117use crate::{PeekResponseUnary, ReadHolds};
118
119/// A future that resolves to a real-time recency timestamp.
120type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>;
121
122mod cluster;
123mod copy_from;
124mod create_continual_task;
125mod create_index;
126mod create_materialized_view;
127mod create_view;
128mod explain_timestamp;
129mod peek;
130mod secret;
131mod subscribe;
132
133/// Attempts to evaluate an expression. If an error is returned then the error is sent
134/// to the client and the function is exited.
135macro_rules! return_if_err {
136    ($expr:expr, $ctx:expr) => {
137        match $expr {
138            Ok(v) => v,
139            Err(e) => return $ctx.retire(Err(e.into())),
140        }
141    };
142}
143
144pub(super) use return_if_err;
145
146struct DropOps {
147    ops: Vec<catalog::Op>,
148    dropped_active_db: bool,
149    dropped_active_cluster: bool,
150    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
151}
152
153// A bundle of values returned from create_source_inner
154struct CreateSourceInner {
155    ops: Vec<catalog::Op>,
156    sources: Vec<(CatalogItemId, Source)>,
157    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
158}
159
160impl Coordinator {
161    /// Sequences the next staged of a [Staged] plan. This is designed for use with plans that
162    /// execute both on and off of the coordinator thread. Stages can either produce another stage
163    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
164    /// tracing.
165    pub(crate) async fn sequence_staged<S>(
166        &mut self,
167        mut ctx: S::Ctx,
168        parent_span: Span,
169        mut stage: S,
170    ) where
171        S: Staged + 'static,
172        S::Ctx: Send + 'static,
173    {
174        return_if_err!(stage.validity().check(self.catalog()), ctx);
175        loop {
176            let mut cancel_enabled = stage.cancel_enabled();
177            if let Some(session) = ctx.session() {
178                if cancel_enabled {
179                    // Channel to await cancellation. Insert a new channel, but check if the previous one
180                    // was already canceled.
181                    if let Some((_prev_tx, prev_rx)) = self
182                        .staged_cancellation
183                        .insert(session.conn_id().clone(), watch::channel(false))
184                    {
185                        let was_canceled = *prev_rx.borrow();
186                        if was_canceled {
187                            ctx.retire(Err(AdapterError::Canceled));
188                            return;
189                        }
190                    }
191                } else {
192                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
193                    // when cancel_enabled may have been true on an earlier stage.
194                    self.staged_cancellation.remove(session.conn_id());
195                }
196            } else {
197                cancel_enabled = false
198            };
199            let next = stage
200                .stage(self, &mut ctx)
201                .instrument(parent_span.clone())
202                .await;
203            let res = return_if_err!(next, ctx);
204            stage = match res {
205                StageResult::Handle(handle) => {
206                    let internal_cmd_tx = self.internal_cmd_tx.clone();
207                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
208                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
209                    });
210                    return;
211                }
212                StageResult::HandleRetire(handle) => {
213                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
214                        ctx.retire(Ok(resp));
215                    });
216                    return;
217                }
218                StageResult::Response(resp) => {
219                    ctx.retire(Ok(resp));
220                    return;
221                }
222                StageResult::Immediate(stage) => *stage,
223            }
224        }
225    }
226
227    fn handle_spawn<C, T, F>(
228        &self,
229        ctx: C,
230        handle: JoinHandle<Result<T, AdapterError>>,
231        cancel_enabled: bool,
232        f: F,
233    ) where
234        C: StagedContext + Send + 'static,
235        T: Send + 'static,
236        F: FnOnce(C, T) + Send + 'static,
237    {
238        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
239            .session()
240            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
241        {
242            let mut rx = rx.clone();
243            Box::pin(async move {
244                // Wait for true or dropped sender.
245                let _ = rx.wait_for(|v| *v).await;
246                ()
247            })
248        } else {
249            Box::pin(future::pending())
250        };
251        spawn(|| "sequence_staged", async move {
252            tokio::select! {
253                res = handle => {
254                    let next = return_if_err!(res, ctx);
255                    f(ctx, next);
256                }
257                _ = rx, if cancel_enabled => {
258                    ctx.retire(Err(AdapterError::Canceled));
259                }
260            }
261        });
262    }
263
264    async fn create_source_inner(
265        &self,
266        session: &Session,
267        plans: Vec<plan::CreateSourcePlanBundle>,
268    ) -> Result<CreateSourceInner, AdapterError> {
269        let mut ops = vec![];
270        let mut sources = vec![];
271
272        let if_not_exists_ids = plans
273            .iter()
274            .filter_map(
275                |plan::CreateSourcePlanBundle {
276                     item_id,
277                     global_id: _,
278                     plan,
279                     resolved_ids: _,
280                     available_source_references: _,
281                 }| {
282                    if plan.if_not_exists {
283                        Some((*item_id, plan.name.clone()))
284                    } else {
285                        None
286                    }
287                },
288            )
289            .collect::<BTreeMap<_, _>>();
290
291        for plan::CreateSourcePlanBundle {
292            item_id,
293            global_id,
294            mut plan,
295            resolved_ids,
296            available_source_references,
297        } in plans
298        {
299            let name = plan.name.clone();
300
301            match plan.source.data_source {
302                plan::DataSourceDesc::Ingestion(ref desc)
303                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
304                    let cluster_id = plan
305                        .in_cluster
306                        .expect("ingestion plans must specify cluster");
307                    match desc.connection {
308                        GenericSourceConnection::Postgres(_)
309                        | GenericSourceConnection::MySql(_)
310                        | GenericSourceConnection::SqlServer(_)
311                        | GenericSourceConnection::Kafka(_)
312                        | GenericSourceConnection::LoadGenerator(_) => {
313                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
314                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
315                                    .get(self.catalog().system_config().dyncfgs());
316
317                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
318                                {
319                                    return Err(AdapterError::Unsupported(
320                                        "sources in clusters with >1 replicas",
321                                    ));
322                                }
323                            }
324                        }
325                    }
326                }
327                plan::DataSourceDesc::Webhook { .. } => {
328                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
329                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
330                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
331                            .get(self.catalog().system_config().dyncfgs());
332
333                        if !enable_multi_replica_sources {
334                            if cluster.replica_ids().len() > 1 {
335                                return Err(AdapterError::Unsupported(
336                                    "webhook sources in clusters with >1 replicas",
337                                ));
338                            }
339                        }
340                    }
341                }
342                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
343            }
344
345            // Attempt to reduce the `CHECK` expression, we timeout if this takes too long.
346            if let mz_sql::plan::DataSourceDesc::Webhook {
347                validate_using: Some(validate),
348                ..
349            } = &mut plan.source.data_source
350            {
351                if let Err(reason) = validate.reduce_expression().await {
352                    self.metrics
353                        .webhook_validation_reduce_failures
354                        .with_label_values(&[reason])
355                        .inc();
356                    return Err(AdapterError::Internal(format!(
357                        "failed to reduce check expression, {reason}"
358                    )));
359                }
360            }
361
362            // If this source contained a set of available source references, update the
363            // source references catalog table.
364            let mut reference_ops = vec![];
365            if let Some(references) = &available_source_references {
366                reference_ops.push(catalog::Op::UpdateSourceReferences {
367                    source_id: item_id,
368                    references: references.clone().into(),
369                });
370            }
371
372            let source = Source::new(plan, global_id, resolved_ids, None, false);
373            ops.push(catalog::Op::CreateItem {
374                id: item_id,
375                name,
376                item: CatalogItem::Source(source.clone()),
377                owner_id: *session.current_role_id(),
378            });
379            sources.push((item_id, source));
380            // These operations must be executed after the source is added to the catalog.
381            ops.extend(reference_ops);
382        }
383
384        Ok(CreateSourceInner {
385            ops,
386            sources,
387            if_not_exists_ids,
388        })
389    }
390
391    /// Subsources are planned differently from other statements because they
392    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
393    /// Because of this, we have usually "missed" the opportunity to plan them
394    /// through the normal statement execution life cycle (the exception being
395    /// during bootstrapping).
396    ///
397    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
398    pub(crate) fn plan_subsource(
399        &self,
400        session: &Session,
401        params: &mz_sql::plan::Params,
402        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
403        item_id: CatalogItemId,
404        global_id: GlobalId,
405    ) -> Result<CreateSourcePlanBundle, AdapterError> {
406        let catalog = self.catalog().for_session(session);
407        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
408
409        let plan = self.plan_statement(
410            session,
411            Statement::CreateSubsource(subsource_stmt),
412            params,
413            &resolved_ids,
414        )?;
415        let plan = match plan {
416            Plan::CreateSource(plan) => plan,
417            _ => unreachable!(),
418        };
419        Ok(CreateSourcePlanBundle {
420            item_id,
421            global_id,
422            plan,
423            resolved_ids,
424            available_source_references: None,
425        })
426    }
427
428    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
429    pub(crate) async fn plan_purified_alter_source_add_subsource(
430        &mut self,
431        session: &Session,
432        params: Params,
433        source_name: ResolvedItemName,
434        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
435        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
436    ) -> Result<(Plan, ResolvedIds), AdapterError> {
437        let mut subsource_plans = Vec::with_capacity(subsources.len());
438
439        // Generate subsource statements
440        let conn_catalog = self.catalog().for_system_session();
441        let pcx = plan::PlanContext::zero();
442        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
443
444        let entry = self.catalog().get_entry(source_name.item_id());
445        let source = entry.source().ok_or_else(|| {
446            AdapterError::internal(
447                "plan alter source",
448                format!("expected Source found {entry:?}"),
449            )
450        })?;
451
452        let item_id = entry.id();
453        let ingestion_id = source.global_id();
454        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
455
456        let id_ts = self.get_catalog_write_ts().await;
457        let ids = self
458            .catalog()
459            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
460            .await?;
461        for (subsource_stmt, (item_id, global_id)) in
462            subsource_stmts.into_iter().zip_eq(ids.into_iter())
463        {
464            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
465            subsource_plans.push(s);
466        }
467
468        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
469            subsources: subsource_plans,
470            options,
471        };
472
473        Ok((
474            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
475                item_id,
476                ingestion_id,
477                action,
478            }),
479            ResolvedIds::empty(),
480        ))
481    }
482
483    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
484    pub(crate) fn plan_purified_alter_source_refresh_references(
485        &self,
486        _session: &Session,
487        _params: Params,
488        source_name: ResolvedItemName,
489        available_source_references: plan::SourceReferences,
490    ) -> Result<(Plan, ResolvedIds), AdapterError> {
491        let entry = self.catalog().get_entry(source_name.item_id());
492        let source = entry.source().ok_or_else(|| {
493            AdapterError::internal(
494                "plan alter source",
495                format!("expected Source found {entry:?}"),
496            )
497        })?;
498        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
499            references: available_source_references,
500        };
501
502        Ok((
503            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
504                item_id: entry.id(),
505                ingestion_id: source.global_id(),
506                action,
507            }),
508            ResolvedIds::empty(),
509        ))
510    }
511
512    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
513    /// the primary source, and any ingestion export subsources (e.g. PG
514    /// tables).
515    pub(crate) async fn plan_purified_create_source(
516        &mut self,
517        ctx: &ExecuteContext,
518        params: Params,
519        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
520        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
521        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
522        available_source_references: plan::SourceReferences,
523    ) -> Result<(Plan, ResolvedIds), AdapterError> {
524        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
525
526        // 1. First plan the progress subsource, if any.
527        if let Some(progress_stmt) = progress_stmt {
528            // The primary source depends on this subsource because the primary
529            // source needs to know its shard ID, and the easiest way of
530            // guaranteeing that the shard ID is discoverable is to create this
531            // collection first.
532            assert_none!(progress_stmt.of_source);
533            let id_ts = self.get_catalog_write_ts().await;
534            let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
535            let progress_plan =
536                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
537            let progress_full_name = self
538                .catalog()
539                .resolve_full_name(&progress_plan.plan.name, None);
540            let progress_subsource = ResolvedItemName::Item {
541                id: progress_plan.item_id,
542                qualifiers: progress_plan.plan.name.qualifiers.clone(),
543                full_name: progress_full_name,
544                print_id: true,
545                version: RelationVersionSelector::Latest,
546            };
547
548            create_source_plans.push(progress_plan);
549
550            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
551        }
552
553        let catalog = self.catalog().for_session(ctx.session());
554        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
555
556        let propagated_with_options: Vec<_> = source_stmt
557            .with_options
558            .iter()
559            .filter_map(|opt| match opt.name {
560                CreateSourceOptionName::TimestampInterval => None,
561                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
562                    name: CreateSubsourceOptionName::RetainHistory,
563                    value: opt.value.clone(),
564                }),
565            })
566            .collect();
567
568        // 2. Then plan the main source.
569        let source_plan = match self.plan_statement(
570            ctx.session(),
571            Statement::CreateSource(source_stmt),
572            &params,
573            &resolved_ids,
574        )? {
575            Plan::CreateSource(plan) => plan,
576            p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
577        };
578
579        let id_ts = self.get_catalog_write_ts().await;
580        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
581
582        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
583        let of_source = ResolvedItemName::Item {
584            id: item_id,
585            qualifiers: source_plan.name.qualifiers.clone(),
586            full_name: source_full_name,
587            print_id: true,
588            version: RelationVersionSelector::Latest,
589        };
590
591        // Generate subsource statements
592        let conn_catalog = self.catalog().for_system_session();
593        let pcx = plan::PlanContext::zero();
594        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
595
596        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
597
598        for subsource_stmt in subsource_stmts.iter_mut() {
599            subsource_stmt
600                .with_options
601                .extend(propagated_with_options.iter().cloned())
602        }
603
604        create_source_plans.push(CreateSourcePlanBundle {
605            item_id,
606            global_id,
607            plan: source_plan,
608            resolved_ids: resolved_ids.clone(),
609            available_source_references: Some(available_source_references),
610        });
611
612        // 3. Finally, plan all the subsources
613        let id_ts = self.get_catalog_write_ts().await;
614        let ids = self
615            .catalog()
616            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
617            .await?;
618        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
619            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
620            create_source_plans.push(plan);
621        }
622
623        Ok((
624            Plan::CreateSources(create_source_plans),
625            ResolvedIds::empty(),
626        ))
627    }
628
629    #[instrument]
630    pub(super) async fn sequence_create_source(
631        &mut self,
632        ctx: &mut ExecuteContext,
633        plans: Vec<plan::CreateSourcePlanBundle>,
634    ) -> Result<ExecuteResponse, AdapterError> {
635        let CreateSourceInner {
636            ops,
637            sources,
638            if_not_exists_ids,
639        } = self.create_source_inner(ctx.session(), plans).await?;
640
641        let transact_result = self
642            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
643            .await;
644
645        // Check if any sources are webhook sources and report them as created.
646        for (item_id, source) in &sources {
647            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
648                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
649                    ctx.session()
650                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
651                }
652            }
653        }
654
655        match transact_result {
656            Ok(()) => Ok(ExecuteResponse::CreatedSource),
657            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
658                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
659            })) if if_not_exists_ids.contains_key(&id) => {
660                ctx.session()
661                    .add_notice(AdapterNotice::ObjectAlreadyExists {
662                        name: if_not_exists_ids[&id].item.clone(),
663                        ty: "source",
664                    });
665                Ok(ExecuteResponse::CreatedSource)
666            }
667            Err(err) => Err(err),
668        }
669    }
670
671    #[instrument]
672    pub(super) async fn sequence_create_connection(
673        &mut self,
674        mut ctx: ExecuteContext,
675        plan: plan::CreateConnectionPlan,
676        resolved_ids: ResolvedIds,
677    ) {
678        let id_ts = self.get_catalog_write_ts().await;
679        let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
680            Ok(item_id) => item_id,
681            Err(err) => return ctx.retire(Err(err.into())),
682        };
683
684        match &plan.connection.details {
685            ConnectionDetails::Ssh { key_1, key_2, .. } => {
686                let key_1 = match key_1.as_key_pair() {
687                    Some(key_1) => key_1.clone(),
688                    None => {
689                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
690                            "the PUBLIC KEY 1 option cannot be explicitly specified"
691                        ))));
692                    }
693                };
694
695                let key_2 = match key_2.as_key_pair() {
696                    Some(key_2) => key_2.clone(),
697                    None => {
698                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
699                            "the PUBLIC KEY 2 option cannot be explicitly specified"
700                        ))));
701                    }
702                };
703
704                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
705                let secret = key_set.to_bytes();
706                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
707                    return ctx.retire(Err(err.into()));
708                }
709            }
710            _ => (),
711        };
712
713        if plan.validate {
714            let internal_cmd_tx = self.internal_cmd_tx.clone();
715            let transient_revision = self.catalog().transient_revision();
716            let conn_id = ctx.session().conn_id().clone();
717            let otel_ctx = OpenTelemetryContext::obtain();
718            let role_metadata = ctx.session().role_metadata().clone();
719
720            let connection = plan
721                .connection
722                .details
723                .to_connection()
724                .into_inline_connection(self.catalog().state());
725
726            let current_storage_parameters = self.controller.storage.config().clone();
727            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
728                let result = match connection
729                    .validate(connection_id, &current_storage_parameters)
730                    .await
731                {
732                    Ok(()) => Ok(plan),
733                    Err(err) => Err(err.into()),
734                };
735
736                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
737                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
738                    CreateConnectionValidationReady {
739                        ctx,
740                        result,
741                        connection_id,
742                        connection_gid,
743                        plan_validity: PlanValidity::new(
744                            transient_revision,
745                            resolved_ids.items().copied().collect(),
746                            None,
747                            None,
748                            role_metadata,
749                        ),
750                        otel_ctx,
751                        resolved_ids: resolved_ids.clone(),
752                    },
753                ));
754                if let Err(e) = result {
755                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
756                }
757            });
758        } else {
759            let result = self
760                .sequence_create_connection_stage_finish(
761                    &mut ctx,
762                    connection_id,
763                    connection_gid,
764                    plan,
765                    resolved_ids,
766                )
767                .await;
768            ctx.retire(result);
769        }
770    }
771
772    #[instrument]
773    pub(crate) async fn sequence_create_connection_stage_finish(
774        &mut self,
775        ctx: &mut ExecuteContext,
776        connection_id: CatalogItemId,
777        connection_gid: GlobalId,
778        plan: plan::CreateConnectionPlan,
779        resolved_ids: ResolvedIds,
780    ) -> Result<ExecuteResponse, AdapterError> {
781        let ops = vec![catalog::Op::CreateItem {
782            id: connection_id,
783            name: plan.name.clone(),
784            item: CatalogItem::Connection(Connection {
785                create_sql: plan.connection.create_sql,
786                global_id: connection_gid,
787                details: plan.connection.details.clone(),
788                resolved_ids,
789            }),
790            owner_id: *ctx.session().current_role_id(),
791        }];
792
793        // VPC endpoint creation for AWS PrivateLink connections is now handled
794        // in apply_catalog_implications.
795        let conn_id = ctx.session().conn_id().clone();
796        let transact_result = self
797            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
798            .await;
799
800        match transact_result {
801            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
802            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
803                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
804            })) if plan.if_not_exists => {
805                ctx.session()
806                    .add_notice(AdapterNotice::ObjectAlreadyExists {
807                        name: plan.name.item,
808                        ty: "connection",
809                    });
810                Ok(ExecuteResponse::CreatedConnection)
811            }
812            Err(err) => Err(err),
813        }
814    }
815
816    #[instrument]
817    pub(super) async fn sequence_create_database(
818        &mut self,
819        session: &Session,
820        plan: plan::CreateDatabasePlan,
821    ) -> Result<ExecuteResponse, AdapterError> {
822        let ops = vec![catalog::Op::CreateDatabase {
823            name: plan.name.clone(),
824            owner_id: *session.current_role_id(),
825        }];
826        match self.catalog_transact(Some(session), ops).await {
827            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
828            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
829                kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
830            })) if plan.if_not_exists => {
831                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
832                Ok(ExecuteResponse::CreatedDatabase)
833            }
834            Err(err) => Err(err),
835        }
836    }
837
838    #[instrument]
839    pub(super) async fn sequence_create_schema(
840        &mut self,
841        session: &Session,
842        plan: plan::CreateSchemaPlan,
843    ) -> Result<ExecuteResponse, AdapterError> {
844        let op = catalog::Op::CreateSchema {
845            database_id: plan.database_spec,
846            schema_name: plan.schema_name.clone(),
847            owner_id: *session.current_role_id(),
848        };
849        match self.catalog_transact(Some(session), vec![op]).await {
850            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
851            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
852                kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
853            })) if plan.if_not_exists => {
854                session.add_notice(AdapterNotice::SchemaAlreadyExists {
855                    name: plan.schema_name,
856                });
857                Ok(ExecuteResponse::CreatedSchema)
858            }
859            Err(err) => Err(err),
860        }
861    }
862
863    /// Validates the role attributes for a `CREATE ROLE` statement.
864    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
865        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
866            if attributes.superuser.is_some()
867                || attributes.password.is_some()
868                || attributes.login.is_some()
869            {
870                return Err(AdapterError::UnavailableFeature {
871                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
872                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
873                });
874            }
875        }
876        Ok(())
877    }
878
879    #[instrument]
880    pub(super) async fn sequence_create_role(
881        &mut self,
882        conn_id: Option<&ConnectionId>,
883        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
884    ) -> Result<ExecuteResponse, AdapterError> {
885        self.validate_role_attributes(&attributes.clone())?;
886        let op = catalog::Op::CreateRole { name, attributes };
887        self.catalog_transact_with_context(conn_id, None, vec![op])
888            .await
889            .map(|_| ExecuteResponse::CreatedRole)
890    }
891
892    #[instrument]
893    pub(super) async fn sequence_create_network_policy(
894        &mut self,
895        session: &Session,
896        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
897    ) -> Result<ExecuteResponse, AdapterError> {
898        let op = catalog::Op::CreateNetworkPolicy {
899            rules,
900            name,
901            owner_id: *session.current_role_id(),
902        };
903        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
904            .await
905            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
906    }
907
908    #[instrument]
909    pub(super) async fn sequence_alter_network_policy(
910        &mut self,
911        session: &Session,
912        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
913    ) -> Result<ExecuteResponse, AdapterError> {
914        // TODO(network_policy): Consider role based network policies here.
915        let current_network_policy_name =
916            self.catalog().system_config().default_network_policy_name();
917        // Check if the way we're alerting the policy is still valid for the current connection.
918        if current_network_policy_name == name {
919            self.validate_alter_network_policy(session, &rules)?;
920        }
921
922        let op = catalog::Op::AlterNetworkPolicy {
923            id,
924            rules,
925            name,
926            owner_id: *session.current_role_id(),
927        };
928        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
929            .await
930            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
931    }
932
933    #[instrument]
934    pub(super) async fn sequence_create_table(
935        &mut self,
936        ctx: &mut ExecuteContext,
937        plan: plan::CreateTablePlan,
938        resolved_ids: ResolvedIds,
939    ) -> Result<ExecuteResponse, AdapterError> {
940        let plan::CreateTablePlan {
941            name,
942            table,
943            if_not_exists,
944        } = plan;
945
946        let conn_id = if table.temporary {
947            Some(ctx.session().conn_id())
948        } else {
949            None
950        };
951        let id_ts = self.get_catalog_write_ts().await;
952        let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
953        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
954
955        let data_source = match table.data_source {
956            plan::TableDataSource::TableWrites { defaults } => {
957                TableDataSource::TableWrites { defaults }
958            }
959            plan::TableDataSource::DataSource {
960                desc: data_source_plan,
961                timeline,
962            } => match data_source_plan {
963                plan::DataSourceDesc::IngestionExport {
964                    ingestion_id,
965                    external_reference,
966                    details,
967                    data_config,
968                } => TableDataSource::DataSource {
969                    desc: DataSourceDesc::IngestionExport {
970                        ingestion_id,
971                        external_reference,
972                        details,
973                        data_config,
974                    },
975                    timeline,
976                },
977                plan::DataSourceDesc::Webhook {
978                    validate_using,
979                    body_format,
980                    headers,
981                    cluster_id,
982                } => TableDataSource::DataSource {
983                    desc: DataSourceDesc::Webhook {
984                        validate_using,
985                        body_format,
986                        headers,
987                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
988                    },
989                    timeline,
990                },
991                o => {
992                    unreachable!("CREATE TABLE data source got {:?}", o)
993                }
994            },
995        };
996
997        let is_webhook = if let TableDataSource::DataSource {
998            desc: DataSourceDesc::Webhook { .. },
999            timeline: _,
1000        } = &data_source
1001        {
1002            true
1003        } else {
1004            false
1005        };
1006
1007        let table = Table {
1008            create_sql: Some(table.create_sql),
1009            desc: table.desc,
1010            collections,
1011            conn_id: conn_id.cloned(),
1012            resolved_ids,
1013            custom_logical_compaction_window: table.compaction_window,
1014            is_retained_metrics_object: false,
1015            data_source,
1016        };
1017        let ops = vec![catalog::Op::CreateItem {
1018            id: table_id,
1019            name: name.clone(),
1020            item: CatalogItem::Table(table.clone()),
1021            owner_id: *ctx.session().current_role_id(),
1022        }];
1023
1024        let catalog_result = self
1025            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1026            .await;
1027
1028        if is_webhook {
1029            // try_get_webhook_url will make up a URL for things that are not
1030            // webhooks, so we guard against that here.
1031            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1032                ctx.session()
1033                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
1034            }
1035        }
1036
1037        match catalog_result {
1038            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1039            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1040                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1041            })) if if_not_exists => {
1042                ctx.session_mut()
1043                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1044                        name: name.item,
1045                        ty: "table",
1046                    });
1047                Ok(ExecuteResponse::CreatedTable)
1048            }
1049            Err(err) => Err(err),
1050        }
1051    }
1052
1053    #[instrument]
1054    pub(super) async fn sequence_create_sink(
1055        &mut self,
1056        ctx: ExecuteContext,
1057        plan: plan::CreateSinkPlan,
1058        resolved_ids: ResolvedIds,
1059    ) {
1060        let plan::CreateSinkPlan {
1061            name,
1062            sink,
1063            with_snapshot,
1064            if_not_exists,
1065            in_cluster,
1066        } = plan;
1067
1068        // First try to allocate an ID and an OID. If either fails, we're done.
1069        let id_ts = self.get_catalog_write_ts().await;
1070        let (item_id, global_id) =
1071            return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);
1072
1073        let catalog_sink = Sink {
1074            create_sql: sink.create_sql,
1075            global_id,
1076            from: sink.from,
1077            connection: sink.connection,
1078            envelope: sink.envelope,
1079            version: sink.version,
1080            with_snapshot,
1081            resolved_ids,
1082            cluster_id: in_cluster,
1083            commit_interval: sink.commit_interval,
1084        };
1085
1086        let ops = vec![catalog::Op::CreateItem {
1087            id: item_id,
1088            name: name.clone(),
1089            item: CatalogItem::Sink(catalog_sink.clone()),
1090            owner_id: *ctx.session().current_role_id(),
1091        }];
1092
1093        let result = self.catalog_transact(Some(ctx.session()), ops).await;
1094
1095        match result {
1096            Ok(()) => {}
1097            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1098                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1099            })) if if_not_exists => {
1100                ctx.session()
1101                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1102                        name: name.item,
1103                        ty: "sink",
1104                    });
1105                ctx.retire(Ok(ExecuteResponse::CreatedSink));
1106                return;
1107            }
1108            Err(e) => {
1109                ctx.retire(Err(e));
1110                return;
1111            }
1112        };
1113
1114        self.create_storage_export(global_id, &catalog_sink)
1115            .await
1116            .unwrap_or_terminate("cannot fail to create exports");
1117
1118        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1119            .await;
1120
1121        ctx.retire(Ok(ExecuteResponse::CreatedSink))
1122    }
1123
1124    /// Validates that a view definition does not contain any expressions that may lead to
1125    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1126    ///
1127    /// We prevent these expressions so that we can add columns to system tables without
1128    /// changing the definition of the view.
1129    ///
1130    /// Here is a bit of a hand wavy proof as to why we only need to check the
1131    /// immediate view definition for system objects and ambiguous column
1132    /// references, and not the entire dependency tree:
1133    ///
1134    ///   - A view with no object references cannot have any ambiguous column
1135    ///   references to a system object, because it has no system objects.
1136    ///   - A view with a direct reference to a system object and a * or
1137    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1138    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1139    ///   any ambiguous column references to a system object, because all column
1140    ///   references are explicitly named.
1141    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1142    ///   system object cannot have any ambiguous column references to a system
1143    ///   object, because there are no system objects in the top level view and
1144    ///   all sub-views are guaranteed to have no ambiguous column references to
1145    ///   system objects.
1146    pub(super) fn validate_system_column_references(
1147        &self,
1148        uses_ambiguous_columns: bool,
1149        depends_on: &BTreeSet<GlobalId>,
1150    ) -> Result<(), AdapterError> {
1151        if uses_ambiguous_columns
1152            && depends_on
1153                .iter()
1154                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1155        {
1156            Err(AdapterError::AmbiguousSystemColumnReference)
1157        } else {
1158            Ok(())
1159        }
1160    }
1161
1162    #[instrument]
1163    pub(super) async fn sequence_create_type(
1164        &mut self,
1165        session: &Session,
1166        plan: plan::CreateTypePlan,
1167        resolved_ids: ResolvedIds,
1168    ) -> Result<ExecuteResponse, AdapterError> {
1169        let id_ts = self.get_catalog_write_ts().await;
1170        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
1171        // Validate the type definition (e.g., composite columns) before storing.
1172        plan.typ
1173            .inner
1174            .desc(&self.catalog().for_session(session))
1175            .map_err(AdapterError::from)?;
1176        let typ = Type {
1177            create_sql: Some(plan.typ.create_sql),
1178            global_id,
1179            details: CatalogTypeDetails {
1180                array_id: None,
1181                typ: plan.typ.inner,
1182                pg_metadata: None,
1183            },
1184            resolved_ids,
1185        };
1186        let op = catalog::Op::CreateItem {
1187            id: item_id,
1188            name: plan.name,
1189            item: CatalogItem::Type(typ),
1190            owner_id: *session.current_role_id(),
1191        };
1192        match self.catalog_transact(Some(session), vec![op]).await {
1193            Ok(()) => Ok(ExecuteResponse::CreatedType),
1194            Err(err) => Err(err),
1195        }
1196    }
1197
1198    #[instrument]
1199    pub(super) async fn sequence_comment_on(
1200        &mut self,
1201        session: &Session,
1202        plan: plan::CommentPlan,
1203    ) -> Result<ExecuteResponse, AdapterError> {
1204        let op = catalog::Op::Comment {
1205            object_id: plan.object_id,
1206            sub_component: plan.sub_component,
1207            comment: plan.comment,
1208        };
1209        self.catalog_transact(Some(session), vec![op]).await?;
1210        Ok(ExecuteResponse::Comment)
1211    }
1212
1213    #[instrument]
1214    pub(super) async fn sequence_drop_objects(
1215        &mut self,
1216        ctx: &mut ExecuteContext,
1217        plan::DropObjectsPlan {
1218            drop_ids,
1219            object_type,
1220            referenced_ids,
1221        }: plan::DropObjectsPlan,
1222    ) -> Result<ExecuteResponse, AdapterError> {
1223        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1224        let mut objects = Vec::new();
1225        for obj_id in &drop_ids {
1226            if !referenced_ids_hashset.contains(obj_id) {
1227                let object_info = ErrorMessageObjectDescription::from_id(
1228                    obj_id,
1229                    &self.catalog().for_session(ctx.session()),
1230                )
1231                .to_string();
1232                objects.push(object_info);
1233            }
1234        }
1235
1236        if !objects.is_empty() {
1237            ctx.session()
1238                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1239        }
1240
1241        let DropOps {
1242            ops,
1243            dropped_active_db,
1244            dropped_active_cluster,
1245            dropped_in_use_indexes,
1246        } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1247
1248        self.catalog_transact_with_context(None, Some(ctx), ops)
1249            .await?;
1250
1251        fail::fail_point!("after_sequencer_drop_replica");
1252
1253        if dropped_active_db {
1254            ctx.session()
1255                .add_notice(AdapterNotice::DroppedActiveDatabase {
1256                    name: ctx.session().vars().database().to_string(),
1257                });
1258        }
1259        if dropped_active_cluster {
1260            ctx.session()
1261                .add_notice(AdapterNotice::DroppedActiveCluster {
1262                    name: ctx.session().vars().cluster().to_string(),
1263                });
1264        }
1265        for dropped_in_use_index in dropped_in_use_indexes {
1266            ctx.session()
1267                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1268            self.metrics
1269                .optimization_notices
1270                .with_label_values(&["DroppedInUseIndex"])
1271                .inc_by(1);
1272        }
1273        Ok(ExecuteResponse::DroppedObject(object_type))
1274    }
1275
1276    fn validate_dropped_role_ownership(
1277        &self,
1278        session: &Session,
1279        dropped_roles: &BTreeMap<RoleId, &str>,
1280    ) -> Result<(), AdapterError> {
1281        fn privilege_check(
1282            privileges: &PrivilegeMap,
1283            dropped_roles: &BTreeMap<RoleId, &str>,
1284            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1285            object_id: &SystemObjectId,
1286            catalog: &ConnCatalog,
1287        ) {
1288            for privilege in privileges.all_values() {
1289                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1290                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1291                    let object_description =
1292                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1293                    dependent_objects
1294                        .entry(role_name.to_string())
1295                        .or_default()
1296                        .push(format!(
1297                            "privileges on {object_description} granted by {grantor_name}",
1298                        ));
1299                }
1300                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1301                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1302                    let object_description =
1303                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1304                    dependent_objects
1305                        .entry(role_name.to_string())
1306                        .or_default()
1307                        .push(format!(
1308                            "privileges granted on {object_description} to {grantee_name}"
1309                        ));
1310                }
1311            }
1312        }
1313
1314        let catalog = self.catalog().for_session(session);
1315        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1316        for entry in self.catalog.entries() {
1317            let id = SystemObjectId::Object(entry.id().into());
1318            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1319                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1320                dependent_objects
1321                    .entry(role_name.to_string())
1322                    .or_default()
1323                    .push(format!("owner of {object_description}"));
1324            }
1325            privilege_check(
1326                entry.privileges(),
1327                dropped_roles,
1328                &mut dependent_objects,
1329                &id,
1330                &catalog,
1331            );
1332        }
1333        for database in self.catalog.databases() {
1334            let database_id = SystemObjectId::Object(database.id().into());
1335            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1336                let object_description =
1337                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1338                dependent_objects
1339                    .entry(role_name.to_string())
1340                    .or_default()
1341                    .push(format!("owner of {object_description}"));
1342            }
1343            privilege_check(
1344                &database.privileges,
1345                dropped_roles,
1346                &mut dependent_objects,
1347                &database_id,
1348                &catalog,
1349            );
1350            for schema in database.schemas_by_id.values() {
1351                let schema_id = SystemObjectId::Object(
1352                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1353                );
1354                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1355                    let object_description =
1356                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1357                    dependent_objects
1358                        .entry(role_name.to_string())
1359                        .or_default()
1360                        .push(format!("owner of {object_description}"));
1361                }
1362                privilege_check(
1363                    &schema.privileges,
1364                    dropped_roles,
1365                    &mut dependent_objects,
1366                    &schema_id,
1367                    &catalog,
1368                );
1369            }
1370        }
1371        for cluster in self.catalog.clusters() {
1372            let cluster_id = SystemObjectId::Object(cluster.id().into());
1373            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1374                let object_description =
1375                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1376                dependent_objects
1377                    .entry(role_name.to_string())
1378                    .or_default()
1379                    .push(format!("owner of {object_description}"));
1380            }
1381            privilege_check(
1382                &cluster.privileges,
1383                dropped_roles,
1384                &mut dependent_objects,
1385                &cluster_id,
1386                &catalog,
1387            );
1388            for replica in cluster.replicas() {
1389                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1390                    let replica_id =
1391                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1392                    let object_description =
1393                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1394                    dependent_objects
1395                        .entry(role_name.to_string())
1396                        .or_default()
1397                        .push(format!("owner of {object_description}"));
1398                }
1399            }
1400        }
1401        privilege_check(
1402            self.catalog().system_privileges(),
1403            dropped_roles,
1404            &mut dependent_objects,
1405            &SystemObjectId::System,
1406            &catalog,
1407        );
1408        for (default_privilege_object, default_privilege_acl_items) in
1409            self.catalog.default_privileges()
1410        {
1411            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1412                dependent_objects
1413                    .entry(role_name.to_string())
1414                    .or_default()
1415                    .push(format!(
1416                        "default privileges on {}S created by {}",
1417                        default_privilege_object.object_type, role_name
1418                    ));
1419            }
1420            for default_privilege_acl_item in default_privilege_acl_items {
1421                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1422                    dependent_objects
1423                        .entry(role_name.to_string())
1424                        .or_default()
1425                        .push(format!(
1426                            "default privileges on {}S granted to {}",
1427                            default_privilege_object.object_type, role_name
1428                        ));
1429                }
1430            }
1431        }
1432
1433        if !dependent_objects.is_empty() {
1434            Err(AdapterError::DependentObject(dependent_objects))
1435        } else {
1436            Ok(())
1437        }
1438    }
1439
1440    #[instrument]
1441    pub(super) async fn sequence_drop_owned(
1442        &mut self,
1443        session: &Session,
1444        plan: plan::DropOwnedPlan,
1445    ) -> Result<ExecuteResponse, AdapterError> {
1446        for role_id in &plan.role_ids {
1447            self.catalog().ensure_not_reserved_role(role_id)?;
1448        }
1449
1450        let mut privilege_revokes = plan.privilege_revokes;
1451
1452        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1453        let session_catalog = self.catalog().for_session(session);
1454        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1455            && !session.is_superuser()
1456        {
1457            // Obtain all roles that the current session is a member of.
1458            let role_membership =
1459                session_catalog.collect_role_membership(session.current_role_id());
1460            let invalid_revokes: BTreeSet<_> = privilege_revokes
1461                .extract_if(.., |(_, privilege)| {
1462                    !role_membership.contains(&privilege.grantor)
1463                })
1464                .map(|(object_id, _)| object_id)
1465                .collect();
1466            for invalid_revoke in invalid_revokes {
1467                let object_description =
1468                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1469                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1470            }
1471        }
1472
1473        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1474            catalog::Op::UpdatePrivilege {
1475                target_id: object_id,
1476                privilege,
1477                variant: UpdatePrivilegeVariant::Revoke,
1478            }
1479        });
1480        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1481            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1482                privilege_object,
1483                privilege_acl_item,
1484                variant: UpdatePrivilegeVariant::Revoke,
1485            },
1486        );
1487        let DropOps {
1488            ops: drop_ops,
1489            dropped_active_db,
1490            dropped_active_cluster,
1491            dropped_in_use_indexes,
1492        } = self.sequence_drop_common(session, plan.drop_ids)?;
1493
1494        let ops = privilege_revoke_ops
1495            .chain(default_privilege_revoke_ops)
1496            .chain(drop_ops.into_iter())
1497            .collect();
1498
1499        self.catalog_transact(Some(session), ops).await?;
1500
1501        if dropped_active_db {
1502            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1503                name: session.vars().database().to_string(),
1504            });
1505        }
1506        if dropped_active_cluster {
1507            session.add_notice(AdapterNotice::DroppedActiveCluster {
1508                name: session.vars().cluster().to_string(),
1509            });
1510        }
1511        for dropped_in_use_index in dropped_in_use_indexes {
1512            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1513        }
1514        Ok(ExecuteResponse::DroppedOwned)
1515    }
1516
1517    fn sequence_drop_common(
1518        &self,
1519        session: &Session,
1520        ids: Vec<ObjectId>,
1521    ) -> Result<DropOps, AdapterError> {
1522        let mut dropped_active_db = false;
1523        let mut dropped_active_cluster = false;
1524        let mut dropped_in_use_indexes = Vec::new();
1525        let mut dropped_roles = BTreeMap::new();
1526        let mut dropped_databases = BTreeSet::new();
1527        let mut dropped_schemas = BTreeSet::new();
1528        // Dropping either the group role or the member role of a role membership will trigger a
1529        // revoke role. We use a Set for the revokes to avoid trying to attempt to revoke the same
1530        // role membership twice.
1531        let mut role_revokes = BTreeSet::new();
1532        // Dropping a database or a schema will revoke all default roles associated with that
1533        // database or schema.
1534        let mut default_privilege_revokes = BTreeSet::new();
1535
1536        // Clusters we're dropping
1537        let mut clusters_to_drop = BTreeSet::new();
1538
1539        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1540        for id in &ids {
1541            match id {
1542                ObjectId::Database(id) => {
1543                    let name = self.catalog().get_database(id).name();
1544                    if name == session.vars().database() {
1545                        dropped_active_db = true;
1546                    }
1547                    dropped_databases.insert(id);
1548                }
1549                ObjectId::Schema((_, spec)) => {
1550                    if let SchemaSpecifier::Id(id) = spec {
1551                        dropped_schemas.insert(id);
1552                    }
1553                }
1554                ObjectId::Cluster(id) => {
1555                    clusters_to_drop.insert(*id);
1556                    if let Some(active_id) = self
1557                        .catalog()
1558                        .active_cluster(session)
1559                        .ok()
1560                        .map(|cluster| cluster.id())
1561                    {
1562                        if id == &active_id {
1563                            dropped_active_cluster = true;
1564                        }
1565                    }
1566                }
1567                ObjectId::Role(id) => {
1568                    let role = self.catalog().get_role(id);
1569                    let name = role.name();
1570                    dropped_roles.insert(*id, name);
1571                    // We must revoke all role memberships that the dropped roles belongs to.
1572                    for (group_id, grantor_id) in &role.membership.map {
1573                        role_revokes.insert((*group_id, *id, *grantor_id));
1574                    }
1575                }
1576                ObjectId::Item(id) => {
1577                    if let Some(index) = self.catalog().get_entry(id).index() {
1578                        let humanizer = self.catalog().for_session(session);
1579                        let dependants = self
1580                            .controller
1581                            .compute
1582                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1583                            .ok()
1584                            .into_iter()
1585                            .flatten()
1586                            .filter(|dependant_id| {
1587                                // Transient Ids belong to Peeks. We are not interested for now in
1588                                // peeks depending on a dropped index.
1589                                // TODO: show a different notice in this case. Something like
1590                                // "There is an in-progress ad hoc SELECT that uses the dropped
1591                                // index. The resources used by the index will be freed when all
1592                                // such SELECTs complete."
1593                                if dependant_id.is_transient() {
1594                                    return false;
1595                                }
1596                                // The item should exist, but don't panic if it doesn't.
1597                                let Some(dependent_id) = humanizer
1598                                    .try_get_item_by_global_id(dependant_id)
1599                                    .map(|item| item.id())
1600                                else {
1601                                    return false;
1602                                };
1603                                // If the dependent object is also being dropped, then there is no
1604                                // problem, so we don't want a notice.
1605                                !ids_set.contains(&ObjectId::Item(dependent_id))
1606                            })
1607                            .flat_map(|dependant_id| {
1608                                // If we are not able to find a name for this ID it probably means
1609                                // we have already dropped the compute collection, in which case we
1610                                // can ignore it.
1611                                humanizer.humanize_id(dependant_id)
1612                            })
1613                            .collect_vec();
1614                        if !dependants.is_empty() {
1615                            dropped_in_use_indexes.push(DroppedInUseIndex {
1616                                index_name: humanizer
1617                                    .humanize_id(index.global_id())
1618                                    .unwrap_or_else(|| id.to_string()),
1619                                dependant_objects: dependants,
1620                            });
1621                        }
1622                    }
1623                }
1624                _ => {}
1625            }
1626        }
1627
1628        for id in &ids {
1629            match id {
1630                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1631                // unless they are internal replicas, which exist outside the scope
1632                // of managed clusters.
1633                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1634                    if !clusters_to_drop.contains(cluster_id) {
1635                        let cluster = self.catalog.get_cluster(*cluster_id);
1636                        if cluster.is_managed() {
1637                            let replica =
1638                                cluster.replica(*replica_id).expect("Catalog out of sync");
1639                            if !replica.config.location.internal() {
1640                                coord_bail!("cannot drop replica of managed cluster");
1641                            }
1642                        }
1643                    }
1644                }
1645                _ => {}
1646            }
1647        }
1648
1649        for role_id in dropped_roles.keys() {
1650            self.catalog().ensure_not_reserved_role(role_id)?;
1651        }
1652        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1653        // If any role is a member of a dropped role, then we must revoke that membership.
1654        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1655        for role in self.catalog().user_roles() {
1656            for dropped_role_id in
1657                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1658            {
1659                role_revokes.insert((
1660                    **dropped_role_id,
1661                    role.id(),
1662                    *role
1663                        .membership
1664                        .map
1665                        .get(*dropped_role_id)
1666                        .expect("included in keys above"),
1667                ));
1668            }
1669        }
1670
1671        for (default_privilege_object, default_privilege_acls) in
1672            self.catalog().default_privileges()
1673        {
1674            if matches!(
1675                &default_privilege_object.database_id,
1676                Some(database_id) if dropped_databases.contains(database_id),
1677            ) || matches!(
1678                &default_privilege_object.schema_id,
1679                Some(schema_id) if dropped_schemas.contains(schema_id),
1680            ) {
1681                for default_privilege_acl in default_privilege_acls {
1682                    default_privilege_revokes.insert((
1683                        default_privilege_object.clone(),
1684                        default_privilege_acl.clone(),
1685                    ));
1686                }
1687            }
1688        }
1689
1690        let ops = role_revokes
1691            .into_iter()
1692            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1693                role_id,
1694                member_id,
1695                grantor_id,
1696            })
1697            .chain(default_privilege_revokes.into_iter().map(
1698                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1699                    privilege_object,
1700                    privilege_acl_item,
1701                    variant: UpdatePrivilegeVariant::Revoke,
1702                },
1703            ))
1704            .chain(iter::once(catalog::Op::DropObjects(
1705                ids.into_iter()
1706                    .map(DropObjectInfo::manual_drop_from_object_id)
1707                    .collect(),
1708            )))
1709            .collect();
1710
1711        Ok(DropOps {
1712            ops,
1713            dropped_active_db,
1714            dropped_active_cluster,
1715            dropped_in_use_indexes,
1716        })
1717    }
1718
1719    pub(super) fn sequence_explain_schema(
1720        &self,
1721        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1722    ) -> Result<ExecuteResponse, AdapterError> {
1723        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1724            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1725        })?;
1726
1727        let json_string = json_string(&json_value);
1728        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1729        Ok(Self::send_immediate_rows(row))
1730    }
1731
1732    pub(super) fn sequence_show_all_variables(
1733        &self,
1734        session: &Session,
1735    ) -> Result<ExecuteResponse, AdapterError> {
1736        let mut rows = viewable_variables(self.catalog().state(), session)
1737            .map(|v| (v.name(), v.value(), v.description()))
1738            .collect::<Vec<_>>();
1739        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1740
1741        // TODO(parkmycar): Pack all of these into a single RowCollection.
1742        let rows: Vec<_> = rows
1743            .into_iter()
1744            .map(|(name, val, desc)| {
1745                Row::pack_slice(&[
1746                    Datum::String(name),
1747                    Datum::String(&val),
1748                    Datum::String(desc),
1749                ])
1750            })
1751            .collect();
1752        Ok(Self::send_immediate_rows(rows))
1753    }
1754
1755    pub(super) fn sequence_show_variable(
1756        &self,
1757        session: &Session,
1758        plan: plan::ShowVariablePlan,
1759    ) -> Result<ExecuteResponse, AdapterError> {
1760        if &plan.name == SCHEMA_ALIAS {
1761            let schemas = self.catalog.resolve_search_path(session);
1762            let schema = schemas.first();
1763            return match schema {
1764                Some((database_spec, schema_spec)) => {
1765                    let schema_name = &self
1766                        .catalog
1767                        .get_schema(database_spec, schema_spec, session.conn_id())
1768                        .name()
1769                        .schema;
1770                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
1771                    Ok(Self::send_immediate_rows(row))
1772                }
1773                None => {
1774                    if session.vars().current_object_missing_warnings() {
1775                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1776                            search_path: session
1777                                .vars()
1778                                .search_path()
1779                                .into_iter()
1780                                .map(|schema| schema.to_string())
1781                                .collect(),
1782                        });
1783                    }
1784                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1785                }
1786            };
1787        }
1788
1789        let variable = session
1790            .vars()
1791            .get(self.catalog().system_config(), &plan.name)
1792            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1793
1794        // In lieu of plumbing the user to all system config functions, just check that the var is
1795        // visible.
1796        variable.visible(session.user(), self.catalog().system_config())?;
1797
1798        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1799        if variable.name() == vars::DATABASE.name()
1800            && matches!(
1801                self.catalog().resolve_database(&variable.value()),
1802                Err(CatalogError::UnknownDatabase(_))
1803            )
1804            && session.vars().current_object_missing_warnings()
1805        {
1806            let name = variable.value();
1807            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1808        } else if variable.name() == vars::CLUSTER.name()
1809            && matches!(
1810                self.catalog().resolve_cluster(&variable.value()),
1811                Err(CatalogError::UnknownCluster(_))
1812            )
1813            && session.vars().current_object_missing_warnings()
1814        {
1815            let name = variable.value();
1816            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1817        }
1818        Ok(Self::send_immediate_rows(row))
1819    }
1820
1821    #[instrument]
1822    pub(super) async fn sequence_inspect_shard(
1823        &self,
1824        session: &Session,
1825        plan: plan::InspectShardPlan,
1826    ) -> Result<ExecuteResponse, AdapterError> {
1827        // TODO: Not thrilled about this rbac special case here, but probably
1828        // sufficient for now.
1829        if !session.user().is_internal() {
1830            return Err(AdapterError::Unauthorized(
1831                rbac::UnauthorizedError::MzSystem {
1832                    action: "inspect".into(),
1833                },
1834            ));
1835        }
1836        let state = self
1837            .controller
1838            .storage
1839            .inspect_persist_state(plan.id)
1840            .await?;
1841        let jsonb = Jsonb::from_serde_json(state)?;
1842        Ok(Self::send_immediate_rows(jsonb.into_row()))
1843    }
1844
1845    #[instrument]
1846    pub(super) fn sequence_set_variable(
1847        &self,
1848        session: &mut Session,
1849        plan: plan::SetVariablePlan,
1850    ) -> Result<ExecuteResponse, AdapterError> {
1851        let (name, local) = (plan.name, plan.local);
1852        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1853            self.validate_set_isolation_level(session)?;
1854        }
1855        if &name == vars::CLUSTER.name() {
1856            self.validate_set_cluster(session)?;
1857        }
1858
1859        let vars = session.vars_mut();
1860        let values = match plan.value {
1861            plan::VariableValue::Default => None,
1862            plan::VariableValue::Values(values) => Some(values),
1863        };
1864
1865        match values {
1866            Some(values) => {
1867                vars.set(
1868                    self.catalog().system_config(),
1869                    &name,
1870                    VarInput::SqlSet(&values),
1871                    local,
1872                )?;
1873
1874                let vars = session.vars();
1875
1876                // Emit a warning when deprecated variables are used.
1877                // TODO(database-issues#8069) remove this after sufficient time has passed
1878                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1879                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1880                } else if name == vars::CLUSTER.name()
1881                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1882                {
1883                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1884                }
1885
1886                // Database or cluster value does not correspond to a catalog item.
1887                if name.as_str() == vars::DATABASE.name()
1888                    && matches!(
1889                        self.catalog().resolve_database(vars.database()),
1890                        Err(CatalogError::UnknownDatabase(_))
1891                    )
1892                    && session.vars().current_object_missing_warnings()
1893                {
1894                    let name = vars.database().to_string();
1895                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1896                } else if name.as_str() == vars::CLUSTER.name()
1897                    && matches!(
1898                        self.catalog().resolve_cluster(vars.cluster()),
1899                        Err(CatalogError::UnknownCluster(_))
1900                    )
1901                    && session.vars().current_object_missing_warnings()
1902                {
1903                    let name = vars.cluster().to_string();
1904                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1905                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1906                    let v = values.into_first().to_lowercase();
1907                    if v == IsolationLevel::ReadUncommitted.as_str()
1908                        || v == IsolationLevel::ReadCommitted.as_str()
1909                        || v == IsolationLevel::RepeatableRead.as_str()
1910                    {
1911                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1912                            isolation_level: v,
1913                        });
1914                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1915                        session.add_notice(AdapterNotice::StrongSessionSerializable);
1916                    }
1917                }
1918            }
1919            None => vars.reset(self.catalog().system_config(), &name, local)?,
1920        }
1921
1922        Ok(ExecuteResponse::SetVariable { name, reset: false })
1923    }
1924
1925    pub(super) fn sequence_reset_variable(
1926        &self,
1927        session: &mut Session,
1928        plan: plan::ResetVariablePlan,
1929    ) -> Result<ExecuteResponse, AdapterError> {
1930        let name = plan.name;
1931        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1932            self.validate_set_isolation_level(session)?;
1933        }
1934        if &name == vars::CLUSTER.name() {
1935            self.validate_set_cluster(session)?;
1936        }
1937        session
1938            .vars_mut()
1939            .reset(self.catalog().system_config(), &name, false)?;
1940        Ok(ExecuteResponse::SetVariable { name, reset: true })
1941    }
1942
1943    pub(super) fn sequence_set_transaction(
1944        &self,
1945        session: &mut Session,
1946        plan: plan::SetTransactionPlan,
1947    ) -> Result<ExecuteResponse, AdapterError> {
1948        // TODO(jkosh44) Only supports isolation levels for now.
1949        for mode in plan.modes {
1950            match mode {
1951                TransactionMode::AccessMode(_) => {
1952                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1953                }
1954                TransactionMode::IsolationLevel(isolation_level) => {
1955                    self.validate_set_isolation_level(session)?;
1956
1957                    session.vars_mut().set(
1958                        self.catalog().system_config(),
1959                        TRANSACTION_ISOLATION_VAR_NAME,
1960                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
1961                        plan.local,
1962                    )?
1963                }
1964            }
1965        }
1966        Ok(ExecuteResponse::SetVariable {
1967            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1968            reset: false,
1969        })
1970    }
1971
1972    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1973        if session.transaction().contains_ops() {
1974            Err(AdapterError::InvalidSetIsolationLevel)
1975        } else {
1976            Ok(())
1977        }
1978    }
1979
1980    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1981        if session.transaction().contains_ops() {
1982            Err(AdapterError::InvalidSetCluster)
1983        } else {
1984            Ok(())
1985        }
1986    }
1987
1988    #[instrument]
1989    pub(super) async fn sequence_end_transaction(
1990        &mut self,
1991        mut ctx: ExecuteContext,
1992        mut action: EndTransactionAction,
1993    ) {
1994        // If the transaction has failed, we can only rollback.
1995        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
1996            (&action, ctx.session().transaction())
1997        {
1998            action = EndTransactionAction::Rollback;
1999        }
2000        let response = match action {
2001            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2002                params: BTreeMap::new(),
2003            }),
2004            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2005                params: BTreeMap::new(),
2006            }),
2007        };
2008
2009        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2010
2011        let (response, action) = match result {
2012            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2013                (response, action)
2014            }
2015            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2016                // Make sure we have the correct set of write locks for this transaction.
2017                // Aggressively dropping partial sets of locks to prevent deadlocking separate
2018                // transactions.
2019                let validated_locks = match write_lock_guards {
2020                    None => None,
2021                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2022                        Ok(locks) => Some(locks),
2023                        Err(missing) => {
2024                            tracing::error!(?missing, "programming error, missing write locks");
2025                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2026                        }
2027                    },
2028                };
2029
2030                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2031                for WriteOp { id, rows } in writes {
2032                    let total_rows = collected_writes.entry(id).or_default();
2033                    total_rows.push(rows);
2034                }
2035
2036                self.submit_write(PendingWriteTxn::User {
2037                    span: Span::current(),
2038                    writes: collected_writes,
2039                    write_locks: validated_locks,
2040                    pending_txn: PendingTxn {
2041                        ctx,
2042                        response,
2043                        action,
2044                    },
2045                });
2046                return;
2047            }
2048            Ok((
2049                Some(TransactionOps::Peeks {
2050                    determination,
2051                    requires_linearization: RequireLinearization::Required,
2052                    ..
2053                }),
2054                _,
2055            )) if ctx.session().vars().transaction_isolation()
2056                == &IsolationLevel::StrictSerializable =>
2057            {
2058                let conn_id = ctx.session().conn_id().clone();
2059                let pending_read_txn = PendingReadTxn {
2060                    txn: PendingRead::Read {
2061                        txn: PendingTxn {
2062                            ctx,
2063                            response,
2064                            action,
2065                        },
2066                    },
2067                    timestamp_context: determination.timestamp_context,
2068                    created: Instant::now(),
2069                    num_requeues: 0,
2070                    otel_ctx: OpenTelemetryContext::obtain(),
2071                };
2072                self.strict_serializable_reads_tx
2073                    .send((conn_id, pending_read_txn))
2074                    .expect("sending to strict_serializable_reads_tx cannot fail");
2075                return;
2076            }
2077            Ok((
2078                Some(TransactionOps::Peeks {
2079                    determination,
2080                    requires_linearization: RequireLinearization::Required,
2081                    ..
2082                }),
2083                _,
2084            )) if ctx.session().vars().transaction_isolation()
2085                == &IsolationLevel::StrongSessionSerializable =>
2086            {
2087                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2088                    ctx.session_mut()
2089                        .ensure_timestamp_oracle(timeline.clone())
2090                        .apply_write(*ts);
2091                }
2092                (response, action)
2093            }
2094            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2095                self.internal_cmd_tx
2096                    .send(Message::ExecuteSingleStatementTransaction {
2097                        ctx,
2098                        otel_ctx: OpenTelemetryContext::obtain(),
2099                        stmt,
2100                        params,
2101                    })
2102                    .expect("must send");
2103                return;
2104            }
2105            Ok((_, _)) => (response, action),
2106            Err(err) => (Err(err), EndTransactionAction::Rollback),
2107        };
2108        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2109        // Append any parameters that changed to the response.
2110        let response = response.map(|mut r| {
2111            r.extend_params(changed);
2112            ExecuteResponse::from(r)
2113        });
2114
2115        ctx.retire(response);
2116    }
2117
2118    #[instrument]
2119    async fn sequence_end_transaction_inner(
2120        &mut self,
2121        ctx: &mut ExecuteContext,
2122        action: EndTransactionAction,
2123    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2124        let txn = self.clear_transaction(ctx.session_mut()).await;
2125
2126        if let EndTransactionAction::Commit = action {
2127            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2128                match &mut ops {
2129                    TransactionOps::Writes(writes) => {
2130                        for WriteOp { id, .. } in &mut writes.iter() {
2131                            // Re-verify this id exists.
2132                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2133                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2134                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2135                                })
2136                            })?;
2137                        }
2138
2139                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2140                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2141                    }
2142                    TransactionOps::DDL {
2143                        ops,
2144                        state: _,
2145                        side_effects,
2146                        revision,
2147                    } => {
2148                        // Make sure our catalog hasn't changed.
2149                        if *revision != self.catalog().transient_revision() {
2150                            return Err(AdapterError::DDLTransactionRace);
2151                        }
2152                        // Commit all of our queued ops.
2153                        let ops = std::mem::take(ops);
2154                        let side_effects = std::mem::take(side_effects);
2155                        self.catalog_transact_with_side_effects(
2156                            Some(ctx),
2157                            ops,
2158                            move |a, mut ctx| {
2159                                Box::pin(async move {
2160                                    for side_effect in side_effects {
2161                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2162                                    }
2163                                })
2164                            },
2165                        )
2166                        .await?;
2167                    }
2168                    _ => (),
2169                }
2170                return Ok((Some(ops), write_lock_guards));
2171            }
2172        }
2173
2174        Ok((None, None))
2175    }
2176
2177    pub(super) async fn sequence_side_effecting_func(
2178        &mut self,
2179        ctx: ExecuteContext,
2180        plan: SideEffectingFunc,
2181    ) {
2182        match plan {
2183            SideEffectingFunc::PgCancelBackend { connection_id } => {
2184                if ctx.session().conn_id().unhandled() == connection_id {
2185                    // As a special case, if we're canceling ourselves, we send
2186                    // back a canceled resposne to the client issuing the query,
2187                    // and so we need to do no further processing of the cancel.
2188                    ctx.retire(Err(AdapterError::Canceled));
2189                    return;
2190                }
2191
2192                let res = if let Some((id_handle, _conn_meta)) =
2193                    self.active_conns.get_key_value(&connection_id)
2194                {
2195                    // check_plan already verified role membership.
2196                    self.handle_privileged_cancel(id_handle.clone()).await;
2197                    Datum::True
2198                } else {
2199                    Datum::False
2200                };
2201                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2202            }
2203        }
2204    }
2205
2206    /// Execute a side-effecting function from the frontend peek path.
2207    /// This is separate from `sequence_side_effecting_func` because
2208    /// - It doesn't have an ExecuteContext.
2209    /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
2210    ///   peek sequencing can't look at `active_conns`.
2211    ///
2212    /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
2213    /// sequencing.
2214    pub(crate) async fn execute_side_effecting_func(
2215        &mut self,
2216        plan: SideEffectingFunc,
2217        conn_id: ConnectionId,
2218        current_role: RoleId,
2219    ) -> Result<ExecuteResponse, AdapterError> {
2220        match plan {
2221            SideEffectingFunc::PgCancelBackend { connection_id } => {
2222                if conn_id.unhandled() == connection_id {
2223                    // As a special case, if we're canceling ourselves, we return
2224                    // a canceled response to the client issuing the query,
2225                    // and so we need to do no further processing of the cancel.
2226                    return Err(AdapterError::Canceled);
2227                }
2228
2229                // Perform RBAC check: the current user must be a member of the role
2230                // that owns the connection being cancelled.
2231                if let Some((_id_handle, conn_meta)) =
2232                    self.active_conns.get_key_value(&connection_id)
2233                {
2234                    let target_role = *conn_meta.authenticated_role_id();
2235                    let role_membership = self
2236                        .catalog()
2237                        .state()
2238                        .collect_role_membership(&current_role);
2239                    if !role_membership.contains(&target_role) {
2240                        let target_role_name = self
2241                            .catalog()
2242                            .try_get_role(&target_role)
2243                            .map(|role| role.name().to_string())
2244                            .unwrap_or_else(|| target_role.to_string());
2245                        return Err(AdapterError::Unauthorized(
2246                            rbac::UnauthorizedError::RoleMembership {
2247                                role_names: vec![target_role_name],
2248                            },
2249                        ));
2250                    }
2251
2252                    // RBAC check passed, proceed with cancellation.
2253                    let id_handle = self
2254                        .active_conns
2255                        .get_key_value(&connection_id)
2256                        .map(|(id, _)| id.clone())
2257                        .expect("checked above");
2258                    self.handle_privileged_cancel(id_handle).await;
2259                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2260                } else {
2261                    // Connection not found, return false.
2262                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2263                }
2264            }
2265        }
2266    }
2267
2268    /// Inner method that performs the actual real-time recency timestamp determination.
2269    /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`)
2270    /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
2271    pub(crate) async fn determine_real_time_recent_timestamp(
2272        &self,
2273        source_ids: impl Iterator<Item = GlobalId>,
2274        real_time_recency_timeout: Duration,
2275    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2276        let item_ids = source_ids
2277            .map(|gid| {
2278                self.catalog
2279                    .try_resolve_item_id(&gid)
2280                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2281            })
2282            .collect::<Result<Vec<_>, _>>()?;
2283
2284        // Find all dependencies transitively because we need to ensure that
2285        // RTR queries determine the timestamp from the sources' (i.e.
2286        // storage objects that ingest data from external systems) remap
2287        // data. We "cheat" a little bit and filter out any IDs that aren't
2288        // user objects because we know they are not a RTR source.
2289        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2290        // If none of the sources are user objects, we don't need to provide
2291        // a RTR timestamp.
2292        if to_visit.is_empty() {
2293            return Ok(None);
2294        }
2295
2296        let mut timestamp_objects = BTreeSet::new();
2297
2298        while let Some(id) = to_visit.pop_front() {
2299            timestamp_objects.insert(id);
2300            to_visit.extend(
2301                self.catalog()
2302                    .get_entry(&id)
2303                    .uses()
2304                    .into_iter()
2305                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2306            );
2307        }
2308        let timestamp_objects = timestamp_objects
2309            .into_iter()
2310            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2311            .collect();
2312
2313        let r = self
2314            .controller
2315            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2316            .await?;
2317
2318        Ok(Some(r))
2319    }
2320
2321    /// Checks to see if the session needs a real time recency timestamp and if so returns
2322    /// a future that will return the timestamp.
2323    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2324        &self,
2325        session: &Session,
2326        source_ids: impl Iterator<Item = GlobalId>,
2327    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2328        let vars = session.vars();
2329
2330        if vars.real_time_recency()
2331            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2332            && !session.contains_read_timestamp()
2333        {
2334            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2335                .await
2336        } else {
2337            Ok(None)
2338        }
2339    }
2340
2341    #[instrument]
2342    pub(super) async fn sequence_explain_plan(
2343        &mut self,
2344        ctx: ExecuteContext,
2345        plan: plan::ExplainPlanPlan,
2346        target_cluster: TargetCluster,
2347    ) {
2348        match &plan.explainee {
2349            plan::Explainee::Statement(stmt) => match stmt {
2350                plan::ExplaineeStatement::CreateView { .. } => {
2351                    self.explain_create_view(ctx, plan).await;
2352                }
2353                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2354                    self.explain_create_materialized_view(ctx, plan).await;
2355                }
2356                plan::ExplaineeStatement::CreateIndex { .. } => {
2357                    self.explain_create_index(ctx, plan).await;
2358                }
2359                plan::ExplaineeStatement::Select { .. } => {
2360                    self.explain_peek(ctx, plan, target_cluster).await;
2361                }
2362                plan::ExplaineeStatement::Subscribe { .. } => {
2363                    self.explain_subscribe(ctx, plan, target_cluster).await;
2364                }
2365            },
2366            plan::Explainee::View(_) => {
2367                let result = self.explain_view(&ctx, plan);
2368                ctx.retire(result);
2369            }
2370            plan::Explainee::MaterializedView(_) => {
2371                let result = self.explain_materialized_view(&ctx, plan);
2372                ctx.retire(result);
2373            }
2374            plan::Explainee::Index(_) => {
2375                let result = self.explain_index(&ctx, plan);
2376                ctx.retire(result);
2377            }
2378            plan::Explainee::ReplanView(_) => {
2379                self.explain_replan_view(ctx, plan).await;
2380            }
2381            plan::Explainee::ReplanMaterializedView(_) => {
2382                self.explain_replan_materialized_view(ctx, plan).await;
2383            }
2384            plan::Explainee::ReplanIndex(_) => {
2385                self.explain_replan_index(ctx, plan).await;
2386            }
2387        };
2388    }
2389
2390    pub(super) async fn sequence_explain_pushdown(
2391        &mut self,
2392        ctx: ExecuteContext,
2393        plan: plan::ExplainPushdownPlan,
2394        target_cluster: TargetCluster,
2395    ) {
2396        match plan.explainee {
2397            Explainee::Statement(ExplaineeStatement::Select {
2398                broken: false,
2399                plan,
2400                desc: _,
2401            }) => {
2402                let stage = return_if_err!(
2403                    self.peek_validate(
2404                        ctx.session(),
2405                        plan,
2406                        target_cluster,
2407                        None,
2408                        ExplainContext::Pushdown,
2409                        Some(ctx.session().vars().max_query_result_size()),
2410                    ),
2411                    ctx
2412                );
2413                self.sequence_staged(ctx, Span::current(), stage).await;
2414            }
2415            Explainee::MaterializedView(item_id) => {
2416                self.explain_pushdown_materialized_view(ctx, item_id).await;
2417            }
2418            _ => {
2419                ctx.retire(Err(AdapterError::Unsupported(
2420                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2421                )));
2422            }
2423        };
2424    }
2425
2426    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2427    async fn execute_explain_pushdown_with_read_holds(
2428        &self,
2429        ctx: ExecuteContext,
2430        as_of: Antichain<Timestamp>,
2431        mz_now: ResultSpec<'static>,
2432        read_holds: Option<ReadHolds<Timestamp>>,
2433        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2434    ) {
2435        let fut = self
2436            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2437            .await;
2438        task::spawn(|| "render explain pushdown", async move {
2439            // Transfer the necessary read holds over to the background task
2440            let _read_holds = read_holds;
2441            let res = fut.await;
2442            ctx.retire(res);
2443        });
2444    }
2445
2446    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
2447    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2448        &self,
2449        session: &Session,
2450        as_of: Antichain<Timestamp>,
2451        mz_now: ResultSpec<'static>,
2452        imports: I,
2453    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2454        // Get the needed Coordinator stuff and call the freestanding, shared helper.
2455        super::explain_pushdown_future_inner(
2456            session,
2457            &self.catalog,
2458            &self.controller.storage_collections,
2459            as_of,
2460            mz_now,
2461            imports,
2462        )
2463        .await
2464    }
2465
2466    #[instrument]
2467    pub(super) async fn sequence_insert(
2468        &mut self,
2469        mut ctx: ExecuteContext,
2470        plan: plan::InsertPlan,
2471    ) {
2472        // Normally, this would get checked when trying to add "write ops" to
2473        // the transaction but we go down diverging paths below, based on
2474        // whether the INSERT is only constant values or not.
2475        //
2476        // For the non-constant case we sequence an implicit read-then-write,
2477        // which messes with the transaction ops and would allow an implicit
2478        // read-then-write to sneak into a read-only transaction.
2479        if !ctx.session_mut().transaction().allows_writes() {
2480            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2481            return;
2482        }
2483
2484        // The structure of this code originates from a time where
2485        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2486        // optimized `MirRelationExpr`.
2487        //
2488        // Ideally, we would like to make the `selection.as_const().is_some()`
2489        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2490        // are planned as a Wrap($n, $vals) call, so until we can reduce
2491        // HirRelationExpr this will always returns false.
2492        //
2493        // Unfortunately, hitting the default path of the match below also
2494        // causes a lot of tests to fail, so we opted to go with the extra
2495        // `plan.values.clone()` statements when producing the `optimized_mir`
2496        // and re-optimize the values in the `sequence_read_then_write` call.
2497        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2498            // We don't perform any optimizations on an expression that is already
2499            // a constant for writes, as we want to maximize bulk-insert throughput.
2500            let expr = return_if_err!(
2501                plan.values
2502                    .clone()
2503                    .lower(self.catalog().system_config(), None),
2504                ctx
2505            );
2506            OptimizedMirRelationExpr(expr)
2507        } else {
2508            // Collect optimizer parameters.
2509            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2510
2511            // (`optimize::view::Optimizer` has a special case for constant queries.)
2512            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2513
2514            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2515            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2516        };
2517
2518        match optimized_mir.into_inner() {
2519            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2520                let catalog = self.owned_catalog();
2521                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2522                    let result =
2523                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2524                    ctx.retire(result);
2525                });
2526            }
2527            // All non-constant values must be planned as read-then-writes.
2528            _ => {
2529                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2530                    Some(table) => {
2531                        // Inserts always occur at the latest version of the table.
2532                        let desc = table.relation_desc_latest().expect("table has a desc");
2533                        desc.arity()
2534                    }
2535                    None => {
2536                        ctx.retire(Err(AdapterError::Catalog(
2537                            mz_catalog::memory::error::Error {
2538                                kind: ErrorKind::Sql(CatalogError::UnknownItem(
2539                                    plan.id.to_string(),
2540                                )),
2541                            },
2542                        )));
2543                        return;
2544                    }
2545                };
2546
2547                let finishing = RowSetFinishing {
2548                    order_by: vec![],
2549                    limit: None,
2550                    offset: 0,
2551                    project: (0..desc_arity).collect(),
2552                };
2553
2554                let read_then_write_plan = plan::ReadThenWritePlan {
2555                    id: plan.id,
2556                    selection: plan.values,
2557                    finishing,
2558                    assignments: BTreeMap::new(),
2559                    kind: MutationKind::Insert,
2560                    returning: plan.returning,
2561                };
2562
2563                self.sequence_read_then_write(ctx, read_then_write_plan)
2564                    .await;
2565            }
2566        }
2567    }
2568
2569    /// ReadThenWrite is a plan whose writes depend on the results of a
2570    /// read. This works by doing a Peek then queuing a SendDiffs. No writes
2571    /// or read-then-writes can occur between the Peek and SendDiff otherwise a
2572    /// serializability violation could occur.
2573    #[instrument]
2574    pub(super) async fn sequence_read_then_write(
2575        &mut self,
2576        mut ctx: ExecuteContext,
2577        plan: plan::ReadThenWritePlan,
2578    ) {
2579        let mut source_ids: BTreeSet<_> = plan
2580            .selection
2581            .depends_on()
2582            .into_iter()
2583            .map(|gid| self.catalog().resolve_item_id(&gid))
2584            .collect();
2585        source_ids.insert(plan.id);
2586
2587        // If the transaction doesn't already have write locks, acquire them.
2588        if ctx.session().transaction().write_locks().is_none() {
2589            // Pre-define all of the locks we need.
2590            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2591
2592            // Try acquiring all of our locks.
2593            for id in &source_ids {
2594                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2595                    write_locks.insert_lock(*id, lock);
2596                }
2597            }
2598
2599            // See if we acquired all of the neccessary locks.
2600            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2601                Ok(locks) => locks,
2602                Err(missing) => {
2603                    // Defer our write if we couldn't acquire all of the locks.
2604                    let role_metadata = ctx.session().role_metadata().clone();
2605                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2606                    let plan = DeferredPlan {
2607                        ctx,
2608                        plan: Plan::ReadThenWrite(plan),
2609                        validity: PlanValidity::new(
2610                            self.catalog.transient_revision(),
2611                            source_ids.clone(),
2612                            None,
2613                            None,
2614                            role_metadata,
2615                        ),
2616                        requires_locks: source_ids,
2617                    };
2618                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2619                }
2620            };
2621
2622            ctx.session_mut()
2623                .try_grant_write_locks(write_locks)
2624                .expect("session has already been granted write locks");
2625        }
2626
2627        let plan::ReadThenWritePlan {
2628            id,
2629            kind,
2630            selection,
2631            mut assignments,
2632            finishing,
2633            mut returning,
2634        } = plan;
2635
2636        // Read then writes can be queued, so re-verify the id exists.
2637        let desc = match self.catalog().try_get_entry(&id) {
2638            Some(table) => {
2639                // Inserts always occur at the latest version of the table.
2640                table
2641                    .relation_desc_latest()
2642                    .expect("table has a desc")
2643                    .into_owned()
2644            }
2645            None => {
2646                ctx.retire(Err(AdapterError::Catalog(
2647                    mz_catalog::memory::error::Error {
2648                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2649                    },
2650                )));
2651                return;
2652            }
2653        };
2654
2655        // Disallow mz_now in any position because read time and write time differ.
2656        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2657            || assignments.values().any(|e| e.contains_temporal())
2658            || returning.iter().any(|e| e.contains_temporal());
2659        if contains_temporal {
2660            ctx.retire(Err(AdapterError::Unsupported(
2661                "calls to mz_now in write statements",
2662            )));
2663            return;
2664        }
2665
2666        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
2667        //
2668        // - They do not refer to any objects whose notion of time moves differently than that of
2669        // user tables. This limitation is meant to ensure no writes occur between this read and the
2670        // subsequent write.
2671        // - They do not use mz_now(), whose time produced during read will differ from the write
2672        //   timestamp.
2673        fn validate_read_dependencies(
2674            catalog: &Catalog,
2675            id: &CatalogItemId,
2676        ) -> Result<(), AdapterError> {
2677            use CatalogItemType::*;
2678            use mz_catalog::memory::objects;
2679            let mut ids_to_check = Vec::new();
2680            let valid = match catalog.try_get_entry(id) {
2681                Some(entry) => {
2682                    if let CatalogItem::View(objects::View { optimized_expr, .. })
2683                    | CatalogItem::MaterializedView(objects::MaterializedView {
2684                        optimized_expr,
2685                        ..
2686                    }) = entry.item()
2687                    {
2688                        if optimized_expr.contains_temporal() {
2689                            return Err(AdapterError::Unsupported(
2690                                "calls to mz_now in write statements",
2691                            ));
2692                        }
2693                    }
2694                    match entry.item().typ() {
2695                        typ @ (Func | View | MaterializedView | ContinualTask) => {
2696                            ids_to_check.extend(entry.uses());
2697                            let valid_id = id.is_user() || matches!(typ, Func);
2698                            valid_id
2699                        }
2700                        Source | Secret | Connection => false,
2701                        // Cannot select from sinks or indexes.
2702                        Sink | Index => unreachable!(),
2703                        Table => {
2704                            if !id.is_user() {
2705                                // We can't read from non-user tables
2706                                false
2707                            } else {
2708                                // We can't read from tables that are source-exports
2709                                entry.source_export_details().is_none()
2710                            }
2711                        }
2712                        Type => true,
2713                    }
2714                }
2715                None => false,
2716            };
2717            if !valid {
2718                let (object_name, object_type) = match catalog.try_get_entry(id) {
2719                    Some(entry) => {
2720                        let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
2721                        let object_type = match entry.item().typ() {
2722                            // We only need the disallowed types here; the allowed types are handled above.
2723                            Source => "source",
2724                            Secret => "secret",
2725                            Connection => "connection",
2726                            Table => {
2727                                if !id.is_user() {
2728                                    "system table"
2729                                } else {
2730                                    "source-export table"
2731                                }
2732                            }
2733                            View => "system view",
2734                            MaterializedView => "system materialized view",
2735                            ContinualTask => "system task",
2736                            _ => "invalid dependency",
2737                        };
2738                        (object_name, object_type.to_string())
2739                    }
2740                    None => (id.to_string(), "unknown".to_string()),
2741                };
2742                return Err(AdapterError::InvalidTableMutationSelection {
2743                    object_name,
2744                    object_type,
2745                });
2746            }
2747            for id in ids_to_check {
2748                validate_read_dependencies(catalog, &id)?;
2749            }
2750            Ok(())
2751        }
2752
2753        for gid in selection.depends_on() {
2754            let item_id = self.catalog().resolve_item_id(&gid);
2755            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2756                ctx.retire(Err(err));
2757                return;
2758            }
2759        }
2760
2761        let (peek_tx, peek_rx) = oneshot::channel();
2762        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2763        let (tx, _, session, extra) = ctx.into_parts();
2764        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2765        // execution context, because this peek does not directly correspond to an execute,
2766        // and so we don't need to take any action on its retirement.
2767        // TODO[btv]: we might consider extending statement logging to log the inner
2768        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2769        // and mint a new "real" execution context here. We'd also have to add some logic to
2770        // make sure such "sub-statements" are always sampled when the top-level statement is
2771        //
2772        // It's debatable whether this makes sense conceptually,
2773        // because the inner fragment here is not actually a
2774        // "statement" in its own right.
2775        let peek_ctx = ExecuteContext::from_parts(
2776            peek_client_tx,
2777            self.internal_cmd_tx.clone(),
2778            session,
2779            Default::default(),
2780        );
2781
2782        self.sequence_peek(
2783            peek_ctx,
2784            plan::SelectPlan {
2785                select: None,
2786                source: selection,
2787                when: QueryWhen::FreshestTableWrite,
2788                finishing,
2789                copy_to: None,
2790            },
2791            TargetCluster::Active,
2792            None,
2793        )
2794        .await;
2795
2796        let internal_cmd_tx = self.internal_cmd_tx.clone();
2797        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2798        let catalog = self.owned_catalog();
2799        let max_result_size = self.catalog().system_config().max_result_size();
2800
2801        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2802            let (peek_response, session) = match peek_rx.await {
2803                Ok(Response {
2804                    result: Ok(resp),
2805                    session,
2806                    otel_ctx,
2807                }) => {
2808                    otel_ctx.attach_as_parent();
2809                    (resp, session)
2810                }
2811                Ok(Response {
2812                    result: Err(e),
2813                    session,
2814                    otel_ctx,
2815                }) => {
2816                    let ctx =
2817                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2818                    otel_ctx.attach_as_parent();
2819                    ctx.retire(Err(e));
2820                    return;
2821                }
2822                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2823                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2824            };
2825            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2826            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2827
2828            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2829            if timeout_dur == Duration::ZERO {
2830                timeout_dur = Duration::MAX;
2831            }
2832
2833            let style = ExprPrepOneShot {
2834                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2835                session: ctx.session(),
2836                catalog_state: catalog.state(),
2837            };
2838            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2839                return_if_err!(style.prep_scalar_expr(expr), ctx);
2840            }
2841
2842            let make_diffs = move |mut rows: Box<dyn RowIterator>|
2843                  -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2844                    let arena = RowArena::new();
2845                    let mut diffs = Vec::new();
2846                    let mut datum_vec = mz_repr::DatumVec::new();
2847
2848                    while let Some(row) = rows.next() {
2849                        if !assignments.is_empty() {
2850                            assert!(
2851                                matches!(kind, MutationKind::Update),
2852                                "only updates support assignments"
2853                            );
2854                            let mut datums = datum_vec.borrow_with(row);
2855                            let mut updates = vec![];
2856                            for (idx, expr) in &assignments {
2857                                let updated = match expr.eval(&datums, &arena) {
2858                                    Ok(updated) => updated,
2859                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2860                                };
2861                                updates.push((*idx, updated));
2862                            }
2863                            for (idx, new_value) in updates {
2864                                datums[idx] = new_value;
2865                            }
2866                            let updated = Row::pack_slice(&datums);
2867                            diffs.push((updated, Diff::ONE));
2868                        }
2869                        match kind {
2870                            // Updates and deletes always remove the
2871                            // current row. Updates will also add an
2872                            // updated value.
2873                            MutationKind::Update | MutationKind::Delete => {
2874                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2875                            }
2876                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2877                        }
2878                    }
2879
2880                    // Sum of all the rows' byte size, for checking if we go
2881                    // above the max_result_size threshold.
2882                    let mut byte_size: u64 = 0;
2883                    for (row, diff) in &diffs {
2884                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2885                        if diff.is_positive() {
2886                            for (idx, datum) in row.iter().enumerate() {
2887                                desc.constraints_met(idx, &datum)?;
2888                            }
2889                        }
2890                    }
2891                    Ok((diffs, byte_size))
2892                };
2893
2894            let diffs = match peek_response {
2895                ExecuteResponse::SendingRowsStreaming {
2896                    rows: mut rows_stream,
2897                    ..
2898                } => {
2899                    let mut byte_size: u64 = 0;
2900                    let mut diffs = Vec::new();
2901                    let result = loop {
2902                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2903                            Ok(Some(res)) => match res {
2904                                PeekResponseUnary::Rows(new_rows) => {
2905                                    match make_diffs(new_rows) {
2906                                        Ok((mut new_diffs, new_byte_size)) => {
2907                                            byte_size = byte_size.saturating_add(new_byte_size);
2908                                            if byte_size > max_result_size {
2909                                                break Err(AdapterError::ResultSize(format!(
2910                                                    "result exceeds max size of {max_result_size}"
2911                                                )));
2912                                            }
2913                                            diffs.append(&mut new_diffs)
2914                                        }
2915                                        Err(e) => break Err(e),
2916                                    };
2917                                }
2918                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2919                                PeekResponseUnary::Error(e) => {
2920                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2921                                }
2922                            },
2923                            Ok(None) => break Ok(diffs),
2924                            Err(_) => {
2925                                // We timed out, so remove the pending peek. This is
2926                                // best-effort and doesn't guarantee we won't
2927                                // receive a response.
2928                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2929                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2930                                    conn_id: ctx.session().conn_id().clone(),
2931                                });
2932                                if let Err(e) = result {
2933                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2934                                }
2935                                break Err(AdapterError::StatementTimeout);
2936                            }
2937                        }
2938                    };
2939
2940                    result
2941                }
2942                ExecuteResponse::SendingRowsImmediate { rows } => {
2943                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2944                }
2945                resp => Err(AdapterError::Unstructured(anyhow!(
2946                    "unexpected peek response: {resp:?}"
2947                ))),
2948            };
2949
2950            let mut returning_rows = Vec::new();
2951            let mut diff_err: Option<AdapterError> = None;
2952            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2953                let arena = RowArena::new();
2954                for (row, diff) in diffs {
2955                    if !diff.is_positive() {
2956                        continue;
2957                    }
2958                    let mut returning_row = Row::with_capacity(returning.len());
2959                    let mut packer = returning_row.packer();
2960                    for expr in &returning {
2961                        let datums: Vec<_> = row.iter().collect();
2962                        match expr.eval(&datums, &arena) {
2963                            Ok(datum) => {
2964                                packer.push(datum);
2965                            }
2966                            Err(err) => {
2967                                diff_err = Some(err.into());
2968                                break;
2969                            }
2970                        }
2971                    }
2972                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2973                    let diff = match NonZeroUsize::try_from(diff) {
2974                        Ok(diff) => diff,
2975                        Err(err) => {
2976                            diff_err = Some(err.into());
2977                            break;
2978                        }
2979                    };
2980                    returning_rows.push((returning_row, diff));
2981                    if diff_err.is_some() {
2982                        break;
2983                    }
2984                }
2985            }
2986            let diffs = if let Some(err) = diff_err {
2987                Err(err)
2988            } else {
2989                diffs
2990            };
2991
2992            // We need to clear out the timestamp context so the write doesn't fail due to a
2993            // read only transaction.
2994            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2995            // No matter what isolation level the client is using, we must linearize this
2996            // read. The write will be performed right after this, as part of a single
2997            // transaction, so the write must have a timestamp greater than or equal to the
2998            // read.
2999            //
3000            // Note: It's only OK for the write to have a greater timestamp than the read
3001            // because the write lock prevents any other writes from happening in between
3002            // the read and write.
3003            if let Some(timestamp_context) = timestamp_context {
3004                let (tx, rx) = tokio::sync::oneshot::channel();
3005                let conn_id = ctx.session().conn_id().clone();
3006                let pending_read_txn = PendingReadTxn {
3007                    txn: PendingRead::ReadThenWrite { ctx, tx },
3008                    timestamp_context,
3009                    created: Instant::now(),
3010                    num_requeues: 0,
3011                    otel_ctx: OpenTelemetryContext::obtain(),
3012                };
3013                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3014                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3015                if let Err(e) = result {
3016                    warn!(
3017                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3018                        e
3019                    );
3020                    return;
3021                }
3022                let result = rx.await;
3023                // It is not an error for these results to be ready after `tx` has been dropped.
3024                ctx = match result {
3025                    Ok(Some(ctx)) => ctx,
3026                    Ok(None) => {
3027                        // Coordinator took our context and will handle responding to the client.
3028                        // This usually indicates that our transaction was aborted.
3029                        return;
3030                    }
3031                    Err(e) => {
3032                        warn!(
3033                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3034                            e
3035                        );
3036                        return;
3037                    }
3038                };
3039            }
3040
3041            match diffs {
3042                Ok(diffs) => {
3043                    let result = Self::send_diffs(
3044                        ctx.session_mut(),
3045                        plan::SendDiffsPlan {
3046                            id,
3047                            updates: diffs,
3048                            kind,
3049                            returning: returning_rows,
3050                            max_result_size,
3051                        },
3052                    );
3053                    ctx.retire(result);
3054                }
3055                Err(e) => {
3056                    ctx.retire(Err(e));
3057                }
3058            }
3059        });
3060    }
3061
3062    #[instrument]
3063    pub(super) async fn sequence_alter_item_rename(
3064        &mut self,
3065        ctx: &mut ExecuteContext,
3066        plan: plan::AlterItemRenamePlan,
3067    ) -> Result<ExecuteResponse, AdapterError> {
3068        let op = catalog::Op::RenameItem {
3069            id: plan.id,
3070            current_full_name: plan.current_full_name,
3071            to_name: plan.to_name,
3072        };
3073        match self
3074            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3075            .await
3076        {
3077            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3078            Err(err) => Err(err),
3079        }
3080    }
3081
3082    #[instrument]
3083    pub(super) async fn sequence_alter_retain_history(
3084        &mut self,
3085        ctx: &mut ExecuteContext,
3086        plan: plan::AlterRetainHistoryPlan,
3087    ) -> Result<ExecuteResponse, AdapterError> {
3088        let ops = vec![catalog::Op::AlterRetainHistory {
3089            id: plan.id,
3090            value: plan.value,
3091            window: plan.window,
3092        }];
3093        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3094            Box::pin(async move {
3095                let catalog_item = coord.catalog().get_entry(&plan.id).item();
3096                let cluster = match catalog_item {
3097                    CatalogItem::Table(_)
3098                    | CatalogItem::MaterializedView(_)
3099                    | CatalogItem::Source(_)
3100                    | CatalogItem::ContinualTask(_) => None,
3101                    CatalogItem::Index(index) => Some(index.cluster_id),
3102                    CatalogItem::Log(_)
3103                    | CatalogItem::View(_)
3104                    | CatalogItem::Sink(_)
3105                    | CatalogItem::Type(_)
3106                    | CatalogItem::Func(_)
3107                    | CatalogItem::Secret(_)
3108                    | CatalogItem::Connection(_) => unreachable!(),
3109                };
3110                match cluster {
3111                    Some(cluster) => {
3112                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3113                    }
3114                    None => {
3115                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3116                    }
3117                }
3118            })
3119        })
3120        .await?;
3121        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3122    }
3123
3124    #[instrument]
3125    pub(super) async fn sequence_alter_schema_rename(
3126        &mut self,
3127        ctx: &mut ExecuteContext,
3128        plan: plan::AlterSchemaRenamePlan,
3129    ) -> Result<ExecuteResponse, AdapterError> {
3130        let (database_spec, schema_spec) = plan.cur_schema_spec;
3131        let op = catalog::Op::RenameSchema {
3132            database_spec,
3133            schema_spec,
3134            new_name: plan.new_schema_name,
3135            check_reserved_names: true,
3136        };
3137        match self
3138            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3139            .await
3140        {
3141            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3142            Err(err) => Err(err),
3143        }
3144    }
3145
3146    #[instrument]
3147    pub(super) async fn sequence_alter_schema_swap(
3148        &mut self,
3149        ctx: &mut ExecuteContext,
3150        plan: plan::AlterSchemaSwapPlan,
3151    ) -> Result<ExecuteResponse, AdapterError> {
3152        let plan::AlterSchemaSwapPlan {
3153            schema_a_spec: (schema_a_db, schema_a),
3154            schema_a_name,
3155            schema_b_spec: (schema_b_db, schema_b),
3156            schema_b_name,
3157            name_temp,
3158        } = plan;
3159
3160        let op_a = catalog::Op::RenameSchema {
3161            database_spec: schema_a_db,
3162            schema_spec: schema_a,
3163            new_name: name_temp,
3164            check_reserved_names: false,
3165        };
3166        let op_b = catalog::Op::RenameSchema {
3167            database_spec: schema_b_db,
3168            schema_spec: schema_b,
3169            new_name: schema_a_name,
3170            check_reserved_names: false,
3171        };
3172        let op_c = catalog::Op::RenameSchema {
3173            database_spec: schema_a_db,
3174            schema_spec: schema_a,
3175            new_name: schema_b_name,
3176            check_reserved_names: false,
3177        };
3178
3179        match self
3180            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3181                Box::pin(async {})
3182            })
3183            .await
3184        {
3185            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3186            Err(err) => Err(err),
3187        }
3188    }
3189
3190    #[instrument]
3191    pub(super) async fn sequence_alter_role(
3192        &mut self,
3193        session: &Session,
3194        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3195    ) -> Result<ExecuteResponse, AdapterError> {
3196        let catalog = self.catalog().for_session(session);
3197        let role = catalog.get_role(&id);
3198
3199        // We'll send these notices to the user, if the operation is successful.
3200        let mut notices = vec![];
3201
3202        // Get the attributes and variables from the role, as they currently are.
3203        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3204        let mut vars = role.vars().clone();
3205
3206        // Whether to set the password to NULL. This is a special case since the existing
3207        // password is not stored in the role attributes.
3208        let mut nopassword = false;
3209
3210        // Apply our updates.
3211        match option {
3212            PlannedAlterRoleOption::Attributes(attrs) => {
3213                self.validate_role_attributes(&attrs.clone().into())?;
3214
3215                if let Some(inherit) = attrs.inherit {
3216                    attributes.inherit = inherit;
3217                }
3218
3219                if let Some(password) = attrs.password {
3220                    attributes.password = Some(password);
3221                    attributes.scram_iterations =
3222                        Some(self.catalog().system_config().scram_iterations())
3223                }
3224
3225                if let Some(superuser) = attrs.superuser {
3226                    attributes.superuser = Some(superuser);
3227                }
3228
3229                if let Some(login) = attrs.login {
3230                    attributes.login = Some(login);
3231                }
3232
3233                if attrs.nopassword.unwrap_or(false) {
3234                    nopassword = true;
3235                }
3236
3237                if let Some(notice) = self.should_emit_rbac_notice(session) {
3238                    notices.push(notice);
3239                }
3240            }
3241            PlannedAlterRoleOption::Variable(variable) => {
3242                // Get the variable to make sure it's valid and visible.
3243                let session_var = session.vars().inspect(variable.name())?;
3244                // Return early if it's not visible.
3245                session_var.visible(session.user(), catalog.system_vars())?;
3246
3247                // Emit a warning when deprecated variables are used.
3248                // TODO(database-issues#8069) remove this after sufficient time has passed
3249                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3250                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3251                } else if let PlannedRoleVariable::Set {
3252                    name,
3253                    value: VariableValue::Values(vals),
3254                } = &variable
3255                {
3256                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3257                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3258                    }
3259                }
3260
3261                let var_name = match variable {
3262                    PlannedRoleVariable::Set { name, value } => {
3263                        // Update our persisted set.
3264                        match &value {
3265                            VariableValue::Default => {
3266                                vars.remove(&name);
3267                            }
3268                            VariableValue::Values(vals) => {
3269                                let var = match &vals[..] {
3270                                    [val] => OwnedVarInput::Flat(val.clone()),
3271                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3272                                };
3273                                // Make sure the input is valid.
3274                                session_var.check(var.borrow())?;
3275
3276                                vars.insert(name.clone(), var);
3277                            }
3278                        };
3279                        name
3280                    }
3281                    PlannedRoleVariable::Reset { name } => {
3282                        // Remove it from our persisted values.
3283                        vars.remove(&name);
3284                        name
3285                    }
3286                };
3287
3288                // Emit a notice that they need to reconnect to see the change take effect.
3289                notices.push(AdapterNotice::VarDefaultUpdated {
3290                    role: Some(name.clone()),
3291                    var_name: Some(var_name),
3292                });
3293            }
3294        }
3295
3296        let op = catalog::Op::AlterRole {
3297            id,
3298            name,
3299            attributes,
3300            nopassword,
3301            vars: RoleVars { map: vars },
3302        };
3303        let response = self
3304            .catalog_transact(Some(session), vec![op])
3305            .await
3306            .map(|_| ExecuteResponse::AlteredRole)?;
3307
3308        // Send all of our queued notices.
3309        session.add_notices(notices);
3310
3311        Ok(response)
3312    }
3313
3314    #[instrument]
3315    pub(super) async fn sequence_alter_sink_prepare(
3316        &mut self,
3317        ctx: ExecuteContext,
3318        plan: plan::AlterSinkPlan,
3319    ) {
3320        // Put a read hold on the new relation
3321        let id_bundle = crate::CollectionIdBundle {
3322            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3323            compute_ids: BTreeMap::new(),
3324        };
3325        let read_hold = self.acquire_read_holds(&id_bundle);
3326
3327        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3328            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3329            return;
3330        };
3331
3332        let otel_ctx = OpenTelemetryContext::obtain();
3333        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3334
3335        let plan_validity = PlanValidity::new(
3336            self.catalog().transient_revision(),
3337            BTreeSet::from_iter([plan.item_id, from_item_id]),
3338            Some(plan.in_cluster),
3339            None,
3340            ctx.session().role_metadata().clone(),
3341        );
3342
3343        info!(
3344            "preparing alter sink for {}: frontiers={:?} export={:?}",
3345            plan.global_id,
3346            self.controller
3347                .storage_collections
3348                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3349            self.controller.storage.export(plan.global_id)
3350        );
3351
3352        // Now we must wait for the sink to make enough progress such that there is overlap between
3353        // the new `from` collection's read hold and the sink's write frontier.
3354        //
3355        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3356        // the watch set never completes and neither does the `ALTER SINK` command.
3357        self.install_storage_watch_set(
3358            ctx.session().conn_id().clone(),
3359            BTreeSet::from_iter([plan.global_id]),
3360            read_ts,
3361            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3362                ctx: Some(ctx),
3363                otel_ctx,
3364                plan,
3365                plan_validity,
3366                read_hold,
3367            }),
3368        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3369    }
3370
3371    #[instrument]
3372    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3373        ctx.otel_ctx.attach_as_parent();
3374
3375        let plan::AlterSinkPlan {
3376            item_id,
3377            global_id,
3378            sink: sink_plan,
3379            with_snapshot,
3380            in_cluster,
3381        } = ctx.plan.clone();
3382
3383        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3384        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3385        // arbitrarily changed since we performed planning, and we must re-assert that it still
3386        // matches our requirements.
3387        //
3388        // The `PlanValidity` check ensures that both the sink and the new source relation still
3389        // exist. Apart from that we have to ensure that nobody else altered the sink in the mean
3390        // time, which we do by comparing the catalog sink version to the one in the plan.
3391        match ctx.plan_validity.check(self.catalog()) {
3392            Ok(()) => {}
3393            Err(err) => {
3394                ctx.retire(Err(err));
3395                return;
3396            }
3397        }
3398
3399        let entry = self.catalog().get_entry(&item_id);
3400        let CatalogItem::Sink(old_sink) = entry.item() else {
3401            panic!("invalid item kind for `AlterSinkPlan`");
3402        };
3403
3404        if sink_plan.version != old_sink.version + 1 {
3405            ctx.retire(Err(AdapterError::ChangedPlan(
3406                "sink was altered concurrently".into(),
3407            )));
3408            return;
3409        }
3410
3411        info!(
3412            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3413            self.controller
3414                .storage_collections
3415                .collections_frontiers(vec![global_id, sink_plan.from]),
3416            self.controller.storage.export(global_id),
3417        );
3418
3419        // Assert that we can recover the updates that happened at the timestamps of the write
3420        // frontier. This must be true in this call.
3421        let write_frontier = &self
3422            .controller
3423            .storage
3424            .export(global_id)
3425            .expect("sink known to exist")
3426            .write_frontier;
3427        let as_of = ctx.read_hold.least_valid_read();
3428        assert!(
3429            write_frontier.iter().all(|t| as_of.less_than(t)),
3430            "{:?} should be strictly less than {:?}",
3431            &*as_of,
3432            &**write_frontier
3433        );
3434
3435        // Parse the `create_sql` so we can update it to the new sink definition.
3436        //
3437        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3438        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3439        // names in the `create_sql` may have changed, for example due to a schema swap.
3440        let create_sql = &old_sink.create_sql;
3441        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3442        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3443            unreachable!("invalid statement kind for sink");
3444        };
3445
3446        // Update the sink version.
3447        stmt.with_options
3448            .retain(|o| o.name != CreateSinkOptionName::Version);
3449        stmt.with_options.push(CreateSinkOption {
3450            name: CreateSinkOptionName::Version,
3451            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3452                sink_plan.version.to_string(),
3453            ))),
3454        });
3455
3456        let conn_catalog = self.catalog().for_system_session();
3457        let (mut stmt, resolved_ids) =
3458            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3459
3460        // Update the `from` relation.
3461        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3462        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3463        stmt.from = ResolvedItemName::Item {
3464            id: from_entry.id(),
3465            qualifiers: from_entry.name.qualifiers.clone(),
3466            full_name,
3467            print_id: true,
3468            version: from_entry.version,
3469        };
3470
3471        let new_sink = Sink {
3472            create_sql: stmt.to_ast_string_stable(),
3473            global_id,
3474            from: sink_plan.from,
3475            connection: sink_plan.connection.clone(),
3476            envelope: sink_plan.envelope,
3477            version: sink_plan.version,
3478            with_snapshot,
3479            resolved_ids: resolved_ids.clone(),
3480            cluster_id: in_cluster,
3481            commit_interval: sink_plan.commit_interval,
3482        };
3483
3484        let ops = vec![catalog::Op::UpdateItem {
3485            id: item_id,
3486            name: entry.name().clone(),
3487            to_item: CatalogItem::Sink(new_sink),
3488        }];
3489
3490        match self
3491            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3492            .await
3493        {
3494            Ok(()) => {}
3495            Err(err) => {
3496                ctx.retire(Err(err));
3497                return;
3498            }
3499        }
3500
3501        let storage_sink_desc = StorageSinkDesc {
3502            from: sink_plan.from,
3503            from_desc: from_entry
3504                .relation_desc()
3505                .expect("sinks can only be built on items with descs")
3506                .into_owned(),
3507            connection: sink_plan
3508                .connection
3509                .clone()
3510                .into_inline_connection(self.catalog().state()),
3511            envelope: sink_plan.envelope,
3512            as_of,
3513            with_snapshot,
3514            version: sink_plan.version,
3515            from_storage_metadata: (),
3516            to_storage_metadata: (),
3517            commit_interval: sink_plan.commit_interval,
3518        };
3519
3520        self.controller
3521            .storage
3522            .alter_export(
3523                global_id,
3524                ExportDescription {
3525                    sink: storage_sink_desc,
3526                    instance_id: in_cluster,
3527                },
3528            )
3529            .await
3530            .unwrap_or_terminate("cannot fail to alter source desc");
3531
3532        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3533    }
3534
3535    #[instrument]
3536    pub(super) async fn sequence_alter_connection(
3537        &mut self,
3538        ctx: ExecuteContext,
3539        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3540    ) {
3541        match action {
3542            AlterConnectionAction::RotateKeys => {
3543                self.sequence_rotate_keys(ctx, id).await;
3544            }
3545            AlterConnectionAction::AlterOptions {
3546                set_options,
3547                drop_options,
3548                validate,
3549            } => {
3550                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3551                    .await
3552            }
3553        }
3554    }
3555
3556    #[instrument]
3557    async fn sequence_alter_connection_options(
3558        &mut self,
3559        mut ctx: ExecuteContext,
3560        id: CatalogItemId,
3561        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3562        drop_options: BTreeSet<ConnectionOptionName>,
3563        validate: bool,
3564    ) {
3565        let cur_entry = self.catalog().get_entry(&id);
3566        let cur_conn = cur_entry.connection().expect("known to be connection");
3567        let connection_gid = cur_conn.global_id();
3568
3569        let inner = || -> Result<Connection, AdapterError> {
3570            // Parse statement.
3571            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3572                .expect("invalid create sql persisted to catalog")
3573                .into_element()
3574                .ast
3575            {
3576                Statement::CreateConnection(stmt) => stmt,
3577                _ => unreachable!("proved type is source"),
3578            };
3579
3580            let catalog = self.catalog().for_system_session();
3581
3582            // Resolve items in statement
3583            let (mut create_conn_stmt, resolved_ids) =
3584                mz_sql::names::resolve(&catalog, create_conn_stmt)
3585                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3586
3587            // Retain options that are neither set nor dropped.
3588            create_conn_stmt
3589                .values
3590                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3591
3592            // Set new values
3593            create_conn_stmt.values.extend(
3594                set_options
3595                    .into_iter()
3596                    .map(|(name, value)| ConnectionOption { name, value }),
3597            );
3598
3599            // Open a new catalog, which we will use to re-plan our
3600            // statement with the desired config.
3601            let mut catalog = self.catalog().for_system_session();
3602            catalog.mark_id_unresolvable_for_replanning(id);
3603
3604            // Re-define our source in terms of the amended statement
3605            let plan = match mz_sql::plan::plan(
3606                None,
3607                &catalog,
3608                Statement::CreateConnection(create_conn_stmt),
3609                &Params::empty(),
3610                &resolved_ids,
3611            )
3612            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3613            {
3614                Plan::CreateConnection(plan) => plan,
3615                _ => unreachable!("create source plan is only valid response"),
3616            };
3617
3618            // Parse statement.
3619            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3620                .expect("invalid create sql persisted to catalog")
3621                .into_element()
3622                .ast
3623            {
3624                Statement::CreateConnection(stmt) => stmt,
3625                _ => unreachable!("proved type is source"),
3626            };
3627
3628            let catalog = self.catalog().for_system_session();
3629
3630            // Resolve items in statement
3631            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3632                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3633
3634            Ok(Connection {
3635                create_sql: plan.connection.create_sql,
3636                global_id: cur_conn.global_id,
3637                details: plan.connection.details,
3638                resolved_ids: new_deps,
3639            })
3640        };
3641
3642        let conn = match inner() {
3643            Ok(conn) => conn,
3644            Err(e) => {
3645                return ctx.retire(Err(e));
3646            }
3647        };
3648
3649        if validate {
3650            let connection = conn
3651                .details
3652                .to_connection()
3653                .into_inline_connection(self.catalog().state());
3654
3655            let internal_cmd_tx = self.internal_cmd_tx.clone();
3656            let transient_revision = self.catalog().transient_revision();
3657            let conn_id = ctx.session().conn_id().clone();
3658            let otel_ctx = OpenTelemetryContext::obtain();
3659            let role_metadata = ctx.session().role_metadata().clone();
3660            let current_storage_parameters = self.controller.storage.config().clone();
3661
3662            task::spawn(
3663                || format!("validate_alter_connection:{conn_id}"),
3664                async move {
3665                    let resolved_ids = conn.resolved_ids.clone();
3666                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3667                    let result = match connection.validate(id, &current_storage_parameters).await {
3668                        Ok(()) => Ok(conn),
3669                        Err(err) => Err(err.into()),
3670                    };
3671
3672                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3673                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3674                        AlterConnectionValidationReady {
3675                            ctx,
3676                            result,
3677                            connection_id: id,
3678                            connection_gid,
3679                            plan_validity: PlanValidity::new(
3680                                transient_revision,
3681                                dependency_ids.clone(),
3682                                None,
3683                                None,
3684                                role_metadata,
3685                            ),
3686                            otel_ctx,
3687                            resolved_ids,
3688                        },
3689                    ));
3690                    if let Err(e) = result {
3691                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3692                    }
3693                },
3694            );
3695        } else {
3696            let result = self
3697                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3698                .await;
3699            ctx.retire(result);
3700        }
3701    }
3702
3703    #[instrument]
3704    pub(crate) async fn sequence_alter_connection_stage_finish(
3705        &mut self,
3706        session: &Session,
3707        id: CatalogItemId,
3708        connection: Connection,
3709    ) -> Result<ExecuteResponse, AdapterError> {
3710        match self.catalog.get_entry(&id).item() {
3711            CatalogItem::Connection(curr_conn) => {
3712                curr_conn
3713                    .details
3714                    .to_connection()
3715                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3716                    .map_err(StorageError::from)?;
3717            }
3718            _ => unreachable!("known to be a connection"),
3719        };
3720
3721        let ops = vec![catalog::Op::UpdateItem {
3722            id,
3723            name: self.catalog.get_entry(&id).name().clone(),
3724            to_item: CatalogItem::Connection(connection.clone()),
3725        }];
3726
3727        self.catalog_transact(Some(session), ops).await?;
3728
3729        // NOTE: The rest of the alter connection logic (updating VPC endpoints
3730        // and propagating connection changes to dependent sources, sinks, and
3731        // tables) is handled in `apply_catalog_implications` via
3732        // `handle_alter_connection`. The catalog transact above triggers that
3733        // code path.
3734
3735        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3736    }
3737
3738    #[instrument]
3739    pub(super) async fn sequence_alter_source(
3740        &mut self,
3741        session: &Session,
3742        plan::AlterSourcePlan {
3743            item_id,
3744            ingestion_id,
3745            action,
3746        }: plan::AlterSourcePlan,
3747    ) -> Result<ExecuteResponse, AdapterError> {
3748        let cur_entry = self.catalog().get_entry(&item_id);
3749        let cur_source = cur_entry.source().expect("known to be source");
3750
3751        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3752            // Parse statement.
3753            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3754                .expect("invalid create sql persisted to catalog")
3755                .into_element()
3756                .ast
3757            {
3758                Statement::CreateSource(stmt) => stmt,
3759                _ => unreachable!("proved type is source"),
3760            };
3761
3762            let catalog = coord.catalog().for_system_session();
3763
3764            // Resolve items in statement
3765            mz_sql::names::resolve(&catalog, create_source_stmt)
3766                .map_err(|e| AdapterError::internal(err_cx, e))
3767        };
3768
3769        match action {
3770            plan::AlterSourceAction::AddSubsourceExports {
3771                subsources,
3772                options,
3773            } => {
3774                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3775
3776                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3777                    text_columns: mut new_text_columns,
3778                    exclude_columns: mut new_exclude_columns,
3779                    ..
3780                } = options.try_into()?;
3781
3782                // Resolve items in statement
3783                let (mut create_source_stmt, resolved_ids) =
3784                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3785
3786                // Get all currently referred-to items
3787                let catalog = self.catalog();
3788                let curr_references: BTreeSet<_> = catalog
3789                    .get_entry(&item_id)
3790                    .used_by()
3791                    .into_iter()
3792                    .filter_map(|subsource| {
3793                        catalog
3794                            .get_entry(subsource)
3795                            .subsource_details()
3796                            .map(|(_id, reference, _details)| reference)
3797                    })
3798                    .collect();
3799
3800                // We are doing a lot of unwrapping, so just make an error to reference; all of
3801                // these invariants are guaranteed to be true because of how we plan subsources.
3802                let purification_err =
3803                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3804
3805                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
3806                // we remove support for implicitly created subsources from a `CREATE SOURCE`
3807                // statement.
3808                match &mut create_source_stmt.connection {
3809                    CreateSourceConnection::Postgres {
3810                        options: curr_options,
3811                        ..
3812                    } => {
3813                        let mz_sql::plan::PgConfigOptionExtracted {
3814                            mut text_columns, ..
3815                        } = curr_options.clone().try_into()?;
3816
3817                        // Drop text columns; we will add them back in
3818                        // as appropriate below.
3819                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3820
3821                        // Drop all text columns that are not currently referred to.
3822                        text_columns.retain(|column_qualified_reference| {
3823                            mz_ore::soft_assert_eq_or_log!(
3824                                column_qualified_reference.0.len(),
3825                                4,
3826                                "all TEXT COLUMNS values must be column-qualified references"
3827                            );
3828                            let mut table = column_qualified_reference.clone();
3829                            table.0.truncate(3);
3830                            curr_references.contains(&table)
3831                        });
3832
3833                        // Merge the current text columns into the new text columns.
3834                        new_text_columns.extend(text_columns);
3835
3836                        // If we have text columns, add them to the options.
3837                        if !new_text_columns.is_empty() {
3838                            new_text_columns.sort();
3839                            let new_text_columns = new_text_columns
3840                                .into_iter()
3841                                .map(WithOptionValue::UnresolvedItemName)
3842                                .collect();
3843
3844                            curr_options.push(PgConfigOption {
3845                                name: PgConfigOptionName::TextColumns,
3846                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3847                            });
3848                        }
3849                    }
3850                    CreateSourceConnection::MySql {
3851                        options: curr_options,
3852                        ..
3853                    } => {
3854                        let mz_sql::plan::MySqlConfigOptionExtracted {
3855                            mut text_columns,
3856                            mut exclude_columns,
3857                            ..
3858                        } = curr_options.clone().try_into()?;
3859
3860                        // Drop both ignore and text columns; we will add them back in
3861                        // as appropriate below.
3862                        curr_options.retain(|o| {
3863                            !matches!(
3864                                o.name,
3865                                MySqlConfigOptionName::TextColumns
3866                                    | MySqlConfigOptionName::ExcludeColumns
3867                            )
3868                        });
3869
3870                        // Drop all text / exclude columns that are not currently referred to.
3871                        let column_referenced =
3872                            |column_qualified_reference: &UnresolvedItemName| {
3873                                mz_ore::soft_assert_eq_or_log!(
3874                                    column_qualified_reference.0.len(),
3875                                    3,
3876                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3877                                );
3878                                let mut table = column_qualified_reference.clone();
3879                                table.0.truncate(2);
3880                                curr_references.contains(&table)
3881                            };
3882                        text_columns.retain(column_referenced);
3883                        exclude_columns.retain(column_referenced);
3884
3885                        // Merge the current text / exclude columns into the new text / exclude columns.
3886                        new_text_columns.extend(text_columns);
3887                        new_exclude_columns.extend(exclude_columns);
3888
3889                        // If we have text columns, add them to the options.
3890                        if !new_text_columns.is_empty() {
3891                            new_text_columns.sort();
3892                            let new_text_columns = new_text_columns
3893                                .into_iter()
3894                                .map(WithOptionValue::UnresolvedItemName)
3895                                .collect();
3896
3897                            curr_options.push(MySqlConfigOption {
3898                                name: MySqlConfigOptionName::TextColumns,
3899                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3900                            });
3901                        }
3902                        // If we have exclude columns, add them to the options.
3903                        if !new_exclude_columns.is_empty() {
3904                            new_exclude_columns.sort();
3905                            let new_exclude_columns = new_exclude_columns
3906                                .into_iter()
3907                                .map(WithOptionValue::UnresolvedItemName)
3908                                .collect();
3909
3910                            curr_options.push(MySqlConfigOption {
3911                                name: MySqlConfigOptionName::ExcludeColumns,
3912                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3913                            });
3914                        }
3915                    }
3916                    CreateSourceConnection::SqlServer {
3917                        options: curr_options,
3918                        ..
3919                    } => {
3920                        let mz_sql::plan::SqlServerConfigOptionExtracted {
3921                            mut text_columns,
3922                            mut exclude_columns,
3923                            ..
3924                        } = curr_options.clone().try_into()?;
3925
3926                        // Drop both ignore and text columns; we will add them back in
3927                        // as appropriate below.
3928                        curr_options.retain(|o| {
3929                            !matches!(
3930                                o.name,
3931                                SqlServerConfigOptionName::TextColumns
3932                                    | SqlServerConfigOptionName::ExcludeColumns
3933                            )
3934                        });
3935
3936                        // Drop all text / exclude columns that are not currently referred to.
3937                        let column_referenced =
3938                            |column_qualified_reference: &UnresolvedItemName| {
3939                                mz_ore::soft_assert_eq_or_log!(
3940                                    column_qualified_reference.0.len(),
3941                                    3,
3942                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3943                                );
3944                                let mut table = column_qualified_reference.clone();
3945                                table.0.truncate(2);
3946                                curr_references.contains(&table)
3947                            };
3948                        text_columns.retain(column_referenced);
3949                        exclude_columns.retain(column_referenced);
3950
3951                        // Merge the current text / exclude columns into the new text / exclude columns.
3952                        new_text_columns.extend(text_columns);
3953                        new_exclude_columns.extend(exclude_columns);
3954
3955                        // If we have text columns, add them to the options.
3956                        if !new_text_columns.is_empty() {
3957                            new_text_columns.sort();
3958                            let new_text_columns = new_text_columns
3959                                .into_iter()
3960                                .map(WithOptionValue::UnresolvedItemName)
3961                                .collect();
3962
3963                            curr_options.push(SqlServerConfigOption {
3964                                name: SqlServerConfigOptionName::TextColumns,
3965                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3966                            });
3967                        }
3968                        // If we have exclude columns, add them to the options.
3969                        if !new_exclude_columns.is_empty() {
3970                            new_exclude_columns.sort();
3971                            let new_exclude_columns = new_exclude_columns
3972                                .into_iter()
3973                                .map(WithOptionValue::UnresolvedItemName)
3974                                .collect();
3975
3976                            curr_options.push(SqlServerConfigOption {
3977                                name: SqlServerConfigOptionName::ExcludeColumns,
3978                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3979                            });
3980                        }
3981                    }
3982                    _ => return Err(purification_err()),
3983                };
3984
3985                let mut catalog = self.catalog().for_system_session();
3986                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
3987
3988                // Re-define our source in terms of the amended statement
3989                let plan = match mz_sql::plan::plan(
3990                    None,
3991                    &catalog,
3992                    Statement::CreateSource(create_source_stmt),
3993                    &Params::empty(),
3994                    &resolved_ids,
3995                )
3996                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
3997                {
3998                    Plan::CreateSource(plan) => plan,
3999                    _ => unreachable!("create source plan is only valid response"),
4000                };
4001
4002                // Asserting that we've done the right thing with dependencies
4003                // here requires mocking out objects in the catalog, which is a
4004                // large task for an operation we have to cover in tests anyway.
4005                let source = Source::new(
4006                    plan,
4007                    cur_source.global_id,
4008                    resolved_ids,
4009                    cur_source.custom_logical_compaction_window,
4010                    cur_source.is_retained_metrics_object,
4011                );
4012
4013                // Get new ingestion description for storage.
4014                let desc = match &source.data_source {
4015                    DataSourceDesc::Ingestion { desc, .. }
4016                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4017                        desc.clone().into_inline_connection(self.catalog().state())
4018                    }
4019                    _ => unreachable!("already verified of type ingestion"),
4020                };
4021
4022                self.controller
4023                    .storage
4024                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
4025                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4026
4027                // Redefine source. This must be done before we create any new
4028                // subsources so that it has the right ingestion.
4029                let mut ops = vec![catalog::Op::UpdateItem {
4030                    id: item_id,
4031                    // Look this up again so we don't have to hold an immutable reference to the
4032                    // entry for so long.
4033                    name: self.catalog.get_entry(&item_id).name().clone(),
4034                    to_item: CatalogItem::Source(source),
4035                }];
4036
4037                let CreateSourceInner {
4038                    ops: new_ops,
4039                    sources: _,
4040                    if_not_exists_ids,
4041                } = self.create_source_inner(session, subsources).await?;
4042
4043                ops.extend(new_ops.into_iter());
4044
4045                assert!(
4046                    if_not_exists_ids.is_empty(),
4047                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4048                );
4049
4050                self.catalog_transact(Some(session), ops).await?;
4051            }
4052            plan::AlterSourceAction::RefreshReferences { references } => {
4053                self.catalog_transact(
4054                    Some(session),
4055                    vec![catalog::Op::UpdateSourceReferences {
4056                        source_id: item_id,
4057                        references: references.into(),
4058                    }],
4059                )
4060                .await?;
4061            }
4062        }
4063
4064        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4065    }
4066
4067    #[instrument]
4068    pub(super) async fn sequence_alter_system_set(
4069        &mut self,
4070        session: &Session,
4071        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4072    ) -> Result<ExecuteResponse, AdapterError> {
4073        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4074        // We want to ensure that the network policy we're switching too actually exists.
4075        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4076            self.validate_alter_system_network_policy(session, &value)?;
4077        }
4078
4079        let op = match value {
4080            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4081                name: name.clone(),
4082                value: OwnedVarInput::SqlSet(values),
4083            },
4084            plan::VariableValue::Default => {
4085                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4086            }
4087        };
4088        self.catalog_transact(Some(session), vec![op]).await?;
4089
4090        session.add_notice(AdapterNotice::VarDefaultUpdated {
4091            role: None,
4092            var_name: Some(name),
4093        });
4094        Ok(ExecuteResponse::AlteredSystemConfiguration)
4095    }
4096
4097    #[instrument]
4098    pub(super) async fn sequence_alter_system_reset(
4099        &mut self,
4100        session: &Session,
4101        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4102    ) -> Result<ExecuteResponse, AdapterError> {
4103        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4104        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4105        self.catalog_transact(Some(session), vec![op]).await?;
4106        session.add_notice(AdapterNotice::VarDefaultUpdated {
4107            role: None,
4108            var_name: Some(name),
4109        });
4110        Ok(ExecuteResponse::AlteredSystemConfiguration)
4111    }
4112
4113    #[instrument]
4114    pub(super) async fn sequence_alter_system_reset_all(
4115        &mut self,
4116        session: &Session,
4117        _: plan::AlterSystemResetAllPlan,
4118    ) -> Result<ExecuteResponse, AdapterError> {
4119        self.is_user_allowed_to_alter_system(session, None)?;
4120        let op = catalog::Op::ResetAllSystemConfiguration;
4121        self.catalog_transact(Some(session), vec![op]).await?;
4122        session.add_notice(AdapterNotice::VarDefaultUpdated {
4123            role: None,
4124            var_name: None,
4125        });
4126        Ok(ExecuteResponse::AlteredSystemConfiguration)
4127    }
4128
4129    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
4130    fn is_user_allowed_to_alter_system(
4131        &self,
4132        session: &Session,
4133        var_name: Option<&str>,
4134    ) -> Result<(), AdapterError> {
4135        match (session.user().kind(), var_name) {
4136            // Only internal superusers can reset all system variables.
4137            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4138            // Whether or not a variable can be modified depends if we're an internal superuser.
4139            (UserKind::Superuser, Some(name))
4140                if session.user().is_internal()
4141                    || self.catalog().system_config().user_modifiable(name) =>
4142            {
4143                // In lieu of plumbing the user to all system config functions, just check that
4144                // the var is visible.
4145                let var = self.catalog().system_config().get(name)?;
4146                var.visible(session.user(), self.catalog().system_config())?;
4147                Ok(())
4148            }
4149            // If we're not a superuser, but the variable is user modifiable, indicate they can use
4150            // session variables.
4151            (UserKind::Regular, Some(name))
4152                if self.catalog().system_config().user_modifiable(name) =>
4153            {
4154                Err(AdapterError::Unauthorized(
4155                    rbac::UnauthorizedError::Superuser {
4156                        action: format!("toggle the '{name}' system configuration parameter"),
4157                    },
4158                ))
4159            }
4160            _ => Err(AdapterError::Unauthorized(
4161                rbac::UnauthorizedError::MzSystem {
4162                    action: "alter system".into(),
4163                },
4164            )),
4165        }
4166    }
4167
4168    fn validate_alter_system_network_policy(
4169        &self,
4170        session: &Session,
4171        policy_value: &plan::VariableValue,
4172    ) -> Result<(), AdapterError> {
4173        let policy_name = match &policy_value {
4174            // Make sure the compiled in default still exists.
4175            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4176            plan::VariableValue::Values(values) if values.len() == 1 => {
4177                values.iter().next().cloned()
4178            }
4179            plan::VariableValue::Values(values) => {
4180                tracing::warn!(?values, "can't set multiple network policies at once");
4181                None
4182            }
4183        };
4184        let maybe_network_policy = policy_name
4185            .as_ref()
4186            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4187        let Some(network_policy) = maybe_network_policy else {
4188            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4189                VarError::InvalidParameterValue {
4190                    name: NETWORK_POLICY.name(),
4191                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4192                    reason: "no network policy with such name exists".to_string(),
4193                },
4194            )));
4195        };
4196        self.validate_alter_network_policy(session, &network_policy.rules)
4197    }
4198
4199    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4200    ///
4201    /// This helps prevent users from modifying network policies in a way that would lock out their
4202    /// current connection.
4203    fn validate_alter_network_policy(
4204        &self,
4205        session: &Session,
4206        policy_rules: &Vec<NetworkPolicyRule>,
4207    ) -> Result<(), AdapterError> {
4208        // If the user is not an internal user attempt to protect them from
4209        // blocking themselves.
4210        if session.user().is_internal() {
4211            return Ok(());
4212        }
4213        if let Some(ip) = session.meta().client_ip() {
4214            validate_ip_with_policy_rules(ip, policy_rules)
4215                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4216        } else {
4217            // Sessions without IPs are only temporarily constructed for default values
4218            // they should not be permitted here.
4219            return Err(AdapterError::NetworkPolicyDenied(
4220                NetworkPolicyError::MissingIp,
4221            ));
4222        }
4223        Ok(())
4224    }
4225
4226    // Returns the name of the portal to execute.
4227    #[instrument]
4228    pub(super) fn sequence_execute(
4229        &self,
4230        session: &mut Session,
4231        plan: plan::ExecutePlan,
4232    ) -> Result<String, AdapterError> {
4233        // Verify the stmt is still valid.
4234        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4235        let ps = session
4236            .get_prepared_statement_unverified(&plan.name)
4237            .expect("known to exist");
4238        let stmt = ps.stmt().cloned();
4239        let desc = ps.desc().clone();
4240        let state_revision = ps.state_revision;
4241        let logging = Arc::clone(ps.logging());
4242        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4243    }
4244
4245    #[instrument]
4246    pub(super) async fn sequence_grant_privileges(
4247        &mut self,
4248        session: &Session,
4249        plan::GrantPrivilegesPlan {
4250            update_privileges,
4251            grantees,
4252        }: plan::GrantPrivilegesPlan,
4253    ) -> Result<ExecuteResponse, AdapterError> {
4254        self.sequence_update_privileges(
4255            session,
4256            update_privileges,
4257            grantees,
4258            UpdatePrivilegeVariant::Grant,
4259        )
4260        .await
4261    }
4262
4263    #[instrument]
4264    pub(super) async fn sequence_revoke_privileges(
4265        &mut self,
4266        session: &Session,
4267        plan::RevokePrivilegesPlan {
4268            update_privileges,
4269            revokees,
4270        }: plan::RevokePrivilegesPlan,
4271    ) -> Result<ExecuteResponse, AdapterError> {
4272        self.sequence_update_privileges(
4273            session,
4274            update_privileges,
4275            revokees,
4276            UpdatePrivilegeVariant::Revoke,
4277        )
4278        .await
4279    }
4280
4281    #[instrument]
4282    async fn sequence_update_privileges(
4283        &mut self,
4284        session: &Session,
4285        update_privileges: Vec<UpdatePrivilege>,
4286        grantees: Vec<RoleId>,
4287        variant: UpdatePrivilegeVariant,
4288    ) -> Result<ExecuteResponse, AdapterError> {
4289        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4290        let mut warnings = Vec::new();
4291        let catalog = self.catalog().for_session(session);
4292
4293        for UpdatePrivilege {
4294            acl_mode,
4295            target_id,
4296            grantor,
4297        } in update_privileges
4298        {
4299            let actual_object_type = catalog.get_system_object_type(&target_id);
4300            // For all relations we allow all applicable table privileges, but send a warning if the
4301            // privilege isn't actually applicable to the object type.
4302            if actual_object_type.is_relation() {
4303                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4304                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4305                if !non_applicable_privileges.is_empty() {
4306                    let object_description =
4307                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4308                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4309                        non_applicable_privileges,
4310                        object_description,
4311                    })
4312                }
4313            }
4314
4315            if let SystemObjectId::Object(object_id) = &target_id {
4316                self.catalog()
4317                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4318            }
4319
4320            let privileges = self
4321                .catalog()
4322                .get_privileges(&target_id, session.conn_id())
4323                // Should be unreachable since the parser will refuse to parse grant/revoke
4324                // statements on objects without privileges.
4325                .ok_or(AdapterError::Unsupported(
4326                    "GRANTs/REVOKEs on an object type with no privileges",
4327                ))?;
4328
4329            for grantee in &grantees {
4330                self.catalog().ensure_not_system_role(grantee)?;
4331                self.catalog().ensure_not_predefined_role(grantee)?;
4332                let existing_privilege = privileges
4333                    .get_acl_item(grantee, &grantor)
4334                    .map(Cow::Borrowed)
4335                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4336
4337                match variant {
4338                    UpdatePrivilegeVariant::Grant
4339                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4340                    {
4341                        ops.push(catalog::Op::UpdatePrivilege {
4342                            target_id: target_id.clone(),
4343                            privilege: MzAclItem {
4344                                grantee: *grantee,
4345                                grantor,
4346                                acl_mode,
4347                            },
4348                            variant,
4349                        });
4350                    }
4351                    UpdatePrivilegeVariant::Revoke
4352                        if !existing_privilege
4353                            .acl_mode
4354                            .intersection(acl_mode)
4355                            .is_empty() =>
4356                    {
4357                        ops.push(catalog::Op::UpdatePrivilege {
4358                            target_id: target_id.clone(),
4359                            privilege: MzAclItem {
4360                                grantee: *grantee,
4361                                grantor,
4362                                acl_mode,
4363                            },
4364                            variant,
4365                        });
4366                    }
4367                    // no-op
4368                    _ => {}
4369                }
4370            }
4371        }
4372
4373        if ops.is_empty() {
4374            session.add_notices(warnings);
4375            return Ok(variant.into());
4376        }
4377
4378        let res = self
4379            .catalog_transact(Some(session), ops)
4380            .await
4381            .map(|_| match variant {
4382                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4383                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4384            });
4385        if res.is_ok() {
4386            session.add_notices(warnings);
4387        }
4388        res
4389    }
4390
4391    #[instrument]
4392    pub(super) async fn sequence_alter_default_privileges(
4393        &mut self,
4394        session: &Session,
4395        plan::AlterDefaultPrivilegesPlan {
4396            privilege_objects,
4397            privilege_acl_items,
4398            is_grant,
4399        }: plan::AlterDefaultPrivilegesPlan,
4400    ) -> Result<ExecuteResponse, AdapterError> {
4401        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4402        let variant = if is_grant {
4403            UpdatePrivilegeVariant::Grant
4404        } else {
4405            UpdatePrivilegeVariant::Revoke
4406        };
4407        for privilege_object in &privilege_objects {
4408            self.catalog()
4409                .ensure_not_system_role(&privilege_object.role_id)?;
4410            self.catalog()
4411                .ensure_not_predefined_role(&privilege_object.role_id)?;
4412            if let Some(database_id) = privilege_object.database_id {
4413                self.catalog()
4414                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4415            }
4416            if let Some(schema_id) = privilege_object.schema_id {
4417                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4418                let schema_spec: SchemaSpecifier = schema_id.into();
4419
4420                self.catalog().ensure_not_reserved_object(
4421                    &(database_spec, schema_spec).into(),
4422                    session.conn_id(),
4423                )?;
4424            }
4425            for privilege_acl_item in &privilege_acl_items {
4426                self.catalog()
4427                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4428                self.catalog()
4429                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4430                ops.push(catalog::Op::UpdateDefaultPrivilege {
4431                    privilege_object: privilege_object.clone(),
4432                    privilege_acl_item: privilege_acl_item.clone(),
4433                    variant,
4434                })
4435            }
4436        }
4437
4438        self.catalog_transact(Some(session), ops).await?;
4439        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4440    }
4441
4442    #[instrument]
4443    pub(super) async fn sequence_grant_role(
4444        &mut self,
4445        session: &Session,
4446        plan::GrantRolePlan {
4447            role_ids,
4448            member_ids,
4449            grantor_id,
4450        }: plan::GrantRolePlan,
4451    ) -> Result<ExecuteResponse, AdapterError> {
4452        let catalog = self.catalog();
4453        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4454        for role_id in role_ids {
4455            for member_id in &member_ids {
4456                let member_membership: BTreeSet<_> =
4457                    catalog.get_role(member_id).membership().keys().collect();
4458                if member_membership.contains(&role_id) {
4459                    let role_name = catalog.get_role(&role_id).name().to_string();
4460                    let member_name = catalog.get_role(member_id).name().to_string();
4461                    // We need this check so we don't accidentally return a success on a reserved role.
4462                    catalog.ensure_not_reserved_role(member_id)?;
4463                    catalog.ensure_grantable_role(&role_id)?;
4464                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4465                        role_name,
4466                        member_name,
4467                    });
4468                } else {
4469                    ops.push(catalog::Op::GrantRole {
4470                        role_id,
4471                        member_id: *member_id,
4472                        grantor_id,
4473                    });
4474                }
4475            }
4476        }
4477
4478        if ops.is_empty() {
4479            return Ok(ExecuteResponse::GrantedRole);
4480        }
4481
4482        self.catalog_transact(Some(session), ops)
4483            .await
4484            .map(|_| ExecuteResponse::GrantedRole)
4485    }
4486
4487    #[instrument]
4488    pub(super) async fn sequence_revoke_role(
4489        &mut self,
4490        session: &Session,
4491        plan::RevokeRolePlan {
4492            role_ids,
4493            member_ids,
4494            grantor_id,
4495        }: plan::RevokeRolePlan,
4496    ) -> Result<ExecuteResponse, AdapterError> {
4497        let catalog = self.catalog();
4498        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4499        for role_id in role_ids {
4500            for member_id in &member_ids {
4501                let member_membership: BTreeSet<_> =
4502                    catalog.get_role(member_id).membership().keys().collect();
4503                if !member_membership.contains(&role_id) {
4504                    let role_name = catalog.get_role(&role_id).name().to_string();
4505                    let member_name = catalog.get_role(member_id).name().to_string();
4506                    // We need this check so we don't accidentally return a success on a reserved role.
4507                    catalog.ensure_not_reserved_role(member_id)?;
4508                    catalog.ensure_grantable_role(&role_id)?;
4509                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4510                        role_name,
4511                        member_name,
4512                    });
4513                } else {
4514                    ops.push(catalog::Op::RevokeRole {
4515                        role_id,
4516                        member_id: *member_id,
4517                        grantor_id,
4518                    });
4519                }
4520            }
4521        }
4522
4523        if ops.is_empty() {
4524            return Ok(ExecuteResponse::RevokedRole);
4525        }
4526
4527        self.catalog_transact(Some(session), ops)
4528            .await
4529            .map(|_| ExecuteResponse::RevokedRole)
4530    }
4531
4532    #[instrument]
4533    pub(super) async fn sequence_alter_owner(
4534        &mut self,
4535        session: &Session,
4536        plan::AlterOwnerPlan {
4537            id,
4538            object_type,
4539            new_owner,
4540        }: plan::AlterOwnerPlan,
4541    ) -> Result<ExecuteResponse, AdapterError> {
4542        let mut ops = vec![catalog::Op::UpdateOwner {
4543            id: id.clone(),
4544            new_owner,
4545        }];
4546
4547        match &id {
4548            ObjectId::Item(global_id) => {
4549                let entry = self.catalog().get_entry(global_id);
4550
4551                // Cannot directly change the owner of an index.
4552                if entry.is_index() {
4553                    let name = self
4554                        .catalog()
4555                        .resolve_full_name(entry.name(), Some(session.conn_id()))
4556                        .to_string();
4557                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
4558                    return Ok(ExecuteResponse::AlteredObject(object_type));
4559                }
4560
4561                // Alter owner cascades down to dependent indexes.
4562                let dependent_index_ops = entry
4563                    .used_by()
4564                    .into_iter()
4565                    .filter(|id| self.catalog().get_entry(id).is_index())
4566                    .map(|id| catalog::Op::UpdateOwner {
4567                        id: ObjectId::Item(*id),
4568                        new_owner,
4569                    });
4570                ops.extend(dependent_index_ops);
4571
4572                // Alter owner cascades down to progress collections.
4573                let dependent_subsources =
4574                    entry
4575                        .progress_id()
4576                        .into_iter()
4577                        .map(|item_id| catalog::Op::UpdateOwner {
4578                            id: ObjectId::Item(item_id),
4579                            new_owner,
4580                        });
4581                ops.extend(dependent_subsources);
4582            }
4583            ObjectId::Cluster(cluster_id) => {
4584                let cluster = self.catalog().get_cluster(*cluster_id);
4585                // Alter owner cascades down to cluster replicas.
4586                let managed_cluster_replica_ops =
4587                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4588                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4589                        new_owner,
4590                    });
4591                ops.extend(managed_cluster_replica_ops);
4592            }
4593            _ => {}
4594        }
4595
4596        self.catalog_transact(Some(session), ops)
4597            .await
4598            .map(|_| ExecuteResponse::AlteredObject(object_type))
4599    }
4600
4601    #[instrument]
4602    pub(super) async fn sequence_reassign_owned(
4603        &mut self,
4604        session: &Session,
4605        plan::ReassignOwnedPlan {
4606            old_roles,
4607            new_role,
4608            reassign_ids,
4609        }: plan::ReassignOwnedPlan,
4610    ) -> Result<ExecuteResponse, AdapterError> {
4611        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4612            self.catalog().ensure_not_reserved_role(role_id)?;
4613        }
4614
4615        let ops = reassign_ids
4616            .into_iter()
4617            .map(|id| catalog::Op::UpdateOwner {
4618                id,
4619                new_owner: new_role,
4620            })
4621            .collect();
4622
4623        self.catalog_transact(Some(session), ops)
4624            .await
4625            .map(|_| ExecuteResponse::ReassignOwned)
4626    }
4627
4628    #[instrument]
4629    pub(crate) async fn handle_deferred_statement(&mut self) {
4630        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
4631        // was processed, removing the single element from deferred_statements, so it is expected
4632        // that this is sometimes empty.
4633        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4634            return;
4635        };
4636        match ps {
4637            crate::coord::PlanStatement::Statement { stmt, params } => {
4638                self.handle_execute_inner(stmt, params, ctx).await;
4639            }
4640            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4641                self.sequence_plan(ctx, plan, resolved_ids).await;
4642            }
4643        }
4644    }
4645
4646    #[instrument]
4647    // TODO(parkmycar): Remove this once we have an actual implementation.
4648    #[allow(clippy::unused_async)]
4649    pub(super) async fn sequence_alter_table(
4650        &mut self,
4651        ctx: &mut ExecuteContext,
4652        plan: plan::AlterTablePlan,
4653    ) -> Result<ExecuteResponse, AdapterError> {
4654        let plan::AlterTablePlan {
4655            relation_id,
4656            column_name,
4657            column_type,
4658            raw_sql_type,
4659        } = plan;
4660
4661        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4662        let id_ts = self.get_catalog_write_ts().await;
4663        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4664        let ops = vec![catalog::Op::AlterAddColumn {
4665            id: relation_id,
4666            new_global_id,
4667            name: column_name,
4668            typ: column_type,
4669            sql: raw_sql_type,
4670        }];
4671
4672        self.catalog_transact_with_context(None, Some(ctx), ops)
4673            .await?;
4674
4675        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4676    }
4677
4678    /// Prepares to apply a replacement materialized view.
4679    #[instrument]
4680    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4681        &mut self,
4682        ctx: ExecuteContext,
4683        plan: AlterMaterializedViewApplyReplacementPlan,
4684    ) {
4685        // To ensure there is no time gap in the output, we can only apply a replacement if the
4686        // target's write frontier has caught up to the replacement dataflow's write frontier. This
4687        // might not be the case initially, so we have to wait. To this end, we install a watch set
4688        // waiting for the target MV's write frontier to advance sufficiently.
4689        //
4690        // Note that the replacement's dataflow is not performing any writes, so it can only be
4691        // ahead of the target initially due to as-of selection. Once the target has caught up, the
4692        // replacement's write frontier is always <= the target's.
4693
4694        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4695
4696        let plan_validity = PlanValidity::new(
4697            self.catalog().transient_revision(),
4698            BTreeSet::from_iter([id, replacement_id]),
4699            None,
4700            None,
4701            ctx.session().role_metadata().clone(),
4702        );
4703
4704        let target = self.catalog.get_entry(&id);
4705        let target_gid = target.latest_global_id();
4706
4707        let replacement = self.catalog.get_entry(&replacement_id);
4708        let replacement_gid = replacement.latest_global_id();
4709
4710        let target_upper = self
4711            .controller
4712            .storage_collections
4713            .collection_frontiers(target_gid)
4714            .expect("target MV exists")
4715            .write_frontier;
4716        let replacement_upper = self
4717            .controller
4718            .compute
4719            .collection_frontiers(replacement_gid, replacement.cluster_id())
4720            .expect("replacement MV exists")
4721            .write_frontier;
4722
4723        info!(
4724            %id, %replacement_id, ?target_upper, ?replacement_upper,
4725            "preparing materialized view replacement application",
4726        );
4727
4728        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4729            // A replacement's write frontier can only become empty if the target's write frontier
4730            // has advanced to the empty frontier. In this case the MV is sealed for all times and
4731            // applying the replacement wouldn't have any effect. We use this opportunity to alert
4732            // the user by returning an error, rather than applying the useless replacement.
4733            //
4734            // Note that we can't assert on `target_upper` being empty here, because the reporting
4735            // of the target's frontier might be delayed. We'd have to fetch the current frontier
4736            // from persist, which we cannot do without incurring I/O.
4737            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4738                name: target.name().item.clone(),
4739            }));
4740            return;
4741        };
4742
4743        // A watch set resolves when the watched objects' frontier becomes _greater_ than the
4744        // specified timestamp. Since we only need to wait until the target frontier is >= the
4745        // replacement's frontier, we can step back the timestamp.
4746        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4747
4748        // TODO(database-issues#9820): If the target MV is dropped while we are waiting for
4749        // progress, the watch set never completes and neither does the `ALTER MATERIALIZED VIEW`
4750        // command.
4751        self.install_storage_watch_set(
4752            ctx.session().conn_id().clone(),
4753            BTreeSet::from_iter([target_gid]),
4754            replacement_upper_ts,
4755            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4756                ctx: Some(ctx),
4757                otel_ctx: OpenTelemetryContext::obtain(),
4758                plan,
4759                plan_validity,
4760            }),
4761        )
4762        .expect("target collection exists");
4763    }
4764
4765    /// Finishes applying a replacement materialized view after the frontier wait completed.
4766    #[instrument]
4767    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4768        &mut self,
4769        mut ctx: AlterMaterializedViewReadyContext,
4770    ) {
4771        ctx.otel_ctx.attach_as_parent();
4772
4773        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4774
4775        // We avoid taking the DDL lock for `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`
4776        // commands, see `Coordinator::must_serialize_ddl`. We therefore must assume that the
4777        // world has arbitrarily changed since we performed planning, and we must re-assert
4778        // that it still matches our requirements.
4779        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4780            ctx.retire(Err(err));
4781            return;
4782        }
4783
4784        info!(
4785            %id, %replacement_id,
4786            "finishing materialized view replacement application",
4787        );
4788
4789        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4790        match self
4791            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4792            .await
4793        {
4794            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4795                ObjectType::MaterializedView,
4796            ))),
4797            Err(err) => ctx.retire(Err(err)),
4798        }
4799    }
4800
4801    pub(super) async fn statistics_oracle(
4802        &self,
4803        session: &Session,
4804        source_ids: &BTreeSet<GlobalId>,
4805        query_as_of: &Antichain<Timestamp>,
4806        is_oneshot: bool,
4807    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4808        super::statistics_oracle(
4809            session,
4810            source_ids,
4811            query_as_of,
4812            is_oneshot,
4813            self.catalog().system_config(),
4814            self.controller.storage_collections.as_ref(),
4815        )
4816        .await
4817    }
4818}
4819
4820impl Coordinator {
4821    /// Process the metainfo from a newly created non-transient dataflow.
4822    async fn process_dataflow_metainfo(
4823        &mut self,
4824        df_meta: DataflowMetainfo,
4825        export_id: GlobalId,
4826        ctx: Option<&mut ExecuteContext>,
4827        notice_ids: Vec<GlobalId>,
4828    ) -> Option<BuiltinTableAppendNotify> {
4829        // Emit raw notices to the user.
4830        if let Some(ctx) = ctx {
4831            emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4832        }
4833
4834        // Create a metainfo with rendered notices.
4835        let df_meta = self
4836            .catalog()
4837            .render_notices(df_meta, notice_ids, Some(export_id));
4838
4839        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
4840        // in-memory state.
4841        if self.catalog().state().system_config().enable_mz_notices()
4842            && !df_meta.optimizer_notices.is_empty()
4843        {
4844            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4845            self.catalog().state().pack_optimizer_notices(
4846                &mut builtin_table_updates,
4847                df_meta.optimizer_notices.iter(),
4848                Diff::ONE,
4849            );
4850
4851            // Save the metainfo.
4852            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4853
4854            Some(
4855                self.builtin_table_update()
4856                    .execute(builtin_table_updates)
4857                    .await
4858                    .0,
4859            )
4860        } else {
4861            // Save the metainfo.
4862            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4863
4864            None
4865        }
4866    }
4867}