Skip to main content

mz_adapter/coord/sequencer/
inner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::task::{self, JoinHandle, spawn};
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{assert_none, instrument, soft_assert_or_log};
36use mz_repr::adt::jsonb::Jsonb;
37use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
38use mz_repr::explain::ExprHumanizer;
39use mz_repr::explain::json::json_string;
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
43    RowIterator, Timestamp,
44};
45use mz_sql::ast::{
46    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
47    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
48    SqlServerConfigOptionName,
49};
50use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
51use mz_sql::catalog::{
52    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
53    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
54    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
55};
56use mz_sql::names::{
57    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
58    SchemaSpecifier, SystemObjectId,
59};
60use mz_sql::plan::{
61    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
62    StatementContext,
63};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
68use mz_sql::plan::{
69    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84    WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use smallvec::SmallVec;
93use timely::progress::Antichain;
94use tokio::sync::{oneshot, watch};
95use tracing::{Instrument, Span, info, warn};
96
97use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
98use crate::command::{ExecuteResponse, Response};
99use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
100use crate::coord::sequencer::emit_optimizer_notices;
101use crate::coord::{
102    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
103    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
104    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
105    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
106    WatchSetResponse, validate_ip_with_policy_rules,
107};
108use crate::error::AdapterError;
109use crate::notice::{AdapterNotice, DroppedInUseIndex};
110use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
111use crate::optimize::{self, Optimize};
112use crate::session::{
113    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
114    WriteLocks, WriteOp,
115};
116use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
117use crate::{PeekResponseUnary, ReadHolds};
118
/// A future that resolves to a real-time recency timestamp.
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>;

// Statement-specific sequencing logic, split out by statement kind.
mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;
132
/// Attempts to evaluate an expression. If an error is returned then the error is sent
/// to the client and the function is exited.
///
/// Note that on the error path this `return`s from the *enclosing* function after
/// retiring `$ctx`, so it is only usable in functions whose error reporting is
/// exactly "retire the context with the error".
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

// Re-export so sibling sequencer modules can use the same error-retiring idiom.
pub(super) use return_if_err;
145
/// A bundle of catalog operations and bookkeeping produced while sequencing a drop,
/// letting the caller emit the appropriate notices after the drop is applied.
struct DropOps {
    /// The catalog operations that implement the drop.
    ops: Vec<catalog::Op>,
    /// Whether the session's active database was among the dropped objects.
    dropped_active_db: bool,
    /// Whether the session's active cluster was among the dropped objects.
    dropped_active_cluster: bool,
    /// Indexes that were dropped while still in use, reported back as notices.
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
152
/// A bundle of values returned from [`Coordinator::create_source_inner`].
struct CreateSourceInner {
    /// Catalog operations that create the sources (and update source references).
    ops: Vec<catalog::Op>,
    /// The new in-memory [`Source`]s, paired with their catalog item IDs.
    sources: Vec<(CatalogItemId, Source)>,
    /// For each plan created with `IF NOT EXISTS`, its item ID mapped to its name;
    /// used by callers to downgrade "already exists" errors to notices.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
159
160impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        // Bail out immediately if the plan's validity no longer holds against the
        // current catalog state.
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
                    // when cancel_enabled may have been true on an earlier stage.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                // Without a session there is no connection to cancel on behalf of.
                cancel_enabled = false
            };
            // Execute the current stage on the coordinator thread, under the
            // caller-provided span.
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                // Off-thread work that yields another stage: when the task finishes,
                // send its message back so sequencing resumes on the coordinator.
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                // Off-thread work whose result is the final response: retire directly
                // from the task.
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                // Final response produced on-thread.
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                // Another stage to run immediately; loop around.
                StageResult::Immediate(stage) => *stage,
            }
        }
    }
226
227    fn handle_spawn<C, T, F>(
228        &self,
229        ctx: C,
230        handle: JoinHandle<Result<T, AdapterError>>,
231        cancel_enabled: bool,
232        f: F,
233    ) where
234        C: StagedContext + Send + 'static,
235        T: Send + 'static,
236        F: FnOnce(C, T) + Send + 'static,
237    {
238        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
239            .session()
240            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
241        {
242            let mut rx = rx.clone();
243            Box::pin(async move {
244                // Wait for true or dropped sender.
245                let _ = rx.wait_for(|v| *v).await;
246                ()
247            })
248        } else {
249            Box::pin(future::pending())
250        };
251        spawn(|| "sequence_staged", async move {
252            tokio::select! {
253                res = handle => {
254                    let next = return_if_err!(res, ctx);
255                    f(ctx, next);
256                }
257                _ = rx, if cancel_enabled => {
258                    ctx.retire(Err(AdapterError::Canceled));
259                }
260            }
261        });
262    }
263
264    async fn create_source_inner(
265        &self,
266        session: &Session,
267        plans: Vec<plan::CreateSourcePlanBundle>,
268    ) -> Result<CreateSourceInner, AdapterError> {
269        let mut ops = vec![];
270        let mut sources = vec![];
271
272        let if_not_exists_ids = plans
273            .iter()
274            .filter_map(
275                |plan::CreateSourcePlanBundle {
276                     item_id,
277                     global_id: _,
278                     plan,
279                     resolved_ids: _,
280                     available_source_references: _,
281                 }| {
282                    if plan.if_not_exists {
283                        Some((*item_id, plan.name.clone()))
284                    } else {
285                        None
286                    }
287                },
288            )
289            .collect::<BTreeMap<_, _>>();
290
291        for plan::CreateSourcePlanBundle {
292            item_id,
293            global_id,
294            mut plan,
295            resolved_ids,
296            available_source_references,
297        } in plans
298        {
299            let name = plan.name.clone();
300
301            match plan.source.data_source {
302                plan::DataSourceDesc::Ingestion(ref desc)
303                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
304                    let cluster_id = plan
305                        .in_cluster
306                        .expect("ingestion plans must specify cluster");
307                    match desc.connection {
308                        GenericSourceConnection::Postgres(_)
309                        | GenericSourceConnection::MySql(_)
310                        | GenericSourceConnection::SqlServer(_)
311                        | GenericSourceConnection::Kafka(_)
312                        | GenericSourceConnection::LoadGenerator(_) => {
313                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
314                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
315                                    .get(self.catalog().system_config().dyncfgs());
316
317                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
318                                {
319                                    return Err(AdapterError::Unsupported(
320                                        "sources in clusters with >1 replicas",
321                                    ));
322                                }
323                            }
324                        }
325                    }
326                }
327                plan::DataSourceDesc::Webhook { .. } => {
328                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
329                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
330                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
331                            .get(self.catalog().system_config().dyncfgs());
332
333                        if !enable_multi_replica_sources {
334                            if cluster.replica_ids().len() > 1 {
335                                return Err(AdapterError::Unsupported(
336                                    "webhook sources in clusters with >1 replicas",
337                                ));
338                            }
339                        }
340                    }
341                }
342                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
343            }
344
345            // Attempt to reduce the `CHECK` expression, we timeout if this takes too long.
346            if let mz_sql::plan::DataSourceDesc::Webhook {
347                validate_using: Some(validate),
348                ..
349            } = &mut plan.source.data_source
350            {
351                if let Err(reason) = validate.reduce_expression().await {
352                    self.metrics
353                        .webhook_validation_reduce_failures
354                        .with_label_values(&[reason])
355                        .inc();
356                    return Err(AdapterError::Internal(format!(
357                        "failed to reduce check expression, {reason}"
358                    )));
359                }
360            }
361
362            // If this source contained a set of available source references, update the
363            // source references catalog table.
364            let mut reference_ops = vec![];
365            if let Some(references) = &available_source_references {
366                reference_ops.push(catalog::Op::UpdateSourceReferences {
367                    source_id: item_id,
368                    references: references.clone().into(),
369                });
370            }
371
372            let source = Source::new(plan, global_id, resolved_ids, None, false);
373            ops.push(catalog::Op::CreateItem {
374                id: item_id,
375                name,
376                item: CatalogItem::Source(source.clone()),
377                owner_id: *session.current_role_id(),
378            });
379            sources.push((item_id, source));
380            // These operations must be executed after the source is added to the catalog.
381            ops.extend(reference_ops);
382        }
383
384        Ok(CreateSourceInner {
385            ops,
386            sources,
387            if_not_exists_ids,
388        })
389    }
390
391    /// Subsources are planned differently from other statements because they
392    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
393    /// Because of this, we have usually "missed" the opportunity to plan them
394    /// through the normal statement execution life cycle (the exception being
395    /// during bootstrapping).
396    ///
397    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
398    pub(crate) fn plan_subsource(
399        &self,
400        session: &Session,
401        params: &mz_sql::plan::Params,
402        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
403        item_id: CatalogItemId,
404        global_id: GlobalId,
405    ) -> Result<CreateSourcePlanBundle, AdapterError> {
406        let catalog = self.catalog().for_session(session);
407        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
408
409        let plan = self.plan_statement(
410            session,
411            Statement::CreateSubsource(subsource_stmt),
412            params,
413            &resolved_ids,
414        )?;
415        let plan = match plan {
416            Plan::CreateSource(plan) => plan,
417            _ => unreachable!(),
418        };
419        Ok(CreateSourcePlanBundle {
420            item_id,
421            global_id,
422            plan,
423            resolved_ids,
424            available_source_references: None,
425        })
426    }
427
428    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
429    pub(crate) async fn plan_purified_alter_source_add_subsource(
430        &mut self,
431        session: &Session,
432        params: Params,
433        source_name: ResolvedItemName,
434        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
435        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
436    ) -> Result<(Plan, ResolvedIds), AdapterError> {
437        let mut subsource_plans = Vec::with_capacity(subsources.len());
438
439        // Generate subsource statements
440        let conn_catalog = self.catalog().for_system_session();
441        let pcx = plan::PlanContext::zero();
442        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
443
444        let entry = self.catalog().get_entry(source_name.item_id());
445        let source = entry.source().ok_or_else(|| {
446            AdapterError::internal(
447                "plan alter source",
448                format!("expected Source found {entry:?}"),
449            )
450        })?;
451
452        let item_id = entry.id();
453        let ingestion_id = source.global_id();
454        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
455
456        let ids = self
457            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
458            .await?;
459        for (subsource_stmt, (item_id, global_id)) in
460            subsource_stmts.into_iter().zip_eq(ids.into_iter())
461        {
462            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
463            subsource_plans.push(s);
464        }
465
466        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
467            subsources: subsource_plans,
468            options,
469        };
470
471        Ok((
472            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
473                item_id,
474                ingestion_id,
475                action,
476            }),
477            ResolvedIds::empty(),
478        ))
479    }
480
481    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
482    pub(crate) fn plan_purified_alter_source_refresh_references(
483        &self,
484        _session: &Session,
485        _params: Params,
486        source_name: ResolvedItemName,
487        available_source_references: plan::SourceReferences,
488    ) -> Result<(Plan, ResolvedIds), AdapterError> {
489        let entry = self.catalog().get_entry(source_name.item_id());
490        let source = entry.source().ok_or_else(|| {
491            AdapterError::internal(
492                "plan alter source",
493                format!("expected Source found {entry:?}"),
494            )
495        })?;
496        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
497            references: available_source_references,
498        };
499
500        Ok((
501            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
502                item_id: entry.id(),
503                ingestion_id: source.global_id(),
504                action,
505            }),
506            ResolvedIds::empty(),
507        ))
508    }
509
    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
    ///
    /// The returned [`Plan::CreateSources`] contains the progress subsource plan
    /// (if any), then the primary source plan, then the export subsource plans,
    /// in that order.
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        // Room for the progress subsource and the primary source in addition to
        // one plan per export subsource.
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource, if any.
        if let Some(progress_stmt) = progress_stmt {
            // The primary source depends on this subsource because the primary
            // source needs to know its shard ID, and the easiest way of
            // guaranteeing that the shard ID is discoverable is to create this
            // collection first.
            assert_none!(progress_stmt.of_source);
            let (item_id, global_id) = self.allocate_user_id().await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            // Rewrite the primary source statement to reference the newly planned
            // progress subsource by name.
            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        // Select which of the source's WITH options must also be applied to each
        // subsource (currently only RETAIN HISTORY), translating them into the
        // corresponding subsource option.
        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
        };

        let (item_id, global_id) = self.allocate_user_id().await?;

        // Resolved name of the primary source, which each export subsource uses to
        // reference it.
        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        // Apply the propagated options to every generated subsource statement.
        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources
        let ids = self
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }
622
    /// Sequences a batch of `CREATE SOURCE` plans: builds the catalog ops, applies
    /// them in a DDL transaction, and emits notices for webhook sources and benign
    /// `IF NOT EXISTS` conflicts.
    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        // Check if any sources are webhook sources and report them as created.
        // NOTE(review): this loop runs regardless of `transact_result`; a notice is
        // only emitted when the catalog actually has a webhook URL for the id, which
        // presumably implies the transaction succeeded for that source — confirm.
        for (item_id, source) in &sources {
            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
                    ctx.session()
                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
                }
            }
        }

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            // `IF NOT EXISTS` conflicts are reported as a notice, not an error.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }
664
    /// Sequences a `CREATE CONNECTION` statement: allocates ids, stores SSH key
    /// material in the secrets controller when applicable, then either validates the
    /// connection in a background task or finishes creation immediately.
    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let (connection_id, connection_gid) = match self.allocate_user_id().await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err)),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                // A `None` from `as_key_pair()` means the user tried to set the
                // public key explicitly, which is rejected (keys are presumably
                // system-generated — see the error text).
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                // Persist the key pair via the secrets controller and invalidate any
                // cached copy so later reads observe the new value.
                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
                self.caching_secrets_reader.invalidate(connection_id);
            }
            _ => (),
        };

        if plan.validate {
            // Validation may perform network I/O, so run it off the coordinator
            // thread and resume sequencing via an internal command when it's done.
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        // The plan remains valid only if the objects it references
                        // still exist when the validation result is processed.
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            // No validation requested: create the connection right away.
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }
765
766    #[instrument]
767    pub(crate) async fn sequence_create_connection_stage_finish(
768        &mut self,
769        ctx: &mut ExecuteContext,
770        connection_id: CatalogItemId,
771        connection_gid: GlobalId,
772        plan: plan::CreateConnectionPlan,
773        resolved_ids: ResolvedIds,
774    ) -> Result<ExecuteResponse, AdapterError> {
775        let ops = vec![catalog::Op::CreateItem {
776            id: connection_id,
777            name: plan.name.clone(),
778            item: CatalogItem::Connection(Connection {
779                create_sql: plan.connection.create_sql,
780                global_id: connection_gid,
781                details: plan.connection.details.clone(),
782                resolved_ids,
783            }),
784            owner_id: *ctx.session().current_role_id(),
785        }];
786
787        // VPC endpoint creation for AWS PrivateLink connections is now handled
788        // in apply_catalog_implications.
789        let conn_id = ctx.session().conn_id().clone();
790        let transact_result = self
791            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
792            .await;
793
794        match transact_result {
795            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
796            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
797                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
798            })) if plan.if_not_exists => {
799                ctx.session()
800                    .add_notice(AdapterNotice::ObjectAlreadyExists {
801                        name: plan.name.item,
802                        ty: "connection",
803                    });
804                Ok(ExecuteResponse::CreatedConnection)
805            }
806            Err(err) => Err(err),
807        }
808    }
809
810    #[instrument]
811    pub(super) async fn sequence_create_database(
812        &mut self,
813        session: &Session,
814        plan: plan::CreateDatabasePlan,
815    ) -> Result<ExecuteResponse, AdapterError> {
816        let ops = vec![catalog::Op::CreateDatabase {
817            name: plan.name.clone(),
818            owner_id: *session.current_role_id(),
819        }];
820        match self.catalog_transact(Some(session), ops).await {
821            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
822            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
823                kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
824            })) if plan.if_not_exists => {
825                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
826                Ok(ExecuteResponse::CreatedDatabase)
827            }
828            Err(err) => Err(err),
829        }
830    }
831
832    #[instrument]
833    pub(super) async fn sequence_create_schema(
834        &mut self,
835        session: &Session,
836        plan: plan::CreateSchemaPlan,
837    ) -> Result<ExecuteResponse, AdapterError> {
838        let op = catalog::Op::CreateSchema {
839            database_id: plan.database_spec,
840            schema_name: plan.schema_name.clone(),
841            owner_id: *session.current_role_id(),
842        };
843        match self.catalog_transact(Some(session), vec![op]).await {
844            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
845            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
846                kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
847            })) if plan.if_not_exists => {
848                session.add_notice(AdapterNotice::SchemaAlreadyExists {
849                    name: plan.schema_name,
850                });
851                Ok(ExecuteResponse::CreatedSchema)
852            }
853            Err(err) => Err(err),
854        }
855    }
856
857    /// Validates the role attributes for a `CREATE ROLE` statement.
858    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
859        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
860            if attributes.superuser.is_some() || attributes.password.is_some() {
861                return Err(AdapterError::UnavailableFeature {
862                    feature: "SUPERUSER and PASSWORD attributes".to_string(),
863                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
864                });
865            }
866        }
867        Ok(())
868    }
869
870    #[instrument]
871    pub(super) async fn sequence_create_role(
872        &mut self,
873        conn_id: Option<&ConnectionId>,
874        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
875    ) -> Result<ExecuteResponse, AdapterError> {
876        self.validate_role_attributes(&attributes.clone())?;
877        let op = catalog::Op::CreateRole { name, attributes };
878        self.catalog_transact_with_context(conn_id, None, vec![op])
879            .await
880            .map(|_| ExecuteResponse::CreatedRole)
881    }
882
883    #[instrument]
884    pub(super) async fn sequence_create_network_policy(
885        &mut self,
886        session: &Session,
887        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
888    ) -> Result<ExecuteResponse, AdapterError> {
889        let op = catalog::Op::CreateNetworkPolicy {
890            rules,
891            name,
892            owner_id: *session.current_role_id(),
893        };
894        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
895            .await
896            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
897    }
898
899    #[instrument]
900    pub(super) async fn sequence_alter_network_policy(
901        &mut self,
902        session: &Session,
903        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
904    ) -> Result<ExecuteResponse, AdapterError> {
905        // TODO(network_policy): Consider role based network policies here.
906        let current_network_policy_name =
907            self.catalog().system_config().default_network_policy_name();
908        // Check if the way we're alerting the policy is still valid for the current connection.
909        if current_network_policy_name == name {
910            self.validate_alter_network_policy(session, &rules)?;
911        }
912
913        let op = catalog::Op::AlterNetworkPolicy {
914            id,
915            rules,
916            name,
917            owner_id: *session.current_role_id(),
918        };
919        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
920            .await
921            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
922    }
923
924    #[instrument]
925    pub(super) async fn sequence_create_table(
926        &mut self,
927        ctx: &mut ExecuteContext,
928        plan: plan::CreateTablePlan,
929        resolved_ids: ResolvedIds,
930    ) -> Result<ExecuteResponse, AdapterError> {
931        let plan::CreateTablePlan {
932            name,
933            table,
934            if_not_exists,
935        } = plan;
936
937        let conn_id = if table.temporary {
938            Some(ctx.session().conn_id())
939        } else {
940            None
941        };
942        let (table_id, global_id) = self.allocate_user_id().await?;
943        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
944
945        let data_source = match table.data_source {
946            plan::TableDataSource::TableWrites { defaults } => {
947                TableDataSource::TableWrites { defaults }
948            }
949            plan::TableDataSource::DataSource {
950                desc: data_source_plan,
951                timeline,
952            } => match data_source_plan {
953                plan::DataSourceDesc::IngestionExport {
954                    ingestion_id,
955                    external_reference,
956                    details,
957                    data_config,
958                } => TableDataSource::DataSource {
959                    desc: DataSourceDesc::IngestionExport {
960                        ingestion_id,
961                        external_reference,
962                        details,
963                        data_config,
964                    },
965                    timeline,
966                },
967                plan::DataSourceDesc::Webhook {
968                    validate_using,
969                    body_format,
970                    headers,
971                    cluster_id,
972                } => TableDataSource::DataSource {
973                    desc: DataSourceDesc::Webhook {
974                        validate_using,
975                        body_format,
976                        headers,
977                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
978                    },
979                    timeline,
980                },
981                o => {
982                    unreachable!("CREATE TABLE data source got {:?}", o)
983                }
984            },
985        };
986
987        let is_webhook = if let TableDataSource::DataSource {
988            desc: DataSourceDesc::Webhook { .. },
989            timeline: _,
990        } = &data_source
991        {
992            true
993        } else {
994            false
995        };
996
997        let table = Table {
998            create_sql: Some(table.create_sql),
999            desc: table.desc,
1000            collections,
1001            conn_id: conn_id.cloned(),
1002            resolved_ids,
1003            custom_logical_compaction_window: table.compaction_window,
1004            is_retained_metrics_object: false,
1005            data_source,
1006        };
1007        let ops = vec![catalog::Op::CreateItem {
1008            id: table_id,
1009            name: name.clone(),
1010            item: CatalogItem::Table(table.clone()),
1011            owner_id: *ctx.session().current_role_id(),
1012        }];
1013
1014        let catalog_result = self
1015            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1016            .await;
1017
1018        if is_webhook {
1019            // try_get_webhook_url will make up a URL for things that are not
1020            // webhooks, so we guard against that here.
1021            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1022                ctx.session()
1023                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
1024            }
1025        }
1026
1027        match catalog_result {
1028            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1029            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1030                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1031            })) if if_not_exists => {
1032                ctx.session_mut()
1033                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1034                        name: name.item,
1035                        ty: "table",
1036                    });
1037                Ok(ExecuteResponse::CreatedTable)
1038            }
1039            Err(err) => Err(err),
1040        }
1041    }
1042
    /// Sequences a `CREATE SINK` statement.
    ///
    /// Writes the sink to the catalog and then creates the corresponding
    /// storage export. Unlike most sequencing methods, this consumes `ctx` and
    /// retires it itself on every path (success, `IF NOT EXISTS` collision, or
    /// error) rather than returning a result to the caller.
    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        // First try to allocate an ID and an OID. If either fails, we're done.
        let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
            commit_interval: sink.commit_interval,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        // On `IF NOT EXISTS` collisions and errors, retire `ctx` and bail
        // before touching storage; only a successful catalog write proceeds.
        match result {
            Ok(()) => {}
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        // The sink is now in the catalog, so export creation must not fail;
        // a failure here terminates via `unwrap_or_terminate`.
        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }
1111
1112    /// Validates that a view definition does not contain any expressions that may lead to
1113    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1114    ///
1115    /// We prevent these expressions so that we can add columns to system tables without
1116    /// changing the definition of the view.
1117    ///
1118    /// Here is a bit of a hand wavy proof as to why we only need to check the
1119    /// immediate view definition for system objects and ambiguous column
1120    /// references, and not the entire dependency tree:
1121    ///
1122    ///   - A view with no object references cannot have any ambiguous column
1123    ///   references to a system object, because it has no system objects.
1124    ///   - A view with a direct reference to a system object and a * or
1125    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1126    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1127    ///   any ambiguous column references to a system object, because all column
1128    ///   references are explicitly named.
1129    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1130    ///   system object cannot have any ambiguous column references to a system
1131    ///   object, because there are no system objects in the top level view and
1132    ///   all sub-views are guaranteed to have no ambiguous column references to
1133    ///   system objects.
1134    pub(super) fn validate_system_column_references(
1135        &self,
1136        uses_ambiguous_columns: bool,
1137        depends_on: &BTreeSet<GlobalId>,
1138    ) -> Result<(), AdapterError> {
1139        if uses_ambiguous_columns
1140            && depends_on
1141                .iter()
1142                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1143        {
1144            Err(AdapterError::AmbiguousSystemColumnReference)
1145        } else {
1146            Ok(())
1147        }
1148    }
1149
1150    #[instrument]
1151    pub(super) async fn sequence_create_type(
1152        &mut self,
1153        session: &Session,
1154        plan: plan::CreateTypePlan,
1155        resolved_ids: ResolvedIds,
1156    ) -> Result<ExecuteResponse, AdapterError> {
1157        let (item_id, global_id) = self.allocate_user_id().await?;
1158        // Validate the type definition (e.g., composite columns) before storing.
1159        plan.typ
1160            .inner
1161            .desc(&self.catalog().for_session(session))
1162            .map_err(AdapterError::from)?;
1163        let typ = Type {
1164            create_sql: Some(plan.typ.create_sql),
1165            global_id,
1166            details: CatalogTypeDetails {
1167                array_id: None,
1168                typ: plan.typ.inner,
1169                pg_metadata: None,
1170            },
1171            resolved_ids,
1172        };
1173        let op = catalog::Op::CreateItem {
1174            id: item_id,
1175            name: plan.name,
1176            item: CatalogItem::Type(typ),
1177            owner_id: *session.current_role_id(),
1178        };
1179        match self.catalog_transact(Some(session), vec![op]).await {
1180            Ok(()) => Ok(ExecuteResponse::CreatedType),
1181            Err(err) => Err(err),
1182        }
1183    }
1184
1185    #[instrument]
1186    pub(super) async fn sequence_comment_on(
1187        &mut self,
1188        session: &Session,
1189        plan: plan::CommentPlan,
1190    ) -> Result<ExecuteResponse, AdapterError> {
1191        let op = catalog::Op::Comment {
1192            object_id: plan.object_id,
1193            sub_component: plan.sub_component,
1194            comment: plan.comment,
1195        };
1196        self.catalog_transact(Some(session), vec![op]).await?;
1197        Ok(ExecuteResponse::Comment)
1198    }
1199
    /// Sequences a `DROP <OBJECTS>` statement.
    ///
    /// Applies the drop ops in a catalog transaction, invalidates expression
    /// cache entries for the dropped items, and emits session notices for
    /// cascaded drops, dropped active database/cluster, and dropped in-use
    /// indexes.
    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Objects in `drop_ids` that were not directly referenced by the
        // statement are being dropped transitively (CASCADE); surface them to
        // the user in a notice.
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        // Collect GlobalIds for expression cache invalidation.
        // Collected up front, before the transaction removes the catalog
        // entries these lookups depend on.
        let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
            .iter()
            .filter_map(|id| match id {
                ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
                _ => None,
            })
            .flatten()
            .collect();

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        // Invalidate expression cache entries for dropped objects.
        // NOTE(review): the returned future is bound to `_fut` and dropped
        // without being awaited — presumably the invalidation is driven
        // elsewhere or fire-and-forget; confirm this is intentional.
        if !expr_cache_invalidate_ids.is_empty() {
            let _fut = self.catalog().update_expression_cache(
                Default::default(),
                Default::default(),
                expr_cache_invalidate_ids,
            );
        }

        // Test-only failure injection point.
        fail::fail_point!("after_sequencer_drop_replica");

        // Warn the session if it just dropped the database or cluster it is
        // currently using.
        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }
1281
    /// Returns an error if any role in `dropped_roles` still owns an object or
    /// participates in a privilege (as grantor or grantee) anywhere in the
    /// catalog: items, databases, schemas, clusters, cluster replicas, system
    /// privileges, or default privileges.
    ///
    /// `dropped_roles` maps each candidate role's id to its name (used in the
    /// error message). On failure, returns `AdapterError::DependentObject`
    /// carrying, per role name, a list of human-readable descriptions of the
    /// blocking dependencies.
    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        // Records, for each dropped role appearing as grantee or grantor in
        // `privileges`, a description of the dependent privilege.
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                // Dropped role is the grantee of a privilege.
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                // Dropped role is the grantor of a privilege.
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        // role name -> descriptions of objects/privileges blocking the drop.
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        // Catalog items: ownership and privileges.
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        // Databases and their schemas: ownership and privileges.
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        // Clusters and their replicas: ownership and privileges.
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
            }
        }
        // System-wide privileges.
        privilege_check(
            self.catalog().system_privileges(),
            dropped_roles,
            &mut dependent_objects,
            &SystemObjectId::System,
            &catalog,
        );
        // Default privileges where the dropped role is the defining role or a
        // grantee.
        for (default_privilege_object, default_privilege_acl_items) in
            self.catalog.default_privileges()
        {
            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!(
                        "default privileges on {}S created by {}",
                        default_privilege_object.object_type, role_name
                    ));
            }
            for default_privilege_acl_item in default_privilege_acl_items {
                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "default privileges on {}S granted to {}",
                            default_privilege_object.object_type, role_name
                        ));
                }
            }
        }

        if !dependent_objects.is_empty() {
            Err(AdapterError::DependentObject(dependent_objects))
        } else {
            Ok(())
        }
    }
1445
    /// Sequences a `DROP OWNED BY` statement: revokes privileges and default
    /// privileges granted to/by the given roles and drops the objects they
    /// own, all in a single catalog transaction.
    #[instrument]
    pub(super) async fn sequence_drop_owned(
        &mut self,
        session: &Session,
        plan: plan::DropOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Reserved (system) roles can never be targeted.
        for role_id in &plan.role_ids {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let mut privilege_revokes = plan.privilege_revokes;

        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
        let session_catalog = self.catalog().for_session(session);
        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
            && !session.is_superuser()
        {
            // Obtain all roles that the current session is a member of.
            let role_membership =
                session_catalog.collect_role_membership(session.current_role_id());
            // Remove revokes whose grantor the session is not a member of;
            // each removed revoke is reported as a notice rather than applied.
            let invalid_revokes: BTreeSet<_> = privilege_revokes
                .extract_if(.., |(_, privilege)| {
                    !role_membership.contains(&privilege.grantor)
                })
                .map(|(object_id, _)| object_id)
                .collect();
            for invalid_revoke in invalid_revokes {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
                session.add_notice(AdapterNotice::CannotRevoke { object_description });
            }
        }

        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
            catalog::Op::UpdatePrivilege {
                target_id: object_id,
                privilege,
                variant: UpdatePrivilegeVariant::Revoke,
            }
        });
        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant: UpdatePrivilegeVariant::Revoke,
            },
        );
        let DropOps {
            ops: drop_ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(session, plan.drop_ids)?;

        // Revokes first, then drops, all applied in one transaction.
        let ops = privilege_revoke_ops
            .chain(default_privilege_revoke_ops)
            .chain(drop_ops.into_iter())
            .collect();

        self.catalog_transact(Some(session), ops).await?;

        // Warn the session if it just dropped the database or cluster it is
        // currently using.
        if dropped_active_db {
            session.add_notice(AdapterNotice::DroppedActiveDatabase {
                name: session.vars().database().to_string(),
            });
        }
        if dropped_active_cluster {
            session.add_notice(AdapterNotice::DroppedActiveCluster {
                name: session.vars().cluster().to_string(),
            });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
        }
        Ok(ExecuteResponse::DroppedOwned)
    }
1522
    /// Computes the catalog ops and session-facing metadata for dropping the
    /// given objects.
    ///
    /// In addition to the drops themselves, this derives the role-membership
    /// revokes implied by dropping roles and the default-privilege revokes
    /// implied by dropping databases/schemas. It also records whether the
    /// session's active database or cluster, or any index with live
    /// dependants, is among the dropped objects so the caller can emit
    /// notices.
    ///
    /// # Errors
    ///
    /// Fails if a reserved role is dropped, if a dropped role still owns
    /// objects, or if a non-internal replica of a managed cluster is dropped
    /// without also dropping its cluster.
    fn sequence_drop_common(
        &self,
        session: &Session,
        ids: Vec<ObjectId>,
    ) -> Result<DropOps, AdapterError> {
        let mut dropped_active_db = false;
        let mut dropped_active_cluster = false;
        let mut dropped_in_use_indexes = Vec::new();
        let mut dropped_roles = BTreeMap::new();
        let mut dropped_databases = BTreeSet::new();
        let mut dropped_schemas = BTreeSet::new();
        // Dropping either the group role or the member role of a role membership will trigger a
        // revoke role. We use a Set for the revokes to avoid trying to attempt to revoke the same
        // role membership twice.
        let mut role_revokes = BTreeSet::new();
        // Dropping a database or a schema will revoke all default roles associated with that
        // database or schema.
        let mut default_privilege_revokes = BTreeSet::new();

        // Clusters we're dropping
        let mut clusters_to_drop = BTreeSet::new();

        // Set view of `ids` for O(log n) membership tests below.
        let ids_set = ids.iter().collect::<BTreeSet<_>>();
        // First pass: classify every dropped object and collect the
        // consequences (revokes, notices) that each kind of drop implies.
        for id in &ids {
            match id {
                ObjectId::Database(id) => {
                    let name = self.catalog().get_database(id).name();
                    if name == session.vars().database() {
                        dropped_active_db = true;
                    }
                    dropped_databases.insert(id);
                }
                ObjectId::Schema((_, spec)) => {
                    if let SchemaSpecifier::Id(id) = spec {
                        dropped_schemas.insert(id);
                    }
                }
                ObjectId::Cluster(id) => {
                    clusters_to_drop.insert(*id);
                    if let Some(active_id) = self
                        .catalog()
                        .active_cluster(session)
                        .ok()
                        .map(|cluster| cluster.id())
                    {
                        if id == &active_id {
                            dropped_active_cluster = true;
                        }
                    }
                }
                ObjectId::Role(id) => {
                    let role = self.catalog().get_role(id);
                    let name = role.name();
                    dropped_roles.insert(*id, name);
                    // We must revoke all role memberships that the dropped role belongs to.
                    for (group_id, grantor_id) in &role.membership.map {
                        role_revokes.insert((*group_id, *id, *grantor_id));
                    }
                }
                ObjectId::Item(id) => {
                    // For indexes, find dependants that will keep using the
                    // index after the drop so the caller can warn about them.
                    if let Some(index) = self.catalog().get_entry(id).index() {
                        let humanizer = self.catalog().for_session(session);
                        let dependants = self
                            .controller
                            .compute
                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
                            .ok()
                            .into_iter()
                            .flatten()
                            .filter(|dependant_id| {
                                // Transient Ids belong to Peeks. We are not interested for now in
                                // peeks depending on a dropped index.
                                // TODO: show a different notice in this case. Something like
                                // "There is an in-progress ad hoc SELECT that uses the dropped
                                // index. The resources used by the index will be freed when all
                                // such SELECTs complete."
                                if dependant_id.is_transient() {
                                    return false;
                                }
                                // The item should exist, but don't panic if it doesn't.
                                let Some(dependent_id) = humanizer
                                    .try_get_item_by_global_id(dependant_id)
                                    .map(|item| item.id())
                                else {
                                    return false;
                                };
                                // If the dependent object is also being dropped, then there is no
                                // problem, so we don't want a notice.
                                !ids_set.contains(&ObjectId::Item(dependent_id))
                            })
                            .flat_map(|dependant_id| {
                                // If we are not able to find a name for this ID it probably means
                                // we have already dropped the compute collection, in which case we
                                // can ignore it.
                                humanizer.humanize_id(dependant_id)
                            })
                            .collect_vec();
                        if !dependants.is_empty() {
                            dropped_in_use_indexes.push(DroppedInUseIndex {
                                index_name: humanizer
                                    .humanize_id(index.global_id())
                                    .unwrap_or_else(|| id.to_string()),
                                dependant_objects: dependants,
                            });
                        }
                    }
                }
                _ => {}
            }
        }

        // Second pass (needs the complete `clusters_to_drop` set from above).
        for id in &ids {
            match id {
                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
                // unless they are internal replicas, which exist outside the scope
                // of managed clusters.
                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                    if !clusters_to_drop.contains(cluster_id) {
                        let cluster = self.catalog.get_cluster(*cluster_id);
                        if cluster.is_managed() {
                            let replica =
                                cluster.replica(*replica_id).expect("Catalog out of sync");
                            if !replica.config.location.internal() {
                                coord_bail!("cannot drop replica of managed cluster");
                            }
                        }
                    }
                }
                _ => {}
            }
        }

        // Reserved roles can never be dropped.
        for role_id in dropped_roles.keys() {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }
        self.validate_dropped_role_ownership(session, &dropped_roles)?;
        // If any role is a member of a dropped role, then we must revoke that membership.
        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
        for role in self.catalog().user_roles() {
            for dropped_role_id in
                dropped_role_ids.intersection(&role.membership.map.keys().collect())
            {
                role_revokes.insert((
                    **dropped_role_id,
                    role.id(),
                    *role
                        .membership
                        .map
                        .get(*dropped_role_id)
                        .expect("included in keys above"),
                ));
            }
        }

        // Collect the default-privilege entries scoped to any dropped
        // database or schema; they must be revoked alongside the drop.
        for (default_privilege_object, default_privilege_acls) in
            self.catalog().default_privileges()
        {
            if matches!(
                &default_privilege_object.database_id,
                Some(database_id) if dropped_databases.contains(database_id),
            ) || matches!(
                &default_privilege_object.schema_id,
                Some(schema_id) if dropped_schemas.contains(schema_id),
            ) {
                for default_privilege_acl in default_privilege_acls {
                    default_privilege_revokes.insert((
                        default_privilege_object.clone(),
                        default_privilege_acl.clone(),
                    ));
                }
            }
        }

        // Assemble the op sequence: role revokes, then default-privilege
        // revokes, then a single op performing all the drops.
        let ops = role_revokes
            .into_iter()
            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            })
            .chain(default_privilege_revokes.into_iter().map(
                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                    privilege_object,
                    privilege_acl_item,
                    variant: UpdatePrivilegeVariant::Revoke,
                },
            ))
            .chain(iter::once(catalog::Op::DropObjects(
                ids.into_iter()
                    .map(DropObjectInfo::manual_drop_from_object_id)
                    .collect(),
            )))
            .collect();

        Ok(DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        })
    }
1724
1725    pub(super) fn sequence_explain_schema(
1726        &self,
1727        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1728    ) -> Result<ExecuteResponse, AdapterError> {
1729        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1730            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1731        })?;
1732
1733        let json_string = json_string(&json_value);
1734        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1735        Ok(Self::send_immediate_rows(row))
1736    }
1737
1738    pub(super) fn sequence_show_all_variables(
1739        &self,
1740        session: &Session,
1741    ) -> Result<ExecuteResponse, AdapterError> {
1742        let mut rows = viewable_variables(self.catalog().state(), session)
1743            .map(|v| (v.name(), v.value(), v.description()))
1744            .collect::<Vec<_>>();
1745        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1746
1747        // TODO(parkmycar): Pack all of these into a single RowCollection.
1748        let rows: Vec<_> = rows
1749            .into_iter()
1750            .map(|(name, val, desc)| {
1751                Row::pack_slice(&[
1752                    Datum::String(name),
1753                    Datum::String(&val),
1754                    Datum::String(desc),
1755                ])
1756            })
1757            .collect();
1758        Ok(Self::send_immediate_rows(rows))
1759    }
1760
1761    pub(super) fn sequence_show_variable(
1762        &self,
1763        session: &Session,
1764        plan: plan::ShowVariablePlan,
1765    ) -> Result<ExecuteResponse, AdapterError> {
1766        if &plan.name == SCHEMA_ALIAS {
1767            let schemas = self.catalog.resolve_search_path(session);
1768            let schema = schemas.first();
1769            return match schema {
1770                Some((database_spec, schema_spec)) => {
1771                    let schema_name = &self
1772                        .catalog
1773                        .get_schema(database_spec, schema_spec, session.conn_id())
1774                        .name()
1775                        .schema;
1776                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
1777                    Ok(Self::send_immediate_rows(row))
1778                }
1779                None => {
1780                    if session.vars().current_object_missing_warnings() {
1781                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1782                            search_path: session
1783                                .vars()
1784                                .search_path()
1785                                .into_iter()
1786                                .map(|schema| schema.to_string())
1787                                .collect(),
1788                        });
1789                    }
1790                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1791                }
1792            };
1793        }
1794
1795        let variable = session
1796            .vars()
1797            .get(self.catalog().system_config(), &plan.name)
1798            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1799
1800        // In lieu of plumbing the user to all system config functions, just check that the var is
1801        // visible.
1802        variable.visible(session.user(), self.catalog().system_config())?;
1803
1804        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1805        if variable.name() == vars::DATABASE.name()
1806            && matches!(
1807                self.catalog().resolve_database(&variable.value()),
1808                Err(CatalogError::UnknownDatabase(_))
1809            )
1810            && session.vars().current_object_missing_warnings()
1811        {
1812            let name = variable.value();
1813            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1814        } else if variable.name() == vars::CLUSTER.name()
1815            && matches!(
1816                self.catalog().resolve_cluster(&variable.value()),
1817                Err(CatalogError::UnknownCluster(_))
1818            )
1819            && session.vars().current_object_missing_warnings()
1820        {
1821            let name = variable.value();
1822            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1823        }
1824        Ok(Self::send_immediate_rows(row))
1825    }
1826
1827    #[instrument]
1828    pub(super) async fn sequence_inspect_shard(
1829        &self,
1830        session: &Session,
1831        plan: plan::InspectShardPlan,
1832    ) -> Result<ExecuteResponse, AdapterError> {
1833        // TODO: Not thrilled about this rbac special case here, but probably
1834        // sufficient for now.
1835        if !session.user().is_internal() {
1836            return Err(AdapterError::Unauthorized(
1837                rbac::UnauthorizedError::MzSystem {
1838                    action: "inspect".into(),
1839                },
1840            ));
1841        }
1842        let state = self
1843            .controller
1844            .storage
1845            .inspect_persist_state(plan.id)
1846            .await?;
1847        let jsonb = Jsonb::from_serde_json(state)?;
1848        Ok(Self::send_immediate_rows(jsonb.into_row()))
1849    }
1850
    /// Handles `SET <name> [= <value>]` for session variables.
    ///
    /// Changing the transaction isolation level or the active cluster is
    /// rejected if the current transaction has already performed operations.
    /// After a successful set, emits notices for deprecated variables, for
    /// `database`/`cluster` values that name no catalog object, and for
    /// unimplemented or experimental isolation levels.
    #[instrument]
    pub(super) fn sequence_set_variable(
        &self,
        session: &mut Session,
        plan: plan::SetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (name, local) = (plan.name, plan.local);
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }

        let vars = session.vars_mut();
        // `SET <name> = DEFAULT` carries no values; it becomes a reset below.
        let values = match plan.value {
            plan::VariableValue::Default => None,
            plan::VariableValue::Values(values) => Some(values),
        };

        match values {
            Some(values) => {
                vars.set(
                    self.catalog().system_config(),
                    &name,
                    VarInput::SqlSet(&values),
                    local,
                )?;

                let vars = session.vars();

                // Emit a warning when deprecated variables are used.
                // TODO(database-issues#8069) remove this after sufficient time has passed
                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if name == vars::CLUSTER.name()
                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
                {
                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
                }

                // Database or cluster value does not correspond to a catalog item.
                if name.as_str() == vars::DATABASE.name()
                    && matches!(
                        self.catalog().resolve_database(vars.database()),
                        Err(CatalogError::UnknownDatabase(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.database().to_string();
                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
                } else if name.as_str() == vars::CLUSTER.name()
                    && matches!(
                        self.catalog().resolve_cluster(vars.cluster()),
                        Err(CatalogError::UnknownCluster(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.cluster().to_string();
                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
                    // Warn about isolation levels that are accepted but not
                    // (fully) implemented.
                    let v = values.into_first().to_lowercase();
                    if v == IsolationLevel::ReadUncommitted.as_str()
                        || v == IsolationLevel::ReadCommitted.as_str()
                        || v == IsolationLevel::RepeatableRead.as_str()
                    {
                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
                            isolation_level: v,
                        });
                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
                        session.add_notice(AdapterNotice::StrongSessionSerializable);
                    }
                }
            }
            None => vars.reset(self.catalog().system_config(), &name, local)?,
        }

        Ok(ExecuteResponse::SetVariable { name, reset: false })
    }
1930
1931    pub(super) fn sequence_reset_variable(
1932        &self,
1933        session: &mut Session,
1934        plan: plan::ResetVariablePlan,
1935    ) -> Result<ExecuteResponse, AdapterError> {
1936        let name = plan.name;
1937        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1938            self.validate_set_isolation_level(session)?;
1939        }
1940        if &name == vars::CLUSTER.name() {
1941            self.validate_set_cluster(session)?;
1942        }
1943        session
1944            .vars_mut()
1945            .reset(self.catalog().system_config(), &name, false)?;
1946        Ok(ExecuteResponse::SetVariable { name, reset: true })
1947    }
1948
1949    pub(super) fn sequence_set_transaction(
1950        &self,
1951        session: &mut Session,
1952        plan: plan::SetTransactionPlan,
1953    ) -> Result<ExecuteResponse, AdapterError> {
1954        // TODO(jkosh44) Only supports isolation levels for now.
1955        for mode in plan.modes {
1956            match mode {
1957                TransactionMode::AccessMode(_) => {
1958                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1959                }
1960                TransactionMode::IsolationLevel(isolation_level) => {
1961                    self.validate_set_isolation_level(session)?;
1962
1963                    session.vars_mut().set(
1964                        self.catalog().system_config(),
1965                        TRANSACTION_ISOLATION_VAR_NAME,
1966                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
1967                        plan.local,
1968                    )?
1969                }
1970            }
1971        }
1972        Ok(ExecuteResponse::SetVariable {
1973            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1974            reset: false,
1975        })
1976    }
1977
1978    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1979        if session.transaction().contains_ops() {
1980            Err(AdapterError::InvalidSetIsolationLevel)
1981        } else {
1982            Ok(())
1983        }
1984    }
1985
1986    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1987        if session.transaction().contains_ops() {
1988            Err(AdapterError::InvalidSetCluster)
1989        } else {
1990            Ok(())
1991        }
1992    }
1993
    /// Handles `COMMIT` / `ROLLBACK` (and the implicit end of a transaction).
    ///
    /// Depending on the operations the transaction accumulated, the response
    /// is either retired here directly, or responsibility for retiring the
    /// context is handed off to another path: group commit for writes, the
    /// strict-serializable read queue for linearized peeks, or the
    /// single-statement execution path.
    #[instrument]
    pub(super) async fn sequence_end_transaction(
        &mut self,
        mut ctx: ExecuteContext,
        mut action: EndTransactionAction,
    ) {
        // If the transaction has failed, we can only rollback.
        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
            (&action, ctx.session().transaction())
        {
            action = EndTransactionAction::Rollback;
        }
        // Tentative success response matching the (possibly downgraded)
        // action; errors from the inner sequencing replace it below.
        let response = match action {
            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
                params: BTreeMap::new(),
            }),
            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
                params: BTreeMap::new(),
            }),
        };

        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;

        let (response, action) = match result {
            // A write transaction with nothing left to write: respond
            // immediately like a no-op.
            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
                (response, action)
            }
            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
                // Make sure we have the correct set of write locks for this transaction.
                // Aggressively dropping partial sets of locks to prevent deadlocking separate
                // transactions.
                let validated_locks = match write_lock_guards {
                    None => None,
                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
                        Ok(locks) => Some(locks),
                        Err(missing) => {
                            tracing::error!(?missing, "programming error, missing write locks");
                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
                        }
                    },
                };

                // Group the staged rows by collection id.
                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
                for WriteOp { id, rows } in writes {
                    let total_rows = collected_writes.entry(id).or_default();
                    total_rows.push(rows);
                }

                // Hand off to group commit; the pending txn retires `ctx`
                // once the write lands.
                self.submit_write(PendingWriteTxn::User {
                    span: Span::current(),
                    writes: collected_writes,
                    write_locks: validated_locks,
                    pending_txn: PendingTxn {
                        ctx,
                        response,
                        action,
                    },
                });
                return;
            }
            // Strict-serializable linearized reads wait in the
            // strict-serializable read queue before responding.
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrictSerializable =>
            {
                let conn_id = ctx.session().conn_id().clone();
                let pending_read_txn = PendingReadTxn {
                    txn: PendingRead::Read {
                        txn: PendingTxn {
                            ctx,
                            response,
                            action,
                        },
                    },
                    timestamp_context: determination.timestamp_context,
                    created: Instant::now(),
                    num_requeues: 0,
                    otel_ctx: OpenTelemetryContext::obtain(),
                };
                self.strict_serializable_reads_tx
                    .send((conn_id, pending_read_txn))
                    .expect("sending to strict_serializable_reads_tx cannot fail");
                return;
            }
            // Strong session serializable: advance the session-local
            // timestamp oracle past the read timestamp, then respond here.
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrongSessionSerializable =>
            {
                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
                    ctx.session_mut()
                        .ensure_timestamp_oracle(timeline.clone())
                        .apply_write(*ts);
                }
                (response, action)
            }
            // A deferred single-statement transaction: re-enqueue it for
            // execution; that path retires `ctx`.
            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
                self.internal_cmd_tx
                    .send(Message::ExecuteSingleStatementTransaction {
                        ctx,
                        otel_ctx: OpenTelemetryContext::obtain(),
                        stmt,
                        params,
                    })
                    .expect("must send");
                return;
            }
            Ok((_, _)) => (response, action),
            Err(err) => (Err(err), EndTransactionAction::Rollback),
        };
        let changed = ctx.session_mut().vars_mut().end_transaction(action);
        // Append any parameters that changed to the response.
        let response = response.map(|mut r| {
            r.extend_params(changed);
            ExecuteResponse::from(r)
        });

        ctx.retire(response);
    }
2123
    /// Clears the session's transaction state and, on commit, validates and
    /// returns the transaction's accumulated operations (plus any write locks
    /// it held) for the caller to apply.
    ///
    /// DDL transactions are committed here directly via
    /// `catalog_transact_with_side_effects`; rollbacks and op-less
    /// transactions yield `(None, None)`.
    #[instrument]
    async fn sequence_end_transaction_inner(
        &mut self,
        ctx: &mut ExecuteContext,
        action: EndTransactionAction,
    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
        let txn = self.clear_transaction(ctx.session_mut()).await;

        if let EndTransactionAction::Commit = action {
            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
                match &mut ops {
                    TransactionOps::Writes(writes) => {
                        for WriteOp { id, .. } in &mut writes.iter() {
                            // Re-verify this id exists.
                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
                                AdapterError::Catalog(mz_catalog::memory::error::Error {
                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
                                })
                            })?;
                        }

                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
                    }
                    TransactionOps::DDL {
                        ops,
                        state: _,
                        side_effects,
                        revision,
                        snapshot: _,
                    } => {
                        // Make sure our catalog hasn't changed.
                        if *revision != self.catalog().transient_revision() {
                            return Err(AdapterError::DDLTransactionRace);
                        }
                        // Commit all of our queued ops.
                        let ops = std::mem::take(ops);
                        let side_effects = std::mem::take(side_effects);
                        self.catalog_transact_with_side_effects(
                            Some(ctx),
                            ops,
                            move |a, mut ctx| {
                                Box::pin(async move {
                                    // Run each queued side effect in order
                                    // after the catalog transaction commits.
                                    for side_effect in side_effects {
                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
                                    }
                                })
                            },
                        )
                        .await?;
                    }
                    _ => (),
                }
                return Ok((Some(ops), write_lock_guards));
            }
        }

        Ok((None, None))
    }
2183
2184    pub(super) async fn sequence_side_effecting_func(
2185        &mut self,
2186        ctx: ExecuteContext,
2187        plan: SideEffectingFunc,
2188    ) {
2189        match plan {
2190            SideEffectingFunc::PgCancelBackend { connection_id } => {
2191                if ctx.session().conn_id().unhandled() == connection_id {
2192                    // As a special case, if we're canceling ourselves, we send
2193                    // back a canceled resposne to the client issuing the query,
2194                    // and so we need to do no further processing of the cancel.
2195                    ctx.retire(Err(AdapterError::Canceled));
2196                    return;
2197                }
2198
2199                let res = if let Some((id_handle, _conn_meta)) =
2200                    self.active_conns.get_key_value(&connection_id)
2201                {
2202                    // check_plan already verified role membership.
2203                    self.handle_privileged_cancel(id_handle.clone()).await;
2204                    Datum::True
2205                } else {
2206                    Datum::False
2207                };
2208                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2209            }
2210        }
2211    }
2212
2213    /// Execute a side-effecting function from the frontend peek path.
2214    /// This is separate from `sequence_side_effecting_func` because
2215    /// - It doesn't have an ExecuteContext.
2216    /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
2217    ///   peek sequencing can't look at `active_conns`.
2218    ///
2219    /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
2220    /// sequencing.
2221    pub(crate) async fn execute_side_effecting_func(
2222        &mut self,
2223        plan: SideEffectingFunc,
2224        conn_id: ConnectionId,
2225        current_role: RoleId,
2226    ) -> Result<ExecuteResponse, AdapterError> {
2227        match plan {
2228            SideEffectingFunc::PgCancelBackend { connection_id } => {
2229                if conn_id.unhandled() == connection_id {
2230                    // As a special case, if we're canceling ourselves, we return
2231                    // a canceled response to the client issuing the query,
2232                    // and so we need to do no further processing of the cancel.
2233                    return Err(AdapterError::Canceled);
2234                }
2235
2236                // Perform RBAC check: the current user must be a member of the role
2237                // that owns the connection being cancelled.
2238                if let Some((_id_handle, conn_meta)) =
2239                    self.active_conns.get_key_value(&connection_id)
2240                {
2241                    let target_role = *conn_meta.authenticated_role_id();
2242                    let role_membership = self
2243                        .catalog()
2244                        .state()
2245                        .collect_role_membership(&current_role);
2246                    if !role_membership.contains(&target_role) {
2247                        let target_role_name = self
2248                            .catalog()
2249                            .try_get_role(&target_role)
2250                            .map(|role| role.name().to_string())
2251                            .unwrap_or_else(|| target_role.to_string());
2252                        return Err(AdapterError::Unauthorized(
2253                            rbac::UnauthorizedError::RoleMembership {
2254                                role_names: vec![target_role_name],
2255                            },
2256                        ));
2257                    }
2258
2259                    // RBAC check passed, proceed with cancellation.
2260                    let id_handle = self
2261                        .active_conns
2262                        .get_key_value(&connection_id)
2263                        .map(|(id, _)| id.clone())
2264                        .expect("checked above");
2265                    self.handle_privileged_cancel(id_handle).await;
2266                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2267                } else {
2268                    // Connection not found, return false.
2269                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2270                }
2271            }
2272        }
2273    }
2274
2275    /// Inner method that performs the actual real-time recency timestamp determination.
2276    /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`)
2277    /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
2278    pub(crate) async fn determine_real_time_recent_timestamp(
2279        &self,
2280        source_ids: impl Iterator<Item = GlobalId>,
2281        real_time_recency_timeout: Duration,
2282    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2283        let item_ids = source_ids
2284            .map(|gid| {
2285                self.catalog
2286                    .try_resolve_item_id(&gid)
2287                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2288            })
2289            .collect::<Result<Vec<_>, _>>()?;
2290
2291        // Find all dependencies transitively because we need to ensure that
2292        // RTR queries determine the timestamp from the sources' (i.e.
2293        // storage objects that ingest data from external systems) remap
2294        // data. We "cheat" a little bit and filter out any IDs that aren't
2295        // user objects because we know they are not a RTR source.
2296        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2297        // If none of the sources are user objects, we don't need to provide
2298        // a RTR timestamp.
2299        if to_visit.is_empty() {
2300            return Ok(None);
2301        }
2302
2303        let mut timestamp_objects = BTreeSet::new();
2304
2305        while let Some(id) = to_visit.pop_front() {
2306            timestamp_objects.insert(id);
2307            to_visit.extend(
2308                self.catalog()
2309                    .get_entry(&id)
2310                    .uses()
2311                    .into_iter()
2312                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2313            );
2314        }
2315        let timestamp_objects = timestamp_objects
2316            .into_iter()
2317            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2318            .collect();
2319
2320        let r = self
2321            .controller
2322            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2323            .await?;
2324
2325        Ok(Some(r))
2326    }
2327
2328    /// Checks to see if the session needs a real time recency timestamp and if so returns
2329    /// a future that will return the timestamp.
2330    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2331        &self,
2332        session: &Session,
2333        source_ids: impl Iterator<Item = GlobalId>,
2334    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2335        let vars = session.vars();
2336
2337        if vars.real_time_recency()
2338            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2339            && !session.contains_read_timestamp()
2340        {
2341            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2342                .await
2343        } else {
2344            Ok(None)
2345        }
2346    }
2347
2348    #[instrument]
2349    pub(super) async fn sequence_explain_plan(
2350        &mut self,
2351        ctx: ExecuteContext,
2352        plan: plan::ExplainPlanPlan,
2353        target_cluster: TargetCluster,
2354    ) {
2355        match &plan.explainee {
2356            plan::Explainee::Statement(stmt) => match stmt {
2357                plan::ExplaineeStatement::CreateView { .. } => {
2358                    self.explain_create_view(ctx, plan).await;
2359                }
2360                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2361                    self.explain_create_materialized_view(ctx, plan).await;
2362                }
2363                plan::ExplaineeStatement::CreateIndex { .. } => {
2364                    self.explain_create_index(ctx, plan).await;
2365                }
2366                plan::ExplaineeStatement::Select { .. } => {
2367                    self.explain_peek(ctx, plan, target_cluster).await;
2368                }
2369                plan::ExplaineeStatement::Subscribe { .. } => {
2370                    self.explain_subscribe(ctx, plan, target_cluster).await;
2371                }
2372            },
2373            plan::Explainee::View(_) => {
2374                let result = self.explain_view(&ctx, plan);
2375                ctx.retire(result);
2376            }
2377            plan::Explainee::MaterializedView(_) => {
2378                let result = self.explain_materialized_view(&ctx, plan);
2379                ctx.retire(result);
2380            }
2381            plan::Explainee::Index(_) => {
2382                let result = self.explain_index(&ctx, plan);
2383                ctx.retire(result);
2384            }
2385            plan::Explainee::ReplanView(_) => {
2386                self.explain_replan_view(ctx, plan).await;
2387            }
2388            plan::Explainee::ReplanMaterializedView(_) => {
2389                self.explain_replan_materialized_view(ctx, plan).await;
2390            }
2391            plan::Explainee::ReplanIndex(_) => {
2392                self.explain_replan_index(ctx, plan).await;
2393            }
2394        };
2395    }
2396
2397    pub(super) async fn sequence_explain_pushdown(
2398        &mut self,
2399        ctx: ExecuteContext,
2400        plan: plan::ExplainPushdownPlan,
2401        target_cluster: TargetCluster,
2402    ) {
2403        match plan.explainee {
2404            Explainee::Statement(ExplaineeStatement::Select {
2405                broken: false,
2406                plan,
2407                desc: _,
2408            }) => {
2409                let stage = return_if_err!(
2410                    self.peek_validate(
2411                        ctx.session(),
2412                        plan,
2413                        target_cluster,
2414                        None,
2415                        ExplainContext::Pushdown,
2416                        Some(ctx.session().vars().max_query_result_size()),
2417                    ),
2418                    ctx
2419                );
2420                self.sequence_staged(ctx, Span::current(), stage).await;
2421            }
2422            Explainee::MaterializedView(item_id) => {
2423                self.explain_pushdown_materialized_view(ctx, item_id).await;
2424            }
2425            _ => {
2426                ctx.retire(Err(AdapterError::Unsupported(
2427                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2428                )));
2429            }
2430        };
2431    }
2432
2433    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2434    async fn execute_explain_pushdown_with_read_holds(
2435        &self,
2436        ctx: ExecuteContext,
2437        as_of: Antichain<Timestamp>,
2438        mz_now: ResultSpec<'static>,
2439        read_holds: Option<ReadHolds<Timestamp>>,
2440        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2441    ) {
2442        let fut = self
2443            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2444            .await;
2445        task::spawn(|| "render explain pushdown", async move {
2446            // Transfer the necessary read holds over to the background task
2447            let _read_holds = read_holds;
2448            let res = fut.await;
2449            ctx.retire(res);
2450        });
2451    }
2452
    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
    ///
    /// The heavy lifting lives in the freestanding `explain_pushdown_future_inner`
    /// helper; this method only extracts the coordinator pieces it needs
    /// (catalog and storage collections), so the returned future does not
    /// borrow `self` and can be awaited from a background task.
    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
        &self,
        session: &Session,
        as_of: Antichain<Timestamp>,
        mz_now: ResultSpec<'static>,
        imports: I,
    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
        // Get the needed Coordinator stuff and call the freestanding, shared helper.
        super::explain_pushdown_future_inner(
            session,
            &self.catalog,
            &self.controller.storage_collections,
            as_of,
            mz_now,
            imports,
        )
        .await
    }
2472
2473    #[instrument]
2474    pub(super) async fn sequence_insert(
2475        &mut self,
2476        mut ctx: ExecuteContext,
2477        plan: plan::InsertPlan,
2478    ) {
2479        // Normally, this would get checked when trying to add "write ops" to
2480        // the transaction but we go down diverging paths below, based on
2481        // whether the INSERT is only constant values or not.
2482        //
2483        // For the non-constant case we sequence an implicit read-then-write,
2484        // which messes with the transaction ops and would allow an implicit
2485        // read-then-write to sneak into a read-only transaction.
2486        if !ctx.session_mut().transaction().allows_writes() {
2487            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2488            return;
2489        }
2490
2491        // The structure of this code originates from a time where
2492        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2493        // optimized `MirRelationExpr`.
2494        //
2495        // Ideally, we would like to make the `selection.as_const().is_some()`
2496        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2497        // are planned as a Wrap($n, $vals) call, so until we can reduce
2498        // HirRelationExpr this will always returns false.
2499        //
2500        // Unfortunately, hitting the default path of the match below also
2501        // causes a lot of tests to fail, so we opted to go with the extra
2502        // `plan.values.clone()` statements when producing the `optimized_mir`
2503        // and re-optimize the values in the `sequence_read_then_write` call.
2504        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2505            // We don't perform any optimizations on an expression that is already
2506            // a constant for writes, as we want to maximize bulk-insert throughput.
2507            let expr = return_if_err!(
2508                plan.values
2509                    .clone()
2510                    .lower(self.catalog().system_config(), None),
2511                ctx
2512            );
2513            OptimizedMirRelationExpr(expr)
2514        } else {
2515            // Collect optimizer parameters.
2516            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2517
2518            // (`optimize::view::Optimizer` has a special case for constant queries.)
2519            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2520
2521            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2522            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2523        };
2524
2525        match optimized_mir.into_inner() {
2526            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2527                let catalog = self.owned_catalog();
2528                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2529                    let result =
2530                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2531                    ctx.retire(result);
2532                });
2533            }
2534            // All non-constant values must be planned as read-then-writes.
2535            _ => {
2536                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2537                    Some(table) => {
2538                        // Inserts always occur at the latest version of the table.
2539                        let desc = table.relation_desc_latest().expect("table has a desc");
2540                        desc.arity()
2541                    }
2542                    None => {
2543                        ctx.retire(Err(AdapterError::Catalog(
2544                            mz_catalog::memory::error::Error {
2545                                kind: ErrorKind::Sql(CatalogError::UnknownItem(
2546                                    plan.id.to_string(),
2547                                )),
2548                            },
2549                        )));
2550                        return;
2551                    }
2552                };
2553
2554                let finishing = RowSetFinishing {
2555                    order_by: vec![],
2556                    limit: None,
2557                    offset: 0,
2558                    project: (0..desc_arity).collect(),
2559                };
2560
2561                let read_then_write_plan = plan::ReadThenWritePlan {
2562                    id: plan.id,
2563                    selection: plan.values,
2564                    finishing,
2565                    assignments: BTreeMap::new(),
2566                    kind: MutationKind::Insert,
2567                    returning: plan.returning,
2568                };
2569
2570                self.sequence_read_then_write(ctx, read_then_write_plan)
2571                    .await;
2572            }
2573        }
2574    }
2575
2576    /// ReadThenWrite is a plan whose writes depend on the results of a
2577    /// read. This works by doing a Peek then queuing a SendDiffs. No writes
2578    /// or read-then-writes can occur between the Peek and SendDiff otherwise a
2579    /// serializability violation could occur.
2580    #[instrument]
2581    pub(super) async fn sequence_read_then_write(
2582        &mut self,
2583        mut ctx: ExecuteContext,
2584        plan: plan::ReadThenWritePlan,
2585    ) {
2586        let mut source_ids: BTreeSet<_> = plan
2587            .selection
2588            .depends_on()
2589            .into_iter()
2590            .map(|gid| self.catalog().resolve_item_id(&gid))
2591            .collect();
2592        source_ids.insert(plan.id);
2593
2594        // If the transaction doesn't already have write locks, acquire them.
2595        if ctx.session().transaction().write_locks().is_none() {
2596            // Pre-define all of the locks we need.
2597            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2598
2599            // Try acquiring all of our locks.
2600            for id in &source_ids {
2601                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2602                    write_locks.insert_lock(*id, lock);
2603                }
2604            }
2605
            // See if we acquired all of the necessary locks.
2607            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2608                Ok(locks) => locks,
2609                Err(missing) => {
2610                    // Defer our write if we couldn't acquire all of the locks.
2611                    let role_metadata = ctx.session().role_metadata().clone();
2612                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2613                    let plan = DeferredPlan {
2614                        ctx,
2615                        plan: Plan::ReadThenWrite(plan),
2616                        validity: PlanValidity::new(
2617                            self.catalog.transient_revision(),
2618                            source_ids.clone(),
2619                            None,
2620                            None,
2621                            role_metadata,
2622                        ),
2623                        requires_locks: source_ids,
2624                    };
2625                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2626                }
2627            };
2628
2629            ctx.session_mut()
2630                .try_grant_write_locks(write_locks)
2631                .expect("session has already been granted write locks");
2632        }
2633
2634        let plan::ReadThenWritePlan {
2635            id,
2636            kind,
2637            selection,
2638            mut assignments,
2639            finishing,
2640            mut returning,
2641        } = plan;
2642
2643        // Read then writes can be queued, so re-verify the id exists.
2644        let desc = match self.catalog().try_get_entry(&id) {
2645            Some(table) => {
2646                // Inserts always occur at the latest version of the table.
2647                table
2648                    .relation_desc_latest()
2649                    .expect("table has a desc")
2650                    .into_owned()
2651            }
2652            None => {
2653                ctx.retire(Err(AdapterError::Catalog(
2654                    mz_catalog::memory::error::Error {
2655                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2656                    },
2657                )));
2658                return;
2659            }
2660        };
2661
2662        // Disallow mz_now in any position because read time and write time differ.
2663        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2664            || assignments.values().any(|e| e.contains_temporal())
2665            || returning.iter().any(|e| e.contains_temporal());
2666        if contains_temporal {
2667            ctx.retire(Err(AdapterError::Unsupported(
2668                "calls to mz_now in write statements",
2669            )));
2670            return;
2671        }
2672
2673        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
2674        //
2675        // - They do not refer to any objects whose notion of time moves differently than that of
2676        // user tables. This limitation is meant to ensure no writes occur between this read and the
2677        // subsequent write.
2678        // - They do not use mz_now(), whose time produced during read will differ from the write
2679        //   timestamp.
2680        fn validate_read_dependencies(
2681            catalog: &Catalog,
2682            id: &CatalogItemId,
2683        ) -> Result<(), AdapterError> {
2684            use CatalogItemType::*;
2685            use mz_catalog::memory::objects;
2686            let mut ids_to_check = Vec::new();
2687            let valid = match catalog.try_get_entry(id) {
2688                Some(entry) => {
2689                    if let CatalogItem::View(objects::View { optimized_expr, .. })
2690                    | CatalogItem::MaterializedView(objects::MaterializedView {
2691                        optimized_expr,
2692                        ..
2693                    }) = entry.item()
2694                    {
2695                        if optimized_expr.contains_temporal() {
2696                            return Err(AdapterError::Unsupported(
2697                                "calls to mz_now in write statements",
2698                            ));
2699                        }
2700                    }
2701                    match entry.item().typ() {
2702                        typ @ (Func | View | MaterializedView | ContinualTask) => {
2703                            ids_to_check.extend(entry.uses());
2704                            let valid_id = id.is_user() || matches!(typ, Func);
2705                            valid_id
2706                        }
2707                        Source | Secret | Connection => false,
2708                        // Cannot select from sinks or indexes.
2709                        Sink | Index => unreachable!(),
2710                        Table => {
2711                            if !id.is_user() {
2712                                // We can't read from non-user tables
2713                                false
2714                            } else {
2715                                // We can't read from tables that are source-exports
2716                                entry.source_export_details().is_none()
2717                            }
2718                        }
2719                        Type => true,
2720                    }
2721                }
2722                None => false,
2723            };
2724            if !valid {
2725                let (object_name, object_type) = match catalog.try_get_entry(id) {
2726                    Some(entry) => {
2727                        let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
2728                        let object_type = match entry.item().typ() {
2729                            // We only need the disallowed types here; the allowed types are handled above.
2730                            Source => "source",
2731                            Secret => "secret",
2732                            Connection => "connection",
2733                            Table => {
2734                                if !id.is_user() {
2735                                    "system table"
2736                                } else {
2737                                    "source-export table"
2738                                }
2739                            }
2740                            View => "system view",
2741                            MaterializedView => "system materialized view",
2742                            ContinualTask => "system task",
2743                            _ => "invalid dependency",
2744                        };
2745                        (object_name, object_type.to_string())
2746                    }
2747                    None => (id.to_string(), "unknown".to_string()),
2748                };
2749                return Err(AdapterError::InvalidTableMutationSelection {
2750                    object_name,
2751                    object_type,
2752                });
2753            }
2754            for id in ids_to_check {
2755                validate_read_dependencies(catalog, &id)?;
2756            }
2757            Ok(())
2758        }
2759
2760        for gid in selection.depends_on() {
2761            let item_id = self.catalog().resolve_item_id(&gid);
2762            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2763                ctx.retire(Err(err));
2764                return;
2765            }
2766        }
2767
2768        let (peek_tx, peek_rx) = oneshot::channel();
2769        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2770        let (tx, _, session, extra) = ctx.into_parts();
2771        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2772        // execution context, because this peek does not directly correspond to an execute,
2773        // and so we don't need to take any action on its retirement.
2774        // TODO[btv]: we might consider extending statement logging to log the inner
2775        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2776        // and mint a new "real" execution context here. We'd also have to add some logic to
2777        // make sure such "sub-statements" are always sampled when the top-level statement is
2778        //
2779        // It's debatable whether this makes sense conceptually,
2780        // because the inner fragment here is not actually a
2781        // "statement" in its own right.
2782        let peek_ctx = ExecuteContext::from_parts(
2783            peek_client_tx,
2784            self.internal_cmd_tx.clone(),
2785            session,
2786            Default::default(),
2787        );
2788
2789        self.sequence_peek(
2790            peek_ctx,
2791            plan::SelectPlan {
2792                select: None,
2793                source: selection,
2794                when: QueryWhen::FreshestTableWrite,
2795                finishing,
2796                copy_to: None,
2797            },
2798            TargetCluster::Active,
2799            None,
2800        )
2801        .await;
2802
2803        let internal_cmd_tx = self.internal_cmd_tx.clone();
2804        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2805        let catalog = self.owned_catalog();
2806        let max_result_size = self.catalog().system_config().max_result_size();
2807
2808        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2809            let (peek_response, session) = match peek_rx.await {
2810                Ok(Response {
2811                    result: Ok(resp),
2812                    session,
2813                    otel_ctx,
2814                }) => {
2815                    otel_ctx.attach_as_parent();
2816                    (resp, session)
2817                }
2818                Ok(Response {
2819                    result: Err(e),
2820                    session,
2821                    otel_ctx,
2822                }) => {
2823                    let ctx =
2824                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2825                    otel_ctx.attach_as_parent();
2826                    ctx.retire(Err(e));
2827                    return;
2828                }
2829                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2830                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2831            };
2832            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2833            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2834
2835            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2836            if timeout_dur == Duration::ZERO {
2837                timeout_dur = Duration::MAX;
2838            }
2839
2840            let style = ExprPrepOneShot {
2841                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2842                session: ctx.session(),
2843                catalog_state: catalog.state(),
2844            };
2845            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2846                return_if_err!(style.prep_scalar_expr(expr), ctx);
2847            }
2848
2849            let make_diffs = move |mut rows: Box<dyn RowIterator>|
2850                  -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2851                    let arena = RowArena::new();
2852                    let mut diffs = Vec::new();
2853                    let mut datum_vec = mz_repr::DatumVec::new();
2854
2855                    while let Some(row) = rows.next() {
2856                        if !assignments.is_empty() {
2857                            assert!(
2858                                matches!(kind, MutationKind::Update),
2859                                "only updates support assignments"
2860                            );
2861                            let mut datums = datum_vec.borrow_with(row);
2862                            let mut updates = vec![];
2863                            for (idx, expr) in &assignments {
2864                                let updated = match expr.eval(&datums, &arena) {
2865                                    Ok(updated) => updated,
2866                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2867                                };
2868                                updates.push((*idx, updated));
2869                            }
2870                            for (idx, new_value) in updates {
2871                                datums[idx] = new_value;
2872                            }
2873                            let updated = Row::pack_slice(&datums);
2874                            diffs.push((updated, Diff::ONE));
2875                        }
2876                        match kind {
2877                            // Updates and deletes always remove the
2878                            // current row. Updates will also add an
2879                            // updated value.
2880                            MutationKind::Update | MutationKind::Delete => {
2881                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2882                            }
2883                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2884                        }
2885                    }
2886
2887                    // Sum of all the rows' byte size, for checking if we go
2888                    // above the max_result_size threshold.
2889                    let mut byte_size: u64 = 0;
2890                    for (row, diff) in &diffs {
2891                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2892                        if diff.is_positive() {
2893                            for (idx, datum) in row.iter().enumerate() {
2894                                desc.constraints_met(idx, &datum)?;
2895                            }
2896                        }
2897                    }
2898                    Ok((diffs, byte_size))
2899                };
2900
2901            let diffs = match peek_response {
2902                ExecuteResponse::SendingRowsStreaming {
2903                    rows: mut rows_stream,
2904                    ..
2905                } => {
2906                    let mut byte_size: u64 = 0;
2907                    let mut diffs = Vec::new();
2908                    let result = loop {
2909                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2910                            Ok(Some(res)) => match res {
2911                                PeekResponseUnary::Rows(new_rows) => {
2912                                    match make_diffs(new_rows) {
2913                                        Ok((mut new_diffs, new_byte_size)) => {
2914                                            byte_size = byte_size.saturating_add(new_byte_size);
2915                                            if byte_size > max_result_size {
2916                                                break Err(AdapterError::ResultSize(format!(
2917                                                    "result exceeds max size of {max_result_size}"
2918                                                )));
2919                                            }
2920                                            diffs.append(&mut new_diffs)
2921                                        }
2922                                        Err(e) => break Err(e),
2923                                    };
2924                                }
2925                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2926                                PeekResponseUnary::Error(e) => {
2927                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2928                                }
2929                            },
2930                            Ok(None) => break Ok(diffs),
2931                            Err(_) => {
2932                                // We timed out, so remove the pending peek. This is
2933                                // best-effort and doesn't guarantee we won't
2934                                // receive a response.
2935                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2936                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2937                                    conn_id: ctx.session().conn_id().clone(),
2938                                });
2939                                if let Err(e) = result {
2940                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2941                                }
2942                                break Err(AdapterError::StatementTimeout);
2943                            }
2944                        }
2945                    };
2946
2947                    result
2948                }
2949                ExecuteResponse::SendingRowsImmediate { rows } => {
2950                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2951                }
2952                resp => Err(AdapterError::Unstructured(anyhow!(
2953                    "unexpected peek response: {resp:?}"
2954                ))),
2955            };
2956
2957            let mut returning_rows = Vec::new();
2958            let mut diff_err: Option<AdapterError> = None;
2959            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2960                let arena = RowArena::new();
2961                for (row, diff) in diffs {
2962                    if !diff.is_positive() {
2963                        continue;
2964                    }
2965                    let mut returning_row = Row::with_capacity(returning.len());
2966                    let mut packer = returning_row.packer();
2967                    for expr in &returning {
2968                        let datums: Vec<_> = row.iter().collect();
2969                        match expr.eval(&datums, &arena) {
2970                            Ok(datum) => {
2971                                packer.push(datum);
2972                            }
2973                            Err(err) => {
2974                                diff_err = Some(err.into());
2975                                break;
2976                            }
2977                        }
2978                    }
2979                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2980                    let diff = match NonZeroUsize::try_from(diff) {
2981                        Ok(diff) => diff,
2982                        Err(err) => {
2983                            diff_err = Some(err.into());
2984                            break;
2985                        }
2986                    };
2987                    returning_rows.push((returning_row, diff));
2988                    if diff_err.is_some() {
2989                        break;
2990                    }
2991                }
2992            }
2993            let diffs = if let Some(err) = diff_err {
2994                Err(err)
2995            } else {
2996                diffs
2997            };
2998
2999            // We need to clear out the timestamp context so the write doesn't fail due to a
3000            // read only transaction.
3001            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3002            // No matter what isolation level the client is using, we must linearize this
3003            // read. The write will be performed right after this, as part of a single
3004            // transaction, so the write must have a timestamp greater than or equal to the
3005            // read.
3006            //
3007            // Note: It's only OK for the write to have a greater timestamp than the read
3008            // because the write lock prevents any other writes from happening in between
3009            // the read and write.
3010            if let Some(timestamp_context) = timestamp_context {
3011                let (tx, rx) = tokio::sync::oneshot::channel();
3012                let conn_id = ctx.session().conn_id().clone();
3013                let pending_read_txn = PendingReadTxn {
3014                    txn: PendingRead::ReadThenWrite { ctx, tx },
3015                    timestamp_context,
3016                    created: Instant::now(),
3017                    num_requeues: 0,
3018                    otel_ctx: OpenTelemetryContext::obtain(),
3019                };
3020                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3021                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3022                if let Err(e) = result {
3023                    warn!(
3024                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3025                        e
3026                    );
3027                    return;
3028                }
3029                let result = rx.await;
3030                // It is not an error for these results to be ready after `tx` has been dropped.
3031                ctx = match result {
3032                    Ok(Some(ctx)) => ctx,
3033                    Ok(None) => {
3034                        // Coordinator took our context and will handle responding to the client.
3035                        // This usually indicates that our transaction was aborted.
3036                        return;
3037                    }
3038                    Err(e) => {
3039                        warn!(
3040                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3041                            e
3042                        );
3043                        return;
3044                    }
3045                };
3046            }
3047
3048            match diffs {
3049                Ok(diffs) => {
3050                    let result = Self::send_diffs(
3051                        ctx.session_mut(),
3052                        plan::SendDiffsPlan {
3053                            id,
3054                            updates: diffs,
3055                            kind,
3056                            returning: returning_rows,
3057                            max_result_size,
3058                        },
3059                    );
3060                    ctx.retire(result);
3061                }
3062                Err(e) => {
3063                    ctx.retire(Err(e));
3064                }
3065            }
3066        });
3067    }
3068
3069    #[instrument]
3070    pub(super) async fn sequence_alter_item_rename(
3071        &mut self,
3072        ctx: &mut ExecuteContext,
3073        plan: plan::AlterItemRenamePlan,
3074    ) -> Result<ExecuteResponse, AdapterError> {
3075        let op = catalog::Op::RenameItem {
3076            id: plan.id,
3077            current_full_name: plan.current_full_name,
3078            to_name: plan.to_name,
3079        };
3080        match self
3081            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3082            .await
3083        {
3084            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3085            Err(err) => Err(err),
3086        }
3087    }
3088
3089    #[instrument]
3090    pub(super) async fn sequence_alter_retain_history(
3091        &mut self,
3092        ctx: &mut ExecuteContext,
3093        plan: plan::AlterRetainHistoryPlan,
3094    ) -> Result<ExecuteResponse, AdapterError> {
3095        soft_assert_or_log!(
3096            !matches!(
3097                self.catalog().get_entry(&plan.id).item(),
3098                CatalogItem::ContinualTask(_)
3099            ),
3100            "RETAIN HISTORY is not supported on continual tasks"
3101        );
3102        let ops = vec![catalog::Op::AlterRetainHistory {
3103            id: plan.id,
3104            value: plan.value,
3105            window: plan.window,
3106        }];
3107        self.catalog_transact_with_context(None, Some(ctx), ops)
3108            .await?;
3109        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3110    }
3111
3112    #[instrument]
3113    pub(super) async fn sequence_alter_source_timestamp_interval(
3114        &mut self,
3115        ctx: &mut ExecuteContext,
3116        plan: plan::AlterSourceTimestampIntervalPlan,
3117    ) -> Result<ExecuteResponse, AdapterError> {
3118        let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3119            id: plan.id,
3120            value: plan.value,
3121            interval: plan.interval,
3122        }];
3123        self.catalog_transact_with_context(None, Some(ctx), ops)
3124            .await?;
3125        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3126    }
3127
3128    #[instrument]
3129    pub(super) async fn sequence_alter_schema_rename(
3130        &mut self,
3131        ctx: &mut ExecuteContext,
3132        plan: plan::AlterSchemaRenamePlan,
3133    ) -> Result<ExecuteResponse, AdapterError> {
3134        let (database_spec, schema_spec) = plan.cur_schema_spec;
3135        let op = catalog::Op::RenameSchema {
3136            database_spec,
3137            schema_spec,
3138            new_name: plan.new_schema_name,
3139            check_reserved_names: true,
3140        };
3141        match self
3142            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3143            .await
3144        {
3145            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3146            Err(err) => Err(err),
3147        }
3148    }
3149
3150    #[instrument]
3151    pub(super) async fn sequence_alter_schema_swap(
3152        &mut self,
3153        ctx: &mut ExecuteContext,
3154        plan: plan::AlterSchemaSwapPlan,
3155    ) -> Result<ExecuteResponse, AdapterError> {
3156        let plan::AlterSchemaSwapPlan {
3157            schema_a_spec: (schema_a_db, schema_a),
3158            schema_a_name,
3159            schema_b_spec: (schema_b_db, schema_b),
3160            schema_b_name,
3161            name_temp,
3162        } = plan;
3163
3164        let op_a = catalog::Op::RenameSchema {
3165            database_spec: schema_a_db,
3166            schema_spec: schema_a,
3167            new_name: name_temp,
3168            check_reserved_names: false,
3169        };
3170        let op_b = catalog::Op::RenameSchema {
3171            database_spec: schema_b_db,
3172            schema_spec: schema_b,
3173            new_name: schema_a_name,
3174            check_reserved_names: false,
3175        };
3176        let op_c = catalog::Op::RenameSchema {
3177            database_spec: schema_a_db,
3178            schema_spec: schema_a,
3179            new_name: schema_b_name,
3180            check_reserved_names: false,
3181        };
3182
3183        match self
3184            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3185                Box::pin(async {})
3186            })
3187            .await
3188        {
3189            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3190            Err(err) => Err(err),
3191        }
3192    }
3193
    /// Sequences an `ALTER ROLE` statement.
    ///
    /// Depending on the planned option this either updates the role's
    /// attributes (inherit, password, superuser, login) or its per-role
    /// session-variable defaults, then commits a single
    /// `catalog::Op::AlterRole` transaction. Accumulated notices are only
    /// delivered to the session after the transaction succeeds.
    #[instrument]
    pub(super) async fn sequence_alter_role(
        &mut self,
        session: &Session,
        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let role = catalog.get_role(&id);

        // We'll send these notices to the user, if the operation is successful.
        let mut notices = vec![];

        // Get the attributes and variables from the role, as they currently are.
        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
        let mut vars = role.vars().clone();

        // Whether to set the password to NULL. This is a special case since the existing
        // password is not stored in the role attributes.
        let mut nopassword = false;

        // Apply our updates.
        match option {
            PlannedAlterRoleOption::Attributes(attrs) => {
                // Reject attribute combinations this deployment doesn't allow
                // before mutating anything.
                self.validate_role_attributes(&attrs.clone().into())?;

                if let Some(inherit) = attrs.inherit {
                    attributes.inherit = inherit;
                }

                if let Some(password) = attrs.password {
                    attributes.password = Some(password);
                    // A new password is (re)hashed with the currently
                    // configured SCRAM iteration count.
                    attributes.scram_iterations =
                        Some(self.catalog().system_config().scram_iterations())
                }

                if let Some(superuser) = attrs.superuser {
                    attributes.superuser = Some(superuser);
                }

                if let Some(login) = attrs.login {
                    attributes.login = Some(login);
                }

                if attrs.nopassword.unwrap_or(false) {
                    nopassword = true;
                }

                if let Some(notice) = self.should_emit_rbac_notice(session) {
                    notices.push(notice);
                }
            }
            PlannedAlterRoleOption::Variable(variable) => {
                // Get the variable to make sure it's valid and visible.
                let session_var = session.vars().inspect(variable.name())?;
                // Return early if it's not visible.
                session_var.visible(session.user(), catalog.system_vars())?;

                // Emit a warning when deprecated variables are used.
                // TODO(database-issues#8069) remove this after sufficient time has passed
                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if let PlannedRoleVariable::Set {
                    name,
                    value: VariableValue::Values(vals),
                } = &variable
                {
                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
                        notices.push(AdapterNotice::IntrospectionClusterUsage);
                    }
                }

                // Apply the SET/RESET to our copy of the role's persisted
                // defaults, remembering the variable name for the notice below.
                let var_name = match variable {
                    PlannedRoleVariable::Set { name, value } => {
                        // Update our persisted set.
                        match &value {
                            VariableValue::Default => {
                                vars.remove(&name);
                            }
                            VariableValue::Values(vals) => {
                                // A single value is stored flat; multiple
                                // values are stored as a SQL set.
                                let var = match &vals[..] {
                                    [val] => OwnedVarInput::Flat(val.clone()),
                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
                                };
                                // Make sure the input is valid.
                                session_var.check(var.borrow())?;

                                vars.insert(name.clone(), var);
                            }
                        };
                        name
                    }
                    PlannedRoleVariable::Reset { name } => {
                        // Remove it from our persisted values.
                        vars.remove(&name);
                        name
                    }
                };

                // Emit a notice that they need to reconnect to see the change take effect.
                notices.push(AdapterNotice::VarDefaultUpdated {
                    role: Some(name.clone()),
                    var_name: Some(var_name),
                });
            }
        }

        // Commit everything in one catalog op; `?` aborts before notices are
        // delivered, so a failed ALTER ROLE emits no notices.
        let op = catalog::Op::AlterRole {
            id,
            name,
            attributes,
            nopassword,
            vars: RoleVars { map: vars },
        };
        let response = self
            .catalog_transact(Some(session), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredRole)?;

        // Send all of our queued notices.
        session.add_notices(notices);

        Ok(response)
    }
3317
    /// Sequences the preparation phase of an `ALTER SINK` that changes the
    /// sink's `from` relation.
    ///
    /// Acquires a read hold on the new `from` collection, then installs a
    /// storage watch set that fires once the sink's frontier has progressed
    /// far enough; completion is handled asynchronously via
    /// `WatchSetResponse::AlterSinkReady` (see `sequence_alter_sink_finish`).
    /// Retires `ctx` with an error if the new collection is unreadable.
    #[instrument]
    pub(super) async fn sequence_alter_sink_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterSinkPlan,
    ) {
        // Put a read hold on the new relation
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from_iter([plan.sink.from]),
            compute_ids: BTreeMap::new(),
        };
        let read_hold = self.acquire_read_holds(&id_bundle);

        // If the hold has no valid read time the collection has been fully
        // compacted away and cannot serve as a sink input.
        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
            return;
        };

        // Capture the tracing context so the finish step can attach to the
        // same trace.
        let otel_ctx = OpenTelemetryContext::obtain();
        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);

        // Record what the plan depends on so the finish step can detect
        // concurrent catalog changes (this command does not hold the DDL lock
        // while waiting).
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([plan.item_id, from_item_id]),
            Some(plan.in_cluster),
            None,
            ctx.session().role_metadata().clone(),
        );

        info!(
            "preparing alter sink for {}: frontiers={:?} export={:?}",
            plan.global_id,
            self.controller
                .storage_collections
                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
            self.controller.storage.export(plan.global_id)
        );

        // Now we must wait for the sink to make enough progress such that there is overlap between
        // the new `from` collection's read hold and the sink's write frontier.
        //
        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
        // the watch set never completes and neither does the `ALTER SINK` command.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([plan.global_id]),
            read_ts,
            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
                ctx: Some(ctx),
                otel_ctx,
                plan,
                plan_validity,
                read_hold,
            }),
        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
    }
3374
    /// Finishes an `ALTER SINK ... SET FROM` once the watch set installed by
    /// `sequence_alter_sink_prepare` has fired.
    ///
    /// Re-validates the plan against the current catalog, rewrites the sink's
    /// `create_sql` for the new `from` relation and version, commits the
    /// updated sink to the catalog, and finally alters the running export in
    /// the storage controller. Retires `ctx` with the outcome.
    #[instrument]
    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
        ctx.otel_ctx.attach_as_parent();

        let plan::AlterSinkPlan {
            item_id,
            global_id,
            sink: sink_plan,
            with_snapshot,
            in_cluster,
        } = ctx.plan.clone();

        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
        // arbitrarily changed since we performed planning, and we must re-assert that it still
        // matches our requirements.
        //
        // The `PlanValidity` check ensures that both the sink and the new source relation still
        // exist. Apart from that we have to ensure that nobody else altered the sink in the mean
        // time, which we do by comparing the catalog sink version to the one in the plan.
        match ctx.plan_validity.check(self.catalog()) {
            Ok(()) => {}
            Err(err) => {
                ctx.retire(Err(err));
                return;
            }
        }

        let entry = self.catalog().get_entry(&item_id);
        let CatalogItem::Sink(old_sink) = entry.item() else {
            panic!("invalid item kind for `AlterSinkPlan`");
        };

        // The plan was made against version `old_sink.version`; any other
        // current version means a concurrent ALTER won the race.
        if sink_plan.version != old_sink.version + 1 {
            ctx.retire(Err(AdapterError::ChangedPlan(
                "sink was altered concurrently".into(),
            )));
            return;
        }

        info!(
            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
            self.controller
                .storage_collections
                .collections_frontiers(vec![global_id, sink_plan.from]),
            self.controller.storage.export(global_id),
        );

        // Assert that we can recover the updates that happened at the timestamps of the write
        // frontier. This must be true in this call.
        let write_frontier = &self
            .controller
            .storage
            .export(global_id)
            .expect("sink known to exist")
            .write_frontier;
        let as_of = ctx.read_hold.least_valid_read();
        assert!(
            write_frontier.iter().all(|t| as_of.less_than(t)),
            "{:?} should be strictly less than {:?}",
            &*as_of,
            &**write_frontier
        );

        // Parse the `create_sql` so we can update it to the new sink definition.
        //
        // Note that we need to use the `create_sql` from the catalog here, not the one from the
        // sink plan. Even though we ensure that the sink version didn't change since planning, the
        // names in the `create_sql` may have changed, for example due to a schema swap.
        let create_sql = &old_sink.create_sql;
        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
            unreachable!("invalid statement kind for sink");
        };

        // Update the sink version.
        stmt.with_options
            .retain(|o| o.name != CreateSinkOptionName::Version);
        stmt.with_options.push(CreateSinkOption {
            name: CreateSinkOptionName::Version,
            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
                sink_plan.version.to_string(),
            ))),
        });

        let conn_catalog = self.catalog().for_system_session();
        let (mut stmt, resolved_ids) =
            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");

        // Update the `from` relation.
        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
        stmt.from = ResolvedItemName::Item {
            id: from_entry.id(),
            qualifiers: from_entry.name.qualifiers.clone(),
            full_name,
            print_id: true,
            version: from_entry.version,
        };

        // Assemble the new in-memory sink definition from the amended
        // statement and the plan.
        let new_sink = Sink {
            create_sql: stmt.to_ast_string_stable(),
            global_id,
            from: sink_plan.from,
            connection: sink_plan.connection.clone(),
            envelope: sink_plan.envelope,
            version: sink_plan.version,
            with_snapshot,
            resolved_ids: resolved_ids.clone(),
            cluster_id: in_cluster,
            commit_interval: sink_plan.commit_interval,
        };

        let ops = vec![catalog::Op::UpdateItem {
            id: item_id,
            name: entry.name().clone(),
            to_item: CatalogItem::Sink(new_sink),
        }];

        // Commit the catalog change before touching the storage controller; a
        // failure here retires the command without altering the running sink.
        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => {}
            Err(err) => {
                ctx.retire(Err(err));
                return;
            }
        }

        // Describe the new dataflow for the storage controller. The storage
        // metadata fields are unit here; presumably they are filled in by the
        // controller — TODO(review): confirm.
        let storage_sink_desc = StorageSinkDesc {
            from: sink_plan.from,
            from_desc: from_entry
                .relation_desc()
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink_plan
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink_plan.envelope,
            as_of,
            with_snapshot,
            version: sink_plan.version,
            from_storage_metadata: (),
            to_storage_metadata: (),
            commit_interval: sink_plan.commit_interval,
        };

        self.controller
            .storage
            .alter_export(
                global_id,
                ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: in_cluster,
                },
            )
            .await
            .unwrap_or_terminate("cannot fail to alter source desc");

        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
    }
3538
3539    #[instrument]
3540    pub(super) async fn sequence_alter_connection(
3541        &mut self,
3542        ctx: ExecuteContext,
3543        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3544    ) {
3545        match action {
3546            AlterConnectionAction::RotateKeys => {
3547                self.sequence_rotate_keys(ctx, id).await;
3548            }
3549            AlterConnectionAction::AlterOptions {
3550                set_options,
3551                drop_options,
3552                validate,
3553            } => {
3554                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3555                    .await
3556            }
3557        }
3558    }
3559
3560    #[instrument]
3561    async fn sequence_alter_connection_options(
3562        &mut self,
3563        mut ctx: ExecuteContext,
3564        id: CatalogItemId,
3565        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3566        drop_options: BTreeSet<ConnectionOptionName>,
3567        validate: bool,
3568    ) {
3569        let cur_entry = self.catalog().get_entry(&id);
3570        let cur_conn = cur_entry.connection().expect("known to be connection");
3571        let connection_gid = cur_conn.global_id();
3572
3573        let inner = || -> Result<Connection, AdapterError> {
3574            // Parse statement.
3575            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3576                .expect("invalid create sql persisted to catalog")
3577                .into_element()
3578                .ast
3579            {
3580                Statement::CreateConnection(stmt) => stmt,
3581                _ => unreachable!("proved type is source"),
3582            };
3583
3584            let catalog = self.catalog().for_system_session();
3585
3586            // Resolve items in statement
3587            let (mut create_conn_stmt, resolved_ids) =
3588                mz_sql::names::resolve(&catalog, create_conn_stmt)
3589                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3590
3591            // Retain options that are neither set nor dropped.
3592            create_conn_stmt
3593                .values
3594                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3595
3596            // Set new values
3597            create_conn_stmt.values.extend(
3598                set_options
3599                    .into_iter()
3600                    .map(|(name, value)| ConnectionOption { name, value }),
3601            );
3602
3603            // Open a new catalog, which we will use to re-plan our
3604            // statement with the desired config.
3605            let mut catalog = self.catalog().for_system_session();
3606            catalog.mark_id_unresolvable_for_replanning(id);
3607
3608            // Re-define our source in terms of the amended statement
3609            let plan = match mz_sql::plan::plan(
3610                None,
3611                &catalog,
3612                Statement::CreateConnection(create_conn_stmt),
3613                &Params::empty(),
3614                &resolved_ids,
3615            )
3616            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3617            {
3618                Plan::CreateConnection(plan) => plan,
3619                _ => unreachable!("create source plan is only valid response"),
3620            };
3621
3622            // Parse statement.
3623            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3624                .expect("invalid create sql persisted to catalog")
3625                .into_element()
3626                .ast
3627            {
3628                Statement::CreateConnection(stmt) => stmt,
3629                _ => unreachable!("proved type is source"),
3630            };
3631
3632            let catalog = self.catalog().for_system_session();
3633
3634            // Resolve items in statement
3635            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3636                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3637
3638            Ok(Connection {
3639                create_sql: plan.connection.create_sql,
3640                global_id: cur_conn.global_id,
3641                details: plan.connection.details,
3642                resolved_ids: new_deps,
3643            })
3644        };
3645
3646        let conn = match inner() {
3647            Ok(conn) => conn,
3648            Err(e) => {
3649                return ctx.retire(Err(e));
3650            }
3651        };
3652
3653        if validate {
3654            let connection = conn
3655                .details
3656                .to_connection()
3657                .into_inline_connection(self.catalog().state());
3658
3659            let internal_cmd_tx = self.internal_cmd_tx.clone();
3660            let transient_revision = self.catalog().transient_revision();
3661            let conn_id = ctx.session().conn_id().clone();
3662            let otel_ctx = OpenTelemetryContext::obtain();
3663            let role_metadata = ctx.session().role_metadata().clone();
3664            let current_storage_parameters = self.controller.storage.config().clone();
3665
3666            task::spawn(
3667                || format!("validate_alter_connection:{conn_id}"),
3668                async move {
3669                    let resolved_ids = conn.resolved_ids.clone();
3670                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3671                    let result = match connection.validate(id, &current_storage_parameters).await {
3672                        Ok(()) => Ok(conn),
3673                        Err(err) => Err(err.into()),
3674                    };
3675
3676                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3677                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3678                        AlterConnectionValidationReady {
3679                            ctx,
3680                            result,
3681                            connection_id: id,
3682                            connection_gid,
3683                            plan_validity: PlanValidity::new(
3684                                transient_revision,
3685                                dependency_ids.clone(),
3686                                None,
3687                                None,
3688                                role_metadata,
3689                            ),
3690                            otel_ctx,
3691                            resolved_ids,
3692                        },
3693                    ));
3694                    if let Err(e) = result {
3695                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3696                    }
3697                },
3698            );
3699        } else {
3700            let result = self
3701                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3702                .await;
3703            ctx.retire(result);
3704        }
3705    }
3706
3707    #[instrument]
3708    pub(crate) async fn sequence_alter_connection_stage_finish(
3709        &mut self,
3710        session: &Session,
3711        id: CatalogItemId,
3712        connection: Connection,
3713    ) -> Result<ExecuteResponse, AdapterError> {
3714        match self.catalog.get_entry(&id).item() {
3715            CatalogItem::Connection(curr_conn) => {
3716                curr_conn
3717                    .details
3718                    .to_connection()
3719                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3720                    .map_err(StorageError::from)?;
3721            }
3722            _ => unreachable!("known to be a connection"),
3723        };
3724
3725        let ops = vec![catalog::Op::UpdateItem {
3726            id,
3727            name: self.catalog.get_entry(&id).name().clone(),
3728            to_item: CatalogItem::Connection(connection.clone()),
3729        }];
3730
3731        self.catalog_transact(Some(session), ops).await?;
3732
3733        // NOTE: The rest of the alter connection logic (updating VPC endpoints
3734        // and propagating connection changes to dependent sources, sinks, and
3735        // tables) is handled in `apply_catalog_implications` via
3736        // `handle_alter_connection`. The catalog transact above triggers that
3737        // code path.
3738
3739        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3740    }
3741
    /// Sequences `ALTER SOURCE`.
    ///
    /// Two actions are supported:
    /// - `AddSubsourceExports`: re-plans the source's persisted `CREATE
    ///   SOURCE` statement with the current and newly requested
    ///   text/exclude-column options merged, validates the amended ingestion
    ///   with the storage controller, and creates the requested subsources in
    ///   the same catalog transaction.
    /// - `RefreshReferences`: records a new set of upstream references for
    ///   the source in the catalog.
    #[instrument]
    pub(super) async fn sequence_alter_source(
        &mut self,
        session: &Session,
        plan::AlterSourcePlan {
            item_id,
            ingestion_id,
            action,
        }: plan::AlterSourcePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let cur_entry = self.catalog().get_entry(&item_id);
        let cur_source = cur_entry.source().expect("known to be source");

        // Helper: parse a persisted `CREATE SOURCE` SQL string back into its
        // AST and resolve the items it names against a system-session catalog.
        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
            // Parse statement.
            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateSource(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = coord.catalog().for_system_session();

            // Resolve items in statement
            mz_sql::names::resolve(&catalog, create_source_stmt)
                .map_err(|e| AdapterError::internal(err_cx, e))
        };

        match action {
            plan::AlterSourceAction::AddSubsourceExports {
                subsources,
                options,
            } => {
                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

                // Options requested by this ALTER; the existing statement's
                // options get merged into these below.
                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                    text_columns: mut new_text_columns,
                    exclude_columns: mut new_exclude_columns,
                    ..
                } = options.try_into()?;

                // Resolve items in statement
                let (mut create_source_stmt, resolved_ids) =
                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

                // Get all currently referred-to items
                let catalog = self.catalog();
                let curr_references: BTreeSet<_> = catalog
                    .get_entry(&item_id)
                    .used_by()
                    .into_iter()
                    .filter_map(|subsource| {
                        catalog
                            .get_entry(subsource)
                            .subsource_details()
                            .map(|(_id, reference, _details)| reference)
                    })
                    .collect();

                // We are doing a lot of unwrapping, so just make an error to reference; all of
                // these invariants are guaranteed to be true because of how we plan subsources.
                let purification_err =
                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
                // we remove support for implicitly created subsources from a `CREATE SOURCE`
                // statement.
                match &mut create_source_stmt.connection {
                    CreateSourceConnection::Postgres {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::PgConfigOptionExtracted {
                            mut text_columns, ..
                        } = curr_options.clone().try_into()?;

                        // Drop text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));

                        // Drop all text columns that are not currently referred to.
                        text_columns.retain(|column_qualified_reference| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                4,
                                "all TEXT COLUMNS values must be column-qualified references"
                            );
                            // Truncating the 4-part column reference to 3 parts
                            // yields the table reference it belongs to.
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(3);
                            curr_references.contains(&table)
                        });

                        // Merge the current text columns into the new text columns.
                        new_text_columns.extend(text_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            // Sort for a deterministic statement rendering.
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(PgConfigOption {
                                name: PgConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::MySql {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::MySqlConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both ignore and text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                MySqlConfigOptionName::TextColumns
                                    | MySqlConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                // MySQL references are 3 parts; the first 2
                                // identify the table.
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::SqlServer {
                        options: curr_options,
                        ..
                    } => {
                        // NOTE: this branch mirrors the MySQL branch above,
                        // with SQL Server option types.
                        let mz_sql::plan::SqlServerConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both ignore and text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                SqlServerConfigOptionName::TextColumns
                                    | SqlServerConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    _ => return Err(purification_err()),
                };

                // Mark the source's own id unresolvable so replanning its
                // CREATE statement doesn't trip over the existing item.
                let mut catalog = self.catalog().for_system_session();
                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

                // Re-define our source in terms of the amended statement
                let planned = mz_sql::plan::plan(
                    None,
                    &catalog,
                    Statement::CreateSource(create_source_stmt),
                    &Params::empty(),
                    &resolved_ids,
                )
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
                let plan = match planned {
                    Plan::CreateSource(plan) => plan,
                    _ => unreachable!("create source plan is only valid response"),
                };

                // Asserting that we've done the right thing with dependencies
                // here requires mocking out objects in the catalog, which is a
                // large task for an operation we have to cover in tests anyway.
                let source = Source::new(
                    plan,
                    cur_source.global_id,
                    resolved_ids,
                    cur_source.custom_logical_compaction_window,
                    cur_source.is_retained_metrics_object,
                );

                // Get new ingestion description for storage.
                let desc = match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                        desc.clone().into_inline_connection(self.catalog().state())
                    }
                    _ => unreachable!("already verified of type ingestion"),
                };

                // Let the storage controller veto incompatible ingestion
                // changes before we touch the catalog.
                self.controller
                    .storage
                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

                // Redefine source. This must be done before we create any new
                // subsources so that it has the right ingestion.
                let mut ops = vec![catalog::Op::UpdateItem {
                    id: item_id,
                    // Look this up again so we don't have to hold an immutable reference to the
                    // entry for so long.
                    name: self.catalog.get_entry(&item_id).name().clone(),
                    to_item: CatalogItem::Source(source),
                }];

                let CreateSourceInner {
                    ops: new_ops,
                    sources: _,
                    if_not_exists_ids,
                } = self.create_source_inner(session, subsources).await?;

                ops.extend(new_ops.into_iter());

                assert!(
                    if_not_exists_ids.is_empty(),
                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
                );

                self.catalog_transact(Some(session), ops).await?;
            }
            plan::AlterSourceAction::RefreshReferences { references } => {
                self.catalog_transact(
                    Some(session),
                    vec![catalog::Op::UpdateSourceReferences {
                        source_id: item_id,
                        references: references.into(),
                    }],
                )
                .await?;
            }
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
    }
4070
4071    #[instrument]
4072    pub(super) async fn sequence_alter_system_set(
4073        &mut self,
4074        session: &Session,
4075        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4076    ) -> Result<ExecuteResponse, AdapterError> {
4077        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4078        // We want to ensure that the network policy we're switching too actually exists.
4079        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4080            self.validate_alter_system_network_policy(session, &value)?;
4081        }
4082
4083        let op = match value {
4084            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4085                name: name.clone(),
4086                value: OwnedVarInput::SqlSet(values),
4087            },
4088            plan::VariableValue::Default => {
4089                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4090            }
4091        };
4092        self.catalog_transact(Some(session), vec![op]).await?;
4093
4094        session.add_notice(AdapterNotice::VarDefaultUpdated {
4095            role: None,
4096            var_name: Some(name),
4097        });
4098        Ok(ExecuteResponse::AlteredSystemConfiguration)
4099    }
4100
4101    #[instrument]
4102    pub(super) async fn sequence_alter_system_reset(
4103        &mut self,
4104        session: &Session,
4105        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4106    ) -> Result<ExecuteResponse, AdapterError> {
4107        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4108        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4109        self.catalog_transact(Some(session), vec![op]).await?;
4110        session.add_notice(AdapterNotice::VarDefaultUpdated {
4111            role: None,
4112            var_name: Some(name),
4113        });
4114        Ok(ExecuteResponse::AlteredSystemConfiguration)
4115    }
4116
4117    #[instrument]
4118    pub(super) async fn sequence_alter_system_reset_all(
4119        &mut self,
4120        session: &Session,
4121        _: plan::AlterSystemResetAllPlan,
4122    ) -> Result<ExecuteResponse, AdapterError> {
4123        self.is_user_allowed_to_alter_system(session, None)?;
4124        let op = catalog::Op::ResetAllSystemConfiguration;
4125        self.catalog_transact(Some(session), vec![op]).await?;
4126        session.add_notice(AdapterNotice::VarDefaultUpdated {
4127            role: None,
4128            var_name: None,
4129        });
4130        Ok(ExecuteResponse::AlteredSystemConfiguration)
4131    }
4132
4133    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
4134    fn is_user_allowed_to_alter_system(
4135        &self,
4136        session: &Session,
4137        var_name: Option<&str>,
4138    ) -> Result<(), AdapterError> {
4139        match (session.user().kind(), var_name) {
4140            // Only internal superusers can reset all system variables.
4141            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4142            // Whether or not a variable can be modified depends if we're an internal superuser.
4143            (UserKind::Superuser, Some(name))
4144                if session.user().is_internal()
4145                    || self.catalog().system_config().user_modifiable(name) =>
4146            {
4147                // In lieu of plumbing the user to all system config functions, just check that
4148                // the var is visible.
4149                let var = self.catalog().system_config().get(name)?;
4150                var.visible(session.user(), self.catalog().system_config())?;
4151                Ok(())
4152            }
4153            // If we're not a superuser, but the variable is user modifiable, indicate they can use
4154            // session variables.
4155            (UserKind::Regular, Some(name))
4156                if self.catalog().system_config().user_modifiable(name) =>
4157            {
4158                Err(AdapterError::Unauthorized(
4159                    rbac::UnauthorizedError::Superuser {
4160                        action: format!("toggle the '{name}' system configuration parameter"),
4161                    },
4162                ))
4163            }
4164            _ => Err(AdapterError::Unauthorized(
4165                rbac::UnauthorizedError::MzSystem {
4166                    action: "alter system".into(),
4167                },
4168            )),
4169        }
4170    }
4171
4172    fn validate_alter_system_network_policy(
4173        &self,
4174        session: &Session,
4175        policy_value: &plan::VariableValue,
4176    ) -> Result<(), AdapterError> {
4177        let policy_name = match &policy_value {
4178            // Make sure the compiled in default still exists.
4179            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4180            plan::VariableValue::Values(values) if values.len() == 1 => {
4181                values.iter().next().cloned()
4182            }
4183            plan::VariableValue::Values(values) => {
4184                tracing::warn!(?values, "can't set multiple network policies at once");
4185                None
4186            }
4187        };
4188        let maybe_network_policy = policy_name
4189            .as_ref()
4190            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4191        let Some(network_policy) = maybe_network_policy else {
4192            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4193                VarError::InvalidParameterValue {
4194                    name: NETWORK_POLICY.name(),
4195                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4196                    reason: "no network policy with such name exists".to_string(),
4197                },
4198            )));
4199        };
4200        self.validate_alter_network_policy(session, &network_policy.rules)
4201    }
4202
4203    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4204    ///
4205    /// This helps prevent users from modifying network policies in a way that would lock out their
4206    /// current connection.
4207    fn validate_alter_network_policy(
4208        &self,
4209        session: &Session,
4210        policy_rules: &Vec<NetworkPolicyRule>,
4211    ) -> Result<(), AdapterError> {
4212        // If the user is not an internal user attempt to protect them from
4213        // blocking themselves.
4214        if session.user().is_internal() {
4215            return Ok(());
4216        }
4217        if let Some(ip) = session.meta().client_ip() {
4218            validate_ip_with_policy_rules(ip, policy_rules)
4219                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4220        } else {
4221            // Sessions without IPs are only temporarily constructed for default values
4222            // they should not be permitted here.
4223            return Err(AdapterError::NetworkPolicyDenied(
4224                NetworkPolicyError::MissingIp,
4225            ));
4226        }
4227        Ok(())
4228    }
4229
4230    // Returns the name of the portal to execute.
4231    #[instrument]
4232    pub(super) fn sequence_execute(
4233        &self,
4234        session: &mut Session,
4235        plan: plan::ExecutePlan,
4236    ) -> Result<String, AdapterError> {
4237        // Verify the stmt is still valid.
4238        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4239        let ps = session
4240            .get_prepared_statement_unverified(&plan.name)
4241            .expect("known to exist");
4242        let stmt = ps.stmt().cloned();
4243        let desc = ps.desc().clone();
4244        let state_revision = ps.state_revision;
4245        let logging = Arc::clone(ps.logging());
4246        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4247    }
4248
4249    #[instrument]
4250    pub(super) async fn sequence_grant_privileges(
4251        &mut self,
4252        session: &Session,
4253        plan::GrantPrivilegesPlan {
4254            update_privileges,
4255            grantees,
4256        }: plan::GrantPrivilegesPlan,
4257    ) -> Result<ExecuteResponse, AdapterError> {
4258        self.sequence_update_privileges(
4259            session,
4260            update_privileges,
4261            grantees,
4262            UpdatePrivilegeVariant::Grant,
4263        )
4264        .await
4265    }
4266
4267    #[instrument]
4268    pub(super) async fn sequence_revoke_privileges(
4269        &mut self,
4270        session: &Session,
4271        plan::RevokePrivilegesPlan {
4272            update_privileges,
4273            revokees,
4274        }: plan::RevokePrivilegesPlan,
4275    ) -> Result<ExecuteResponse, AdapterError> {
4276        self.sequence_update_privileges(
4277            session,
4278            update_privileges,
4279            revokees,
4280            UpdatePrivilegeVariant::Revoke,
4281        )
4282        .await
4283    }
4284
    /// Shared implementation for `GRANT`/`REVOKE` of privileges on objects.
    ///
    /// For each `(privilege, grantee)` pair this builds a catalog op only when
    /// the operation would actually change state: granting a privilege the
    /// grantee does not already hold, or revoking one that it does. Privileges
    /// that are not applicable to a relation's actual object type produce a
    /// warning notice rather than an error. Warnings are only delivered to the
    /// session when the overall operation succeeds (including the no-op case).
    #[instrument]
    async fn sequence_update_privileges(
        &mut self,
        session: &Session,
        update_privileges: Vec<UpdatePrivilege>,
        grantees: Vec<RoleId>,
        variant: UpdatePrivilegeVariant,
    ) -> Result<ExecuteResponse, AdapterError> {
        // At most one op per (privilege, grantee) pair.
        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
        let mut warnings = Vec::new();
        let catalog = self.catalog().for_session(session);

        for UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        } in update_privileges
        {
            let actual_object_type = catalog.get_system_object_type(&target_id);
            // For all relations we allow all applicable table privileges, but send a warning if the
            // privilege isn't actually applicable to the object type.
            if actual_object_type.is_relation() {
                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
                if !non_applicable_privileges.is_empty() {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
                        non_applicable_privileges,
                        object_description,
                    })
                }
            }

            // Reserved objects (e.g. system objects) may not have their privileges changed.
            if let SystemObjectId::Object(object_id) = &target_id {
                self.catalog()
                    .ensure_not_reserved_object(object_id, session.conn_id())?;
            }

            let privileges = self
                .catalog()
                .get_privileges(&target_id, session.conn_id())
                // Should be unreachable since the parser will refuse to parse grant/revoke
                // statements on objects without privileges.
                .ok_or(AdapterError::Unsupported(
                    "GRANTs/REVOKEs on an object type with no privileges",
                ))?;

            for grantee in &grantees {
                self.catalog().ensure_not_system_role(grantee)?;
                self.catalog().ensure_not_predefined_role(grantee)?;
                // Use the existing ACL item if one exists; otherwise synthesize
                // an empty one so the no-op checks below work uniformly.
                let existing_privilege = privileges
                    .get_acl_item(grantee, &grantor)
                    .map(Cow::Borrowed)
                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));

                match variant {
                    // Grant only if some requested privilege is not yet held.
                    UpdatePrivilegeVariant::Grant
                        if !existing_privilege.acl_mode.contains(acl_mode) =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    // Revoke only if some requested privilege is actually held.
                    UpdatePrivilegeVariant::Revoke
                        if !existing_privilege
                            .acl_mode
                            .intersection(acl_mode)
                            .is_empty() =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    // no-op
                    _ => {}
                }
            }
        }

        // Nothing to change: still surface warnings and report success.
        if ops.is_empty() {
            session.add_notices(warnings);
            return Ok(variant.into());
        }

        let res = self
            .catalog_transact(Some(session), ops)
            .await
            .map(|_| match variant {
                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
            });
        // Warnings accompany only successful operations.
        if res.is_ok() {
            session.add_notices(warnings);
        }
        res
    }
4394
    /// Sequences `ALTER DEFAULT PRIVILEGES`.
    ///
    /// Validates that the target role and every grantee are neither system nor
    /// predefined roles and that any referenced database/schema is not a
    /// reserved object, then applies one default-privilege update per
    /// `(privilege_object, privilege_acl_item)` pair. The grant/revoke variant
    /// is derived from the plan's `is_grant` flag.
    #[instrument]
    pub(super) async fn sequence_alter_default_privileges(
        &mut self,
        session: &Session,
        plan::AlterDefaultPrivilegesPlan {
            privilege_objects,
            privilege_acl_items,
            is_grant,
        }: plan::AlterDefaultPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // One op per (object, acl item) pair.
        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
        let variant = if is_grant {
            UpdatePrivilegeVariant::Grant
        } else {
            UpdatePrivilegeVariant::Revoke
        };
        for privilege_object in &privilege_objects {
            self.catalog()
                .ensure_not_system_role(&privilege_object.role_id)?;
            self.catalog()
                .ensure_not_predefined_role(&privilege_object.role_id)?;
            if let Some(database_id) = privilege_object.database_id {
                self.catalog()
                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
            }
            if let Some(schema_id) = privilege_object.schema_id {
                // A schema is identified by its (database, schema) specifier pair.
                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
                let schema_spec: SchemaSpecifier = schema_id.into();

                self.catalog().ensure_not_reserved_object(
                    &(database_spec, schema_spec).into(),
                    session.conn_id(),
                )?;
            }
            for privilege_acl_item in &privilege_acl_items {
                self.catalog()
                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
                self.catalog()
                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
                ops.push(catalog::Op::UpdateDefaultPrivilege {
                    privilege_object: privilege_object.clone(),
                    privilege_acl_item: privilege_acl_item.clone(),
                    variant,
                })
            }
        }

        self.catalog_transact(Some(session), ops).await?;
        Ok(ExecuteResponse::AlteredDefaultPrivileges)
    }
4445
    /// Sequences `GRANT <role> TO <member>`.
    ///
    /// Builds one `GrantRole` op per `(role, member)` pair that is not already
    /// a membership. Pairs where the membership already exists produce a
    /// notice instead of an op; reserved/grantable-role checks still run in
    /// that path so an invalid statement does not falsely report success.
    #[instrument]
    pub(super) async fn sequence_grant_role(
        &mut self,
        session: &Session,
        plan::GrantRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::GrantRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // We need this check so we don't accidentally return a success on a reserved role.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::GrantRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        // All pairs were pre-existing memberships: nothing to do.
        if ops.is_empty() {
            return Ok(ExecuteResponse::GrantedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::GrantedRole)
    }
4490
    /// Sequences `REVOKE <role> FROM <member>`.
    ///
    /// Builds one `RevokeRole` op per `(role, member)` pair that is an actual
    /// membership. Pairs where no membership exists produce a notice instead
    /// of an op; reserved/grantable-role checks still run in that path so an
    /// invalid statement does not falsely report success.
    #[instrument]
    pub(super) async fn sequence_revoke_role(
        &mut self,
        session: &Session,
        plan::RevokeRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::RevokeRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if !member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // We need this check so we don't accidentally return a success on a reserved role.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::RevokeRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        // No pair was an existing membership: nothing to do.
        if ops.is_empty() {
            return Ok(ExecuteResponse::RevokedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::RevokedRole)
    }
4535
    /// Sequences `ALTER ... OWNER TO <role>`.
    ///
    /// Index ownership cannot be changed directly: attempting to do so emits a
    /// notice and returns success without changing anything. For other items,
    /// the ownership change cascades to dependent indexes and any progress
    /// collection; for clusters, it cascades to the cluster's replicas.
    #[instrument]
    pub(super) async fn sequence_alter_owner(
        &mut self,
        session: &Session,
        plan::AlterOwnerPlan {
            id,
            object_type,
            new_owner,
        }: plan::AlterOwnerPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // The op for the object itself; cascading ops are appended below.
        let mut ops = vec![catalog::Op::UpdateOwner {
            id: id.clone(),
            new_owner,
        }];

        match &id {
            ObjectId::Item(global_id) => {
                let entry = self.catalog().get_entry(global_id);

                // Cannot directly change the owner of an index.
                if entry.is_index() {
                    let name = self
                        .catalog()
                        .resolve_full_name(entry.name(), Some(session.conn_id()))
                        .to_string();
                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
                    return Ok(ExecuteResponse::AlteredObject(object_type));
                }

                // Alter owner cascades down to dependent indexes.
                let dependent_index_ops = entry
                    .used_by()
                    .into_iter()
                    .filter(|id| self.catalog().get_entry(id).is_index())
                    .map(|id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(*id),
                        new_owner,
                    });
                ops.extend(dependent_index_ops);

                // Alter owner cascades down to progress collections.
                let dependent_subsources =
                    entry
                        .progress_id()
                        .into_iter()
                        .map(|item_id| catalog::Op::UpdateOwner {
                            id: ObjectId::Item(item_id),
                            new_owner,
                        });
                ops.extend(dependent_subsources);
            }
            ObjectId::Cluster(cluster_id) => {
                let cluster = self.catalog().get_cluster(*cluster_id);
                // Alter owner cascades down to cluster replicas.
                let managed_cluster_replica_ops =
                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
                        new_owner,
                    });
                ops.extend(managed_cluster_replica_ops);
            }
            // Other object kinds have no cascading ownership changes.
            _ => {}
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::AlteredObject(object_type))
    }
4604
4605    #[instrument]
4606    pub(super) async fn sequence_reassign_owned(
4607        &mut self,
4608        session: &Session,
4609        plan::ReassignOwnedPlan {
4610            old_roles,
4611            new_role,
4612            reassign_ids,
4613        }: plan::ReassignOwnedPlan,
4614    ) -> Result<ExecuteResponse, AdapterError> {
4615        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4616            self.catalog().ensure_not_reserved_role(role_id)?;
4617        }
4618
4619        let ops = reassign_ids
4620            .into_iter()
4621            .map(|id| catalog::Op::UpdateOwner {
4622                id,
4623                new_owner: new_role,
4624            })
4625            .collect();
4626
4627        self.catalog_transact(Some(session), ops)
4628            .await
4629            .map(|_| ExecuteResponse::ReassignOwned)
4630    }
4631
4632    #[instrument]
4633    pub(crate) async fn handle_deferred_statement(&mut self) {
4634        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
4635        // was processed, removing the single element from deferred_statements, so it is expected
4636        // that this is sometimes empty.
4637        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4638            return;
4639        };
4640        match ps {
4641            crate::coord::PlanStatement::Statement { stmt, params } => {
4642                self.handle_execute_inner(stmt, params, ctx).await;
4643            }
4644            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4645                self.sequence_plan(ctx, plan, resolved_ids).await;
4646            }
4647        }
4648    }
4649
4650    #[instrument]
4651    // TODO(parkmycar): Remove this once we have an actual implementation.
4652    #[allow(clippy::unused_async)]
4653    pub(super) async fn sequence_alter_table(
4654        &mut self,
4655        ctx: &mut ExecuteContext,
4656        plan: plan::AlterTablePlan,
4657    ) -> Result<ExecuteResponse, AdapterError> {
4658        let plan::AlterTablePlan {
4659            relation_id,
4660            column_name,
4661            column_type,
4662            raw_sql_type,
4663        } = plan;
4664
4665        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4666        let (_, new_global_id) = self.allocate_user_id().await?;
4667        let ops = vec![catalog::Op::AlterAddColumn {
4668            id: relation_id,
4669            new_global_id,
4670            name: column_name,
4671            typ: column_type,
4672            sql: raw_sql_type,
4673        }];
4674
4675        self.catalog_transact_with_context(None, Some(ctx), ops)
4676            .await?;
4677
4678        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4679    }
4680
    /// Prepares to apply a replacement materialized view.
    ///
    /// Captures a plan-validity snapshot, compares the target's storage write
    /// frontier with the replacement's compute write frontier, and installs a
    /// storage watch set that fires once the target has caught up to the
    /// replacement. The watch set's response carries the context needed to
    /// finish the command in
    /// `sequence_alter_materialized_view_apply_replacement_finish`.
    #[instrument]
    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: AlterMaterializedViewApplyReplacementPlan,
    ) {
        // To ensure there is no time gap in the output, we can only apply a replacement if the
        // target's write frontier has caught up to the replacement dataflow's write frontier. This
        // might not be the case initially, so we have to wait. To this end, we install a watch set
        // waiting for the target MV's write frontier to advance sufficiently.
        //
        // Note that the replacement's dataflow is not performing any writes, so it can only be
        // ahead of the target initially due to as-of selection. Once the target has caught up, the
        // replacement's write frontier is always <= the target's.

        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();

        // Record the catalog revision and dependencies so the finish step can
        // detect whether the world changed while we were waiting.
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([id, replacement_id]),
            None,
            None,
            ctx.session().role_metadata().clone(),
        );

        let target = self.catalog.get_entry(&id);
        let target_gid = target.latest_global_id();

        let replacement = self.catalog.get_entry(&replacement_id);
        let replacement_gid = replacement.latest_global_id();

        let target_upper = self
            .controller
            .storage_collections
            .collection_frontiers(target_gid)
            .expect("target MV exists")
            .write_frontier;
        let replacement_upper = self
            .controller
            .compute
            .collection_frontiers(replacement_gid, replacement.cluster_id())
            .expect("replacement MV exists")
            .write_frontier;

        info!(
            %id, %replacement_id, ?target_upper, ?replacement_upper,
            "preparing materialized view replacement application",
        );

        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
            // A replacement's write frontier can only become empty if the target's write frontier
            // has advanced to the empty frontier. In this case the MV is sealed for all times and
            // applying the replacement wouldn't have any effect. We use this opportunity to alert
            // the user by returning an error, rather than applying the useless replacement.
            //
            // Note that we can't assert on `target_upper` being empty here, because the reporting
            // of the target's frontier might be delayed. We'd have to fetch the current frontier
            // from persist, which we cannot do without incurring I/O.
            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
                name: target.name().item.clone(),
            }));
            return;
        };

        // A watch set resolves when the watched objects' frontier becomes _greater_ than the
        // specified timestamp. Since we only need to wait until the target frontier is >= the
        // replacement's frontier, we can step back the timestamp.
        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);

        // TODO(database-issues#9820): If the target MV is dropped while we are waiting for
        // progress, the watch set never completes and neither does the `ALTER MATERIALIZED VIEW`
        // command.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([target_gid]),
            replacement_upper_ts,
            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
                ctx: Some(ctx),
                otel_ctx: OpenTelemetryContext::obtain(),
                plan,
                plan_validity,
            }),
        )
        .expect("target collection exists");
    }
4767
    /// Finishes applying a replacement materialized view after the frontier wait completed.
    ///
    /// Re-validates the plan against the current catalog (the DDL lock was not
    /// held while waiting), then applies the replacement in a catalog
    /// transaction and retires the execute context with the result.
    #[instrument]
    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
        &mut self,
        mut ctx: AlterMaterializedViewReadyContext,
    ) {
        // Re-attach the tracing context captured when the watch set was installed.
        ctx.otel_ctx.attach_as_parent();

        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;

        // We avoid taking the DDL lock for `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`
        // commands, see `Coordinator::must_serialize_ddl`. We therefore must assume that the
        // world has arbitrarily changed since we performed planning, and we must re-assert
        // that it still matches our requirements.
        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
            ctx.retire(Err(err));
            return;
        }

        info!(
            %id, %replacement_id,
            "finishing materialized view replacement application",
        );

        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
                ObjectType::MaterializedView,
            ))),
            Err(err) => ctx.retire(Err(err)),
        }
    }
4803
4804    pub(super) async fn statistics_oracle(
4805        &self,
4806        session: &Session,
4807        source_ids: &BTreeSet<GlobalId>,
4808        query_as_of: &Antichain<Timestamp>,
4809        is_oneshot: bool,
4810    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4811        super::statistics_oracle(
4812            session,
4813            source_ids,
4814            query_as_of,
4815            is_oneshot,
4816            self.catalog().system_config(),
4817            self.controller.storage_collections.as_ref(),
4818        )
4819        .await
4820    }
4821}
4822
4823impl Coordinator {
4824    /// Process the metainfo from a newly created non-transient dataflow.
4825    async fn process_dataflow_metainfo(
4826        &mut self,
4827        df_meta: DataflowMetainfo,
4828        export_id: GlobalId,
4829        ctx: Option<&mut ExecuteContext>,
4830        notice_ids: Vec<GlobalId>,
4831    ) -> Option<BuiltinTableAppendNotify> {
4832        // Emit raw notices to the user.
4833        if let Some(ctx) = ctx {
4834            emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4835        }
4836
4837        // Create a metainfo with rendered notices.
4838        let df_meta = self
4839            .catalog()
4840            .render_notices(df_meta, notice_ids, Some(export_id));
4841
4842        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
4843        // in-memory state.
4844        if self.catalog().state().system_config().enable_mz_notices()
4845            && !df_meta.optimizer_notices.is_empty()
4846        {
4847            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4848            self.catalog().state().pack_optimizer_notices(
4849                &mut builtin_table_updates,
4850                df_meta.optimizer_notices.iter(),
4851                Diff::ONE,
4852            );
4853
4854            // Save the metainfo.
4855            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4856
4857            Some(
4858                self.builtin_table_update()
4859                    .execute(builtin_table_updates)
4860                    .await
4861                    .0,
4862            )
4863        } else {
4864            // Save the metainfo.
4865            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4866
4867            None
4868        }
4869    }
4870}