// mz_adapter/coord/sequencer/inner.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::future::OreFutureExt;
34use mz_ore::task::{self, JoinHandle, spawn};
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{assert_none, instrument, soft_assert_or_log};
37use mz_repr::adt::jsonb::Jsonb;
38use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
39use mz_repr::explain::ExprHumanizer;
40use mz_repr::explain::json::json_string;
41use mz_repr::role_id::RoleId;
42use mz_repr::{
43    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
44    RowIterator, Timestamp,
45};
46use mz_sql::ast::{
47    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
48    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
49    SqlServerConfigOptionName,
50};
51use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
52use mz_sql::catalog::{
53    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
54    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
55    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
56};
57use mz_sql::names::{
58    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
59    SchemaSpecifier, SystemObjectId,
60};
61use mz_sql::plan::{
62    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
63    StatementContext,
64};
65use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
66use mz_storage_types::sinks::StorageSinkDesc;
67use mz_storage_types::sources::GenericSourceConnection;
68// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
69use mz_sql::plan::{
70    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
71    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
72    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
73};
74use mz_sql::session::metadata::SessionMetadata;
75use mz_sql::session::user::UserKind;
76use mz_sql::session::vars::{
77    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
78    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
79};
80use mz_sql::{plan, rbac};
81use mz_sql_parser::ast::display::AstDisplay;
82use mz_sql_parser::ast::{
83    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
84    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
85    WithOptionValue,
86};
87use mz_ssh_util::keys::SshKeyPairSet;
88use mz_storage_client::controller::ExportDescription;
89use mz_storage_types::AlterCompatible;
90use mz_storage_types::connections::inline::IntoInlineConnection;
91use mz_storage_types::controller::StorageError;
92use mz_transform::dataflow::DataflowMetainfo;
93use smallvec::SmallVec;
94use timely::progress::Antichain;
95use tokio::sync::{oneshot, watch};
96use tracing::{Instrument, Span, info, warn};
97
98use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
99use crate::command::{ExecuteResponse, Response};
100use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
101use crate::coord::sequencer::emit_optimizer_notices;
102use crate::coord::{
103    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
104    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
105    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
106    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
107    WatchSetResponse, validate_ip_with_policy_rules,
108};
109use crate::error::AdapterError;
110use crate::notice::{AdapterNotice, DroppedInUseIndex};
111use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
112use crate::optimize::{self, Optimize};
113use crate::session::{
114    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
115    WriteLocks, WriteOp,
116};
117use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
118use crate::{PeekResponseUnary, ReadHolds};
119
/// A future that resolves to a real-time recency timestamp.
///
/// Boxed (`BoxFuture`) so it can be stored and awaited without naming the
/// concrete future type.
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError>>;
122
// Per-statement-kind sequencing submodules.
mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;
133
/// Attempts to evaluate an expression. If an error is returned then the error is sent
/// to the client and the function is exited.
///
/// NOTE: because `return $ctx.retire(...)` evaluates to `()`, this macro can
/// only be used inside functions that return `()`.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            // `retire` reports the error back to the client for this context.
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

// Re-exported so sibling sequencer modules can use the macro.
pub(super) use return_if_err;
146
/// Catalog operations and bookkeeping accumulated while sequencing a drop.
struct DropOps {
    // The catalog operations to apply for the drop.
    ops: Vec<catalog::Op>,
    // Whether the dropped objects included the active database
    // (per the field name; presumably used to emit a notice — confirm at callers).
    dropped_active_db: bool,
    // Whether the dropped objects included the active cluster.
    dropped_active_cluster: bool,
    // Indexes dropped while still in use, surfaced via `DroppedInUseIndex` notices.
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
153
/// A bundle of values returned from `create_source_inner`.
struct CreateSourceInner {
    // Catalog operations that create the sources and record source references.
    ops: Vec<catalog::Op>,
    // The in-memory `Source` objects, paired with their catalog item IDs.
    sources: Vec<(CatalogItemId, Source)>,
    // Names of the planned sources that used `IF NOT EXISTS`, so callers can
    // convert "already exists" errors into notices instead of failures.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
160
161impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        // Bail out immediately if the plan's validity preconditions no longer hold.
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
                    // when cancel_enabled may have been true on an earlier stage.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                // Without a session there is no connection to cancel on behalf of.
                cancel_enabled = false
            };
            // Execute the stage itself on the coordinator thread, under the caller's span.
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                // Off-thread work: when the task completes, feed its next stage back
                // to the coordinator via the internal command channel.
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                // Off-thread work whose result is the final response.
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                // Final response produced on-thread.
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                // The next stage is available immediately; loop again.
                StageResult::Immediate(stage) => *stage,
            }
        }
    }
227
228    fn handle_spawn<C, T, F>(
229        &self,
230        ctx: C,
231        handle: JoinHandle<Result<T, AdapterError>>,
232        cancel_enabled: bool,
233        f: F,
234    ) where
235        C: StagedContext + Send + 'static,
236        T: Send + 'static,
237        F: FnOnce(C, T) + Send + 'static,
238    {
239        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
240            .session()
241            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
242        {
243            let mut rx = rx.clone();
244            Box::pin(async move {
245                // Wait for true or dropped sender.
246                let _ = rx.wait_for(|v| *v).await;
247                ()
248            })
249        } else {
250            Box::pin(future::pending())
251        };
252        spawn(|| "sequence_staged", async move {
253            tokio::select! {
254                res = handle => {
255                    let next = return_if_err!(res, ctx);
256                    f(ctx, next);
257                }
258                _ = rx, if cancel_enabled => {
259                    ctx.retire(Err(AdapterError::Canceled));
260                }
261            }
262        });
263    }
264
265    async fn create_source_inner(
266        &self,
267        session: &Session,
268        plans: Vec<plan::CreateSourcePlanBundle>,
269    ) -> Result<CreateSourceInner, AdapterError> {
270        let mut ops = vec![];
271        let mut sources = vec![];
272
273        let if_not_exists_ids = plans
274            .iter()
275            .filter_map(
276                |plan::CreateSourcePlanBundle {
277                     item_id,
278                     global_id: _,
279                     plan,
280                     resolved_ids: _,
281                     available_source_references: _,
282                 }| {
283                    if plan.if_not_exists {
284                        Some((*item_id, plan.name.clone()))
285                    } else {
286                        None
287                    }
288                },
289            )
290            .collect::<BTreeMap<_, _>>();
291
292        for plan::CreateSourcePlanBundle {
293            item_id,
294            global_id,
295            mut plan,
296            resolved_ids,
297            available_source_references,
298        } in plans
299        {
300            let name = plan.name.clone();
301
302            match plan.source.data_source {
303                plan::DataSourceDesc::Ingestion(ref desc)
304                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
305                    let cluster_id = plan
306                        .in_cluster
307                        .expect("ingestion plans must specify cluster");
308                    match desc.connection {
309                        GenericSourceConnection::Postgres(_)
310                        | GenericSourceConnection::MySql(_)
311                        | GenericSourceConnection::SqlServer(_)
312                        | GenericSourceConnection::Kafka(_)
313                        | GenericSourceConnection::LoadGenerator(_) => {
314                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
315                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
316                                    .get(self.catalog().system_config().dyncfgs());
317
318                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
319                                {
320                                    return Err(AdapterError::Unsupported(
321                                        "sources in clusters with >1 replicas",
322                                    ));
323                                }
324                            }
325                        }
326                    }
327                }
328                plan::DataSourceDesc::Webhook { .. } => {
329                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
330                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
331                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
332                            .get(self.catalog().system_config().dyncfgs());
333
334                        if !enable_multi_replica_sources {
335                            if cluster.replica_ids().len() > 1 {
336                                return Err(AdapterError::Unsupported(
337                                    "webhook sources in clusters with >1 replicas",
338                                ));
339                            }
340                        }
341                    }
342                }
343                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
344            }
345
346            // Attempt to reduce the `CHECK` expression, we timeout if this takes too long.
347            if let mz_sql::plan::DataSourceDesc::Webhook {
348                validate_using: Some(validate),
349                ..
350            } = &mut plan.source.data_source
351            {
352                if let Err(reason) = validate.reduce_expression().await {
353                    self.metrics
354                        .webhook_validation_reduce_failures
355                        .with_label_values(&[reason])
356                        .inc();
357                    return Err(AdapterError::Internal(format!(
358                        "failed to reduce check expression, {reason}"
359                    )));
360                }
361            }
362
363            // If this source contained a set of available source references, update the
364            // source references catalog table.
365            let mut reference_ops = vec![];
366            if let Some(references) = &available_source_references {
367                reference_ops.push(catalog::Op::UpdateSourceReferences {
368                    source_id: item_id,
369                    references: references.clone().into(),
370                });
371            }
372
373            let source = Source::new(plan, global_id, resolved_ids, None, false);
374            ops.push(catalog::Op::CreateItem {
375                id: item_id,
376                name,
377                item: CatalogItem::Source(source.clone()),
378                owner_id: *session.current_role_id(),
379            });
380            sources.push((item_id, source));
381            // These operations must be executed after the source is added to the catalog.
382            ops.extend(reference_ops);
383        }
384
385        Ok(CreateSourceInner {
386            ops,
387            sources,
388            if_not_exists_ids,
389        })
390    }
391
392    /// Subsources are planned differently from other statements because they
393    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
394    /// Because of this, we have usually "missed" the opportunity to plan them
395    /// through the normal statement execution life cycle (the exception being
396    /// during bootstrapping).
397    ///
398    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
399    pub(crate) fn plan_subsource(
400        &self,
401        session: &Session,
402        params: &mz_sql::plan::Params,
403        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
404        item_id: CatalogItemId,
405        global_id: GlobalId,
406    ) -> Result<CreateSourcePlanBundle, AdapterError> {
407        let catalog = self.catalog().for_session(session);
408        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
409
410        let plan = self.plan_statement(
411            session,
412            Statement::CreateSubsource(subsource_stmt),
413            params,
414            &resolved_ids,
415        )?;
416        let plan = match plan {
417            Plan::CreateSource(plan) => plan,
418            _ => unreachable!(),
419        };
420        Ok(CreateSourcePlanBundle {
421            item_id,
422            global_id,
423            plan,
424            resolved_ids,
425            available_source_references: None,
426        })
427    }
428
429    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
430    pub(crate) async fn plan_purified_alter_source_add_subsource(
431        &mut self,
432        session: &Session,
433        params: Params,
434        source_name: ResolvedItemName,
435        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
436        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
437    ) -> Result<(Plan, ResolvedIds), AdapterError> {
438        let mut subsource_plans = Vec::with_capacity(subsources.len());
439
440        // Generate subsource statements
441        let conn_catalog = self.catalog().for_system_session();
442        let pcx = plan::PlanContext::zero();
443        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
444
445        let entry = self.catalog().get_entry(source_name.item_id());
446        let source = entry.source().ok_or_else(|| {
447            AdapterError::internal(
448                "plan alter source",
449                format!("expected Source found {entry:?}"),
450            )
451        })?;
452
453        let item_id = entry.id();
454        let ingestion_id = source.global_id();
455        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
456
457        let ids = self
458            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
459            .await?;
460        for (subsource_stmt, (item_id, global_id)) in
461            subsource_stmts.into_iter().zip_eq(ids.into_iter())
462        {
463            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
464            subsource_plans.push(s);
465        }
466
467        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
468            subsources: subsource_plans,
469            options,
470        };
471
472        Ok((
473            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
474                item_id,
475                ingestion_id,
476                action,
477            }),
478            ResolvedIds::empty(),
479        ))
480    }
481
482    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
483    pub(crate) fn plan_purified_alter_source_refresh_references(
484        &self,
485        _session: &Session,
486        _params: Params,
487        source_name: ResolvedItemName,
488        available_source_references: plan::SourceReferences,
489    ) -> Result<(Plan, ResolvedIds), AdapterError> {
490        let entry = self.catalog().get_entry(source_name.item_id());
491        let source = entry.source().ok_or_else(|| {
492            AdapterError::internal(
493                "plan alter source",
494                format!("expected Source found {entry:?}"),
495            )
496        })?;
497        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
498            references: available_source_references,
499        };
500
501        Ok((
502            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
503                item_id: entry.id(),
504                ingestion_id: source.global_id(),
505                action,
506            }),
507            ResolvedIds::empty(),
508        ))
509    }
510
    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
    ///
    /// The ordering is significant: the progress subsource is planned first,
    /// then the primary source, then the remaining subsources.
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        // +2 for the progress subsource and the primary source.
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource, if any.
        if let Some(progress_stmt) = progress_stmt {
            // The primary source depends on this subsource because the primary
            // source needs to know its shard ID, and the easiest way of
            // guaranteeing that the shard ID is discoverable is to create this
            // collection first.
            assert_none!(progress_stmt.of_source);
            let (item_id, global_id) = self.allocate_user_id().await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            // Wire the planned progress collection into the primary source
            // statement before planning it.
            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        // Options on the primary source that should also be applied to each
        // subsource (currently only `RETAIN HISTORY`; `TIMESTAMP INTERVAL` is
        // deliberately not propagated).
        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
        };

        let (item_id, global_id) = self.allocate_user_id().await?;

        // Resolved name of the primary source, referenced by each subsource.
        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources
        let ids = self
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }
623
624    #[instrument]
625    pub(super) async fn sequence_create_source(
626        &mut self,
627        ctx: &mut ExecuteContext,
628        plans: Vec<plan::CreateSourcePlanBundle>,
629    ) -> Result<ExecuteResponse, AdapterError> {
630        let CreateSourceInner {
631            ops,
632            sources,
633            if_not_exists_ids,
634        } = self.create_source_inner(ctx.session(), plans).await?;
635
636        let transact_result = self
637            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
638            .await;
639
640        // Check if any sources are webhook sources and report them as created.
641        for (item_id, source) in &sources {
642            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
643                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
644                    ctx.session()
645                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
646                }
647            }
648        }
649
650        match transact_result {
651            Ok(()) => Ok(ExecuteResponse::CreatedSource),
652            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
653                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
654            })) if if_not_exists_ids.contains_key(&id) => {
655                ctx.session()
656                    .add_notice(AdapterNotice::ObjectAlreadyExists {
657                        name: if_not_exists_ids[&id].item.clone(),
658                        ty: "source",
659                    });
660                Ok(ExecuteResponse::CreatedSource)
661            }
662            Err(err) => Err(err),
663        }
664    }
665
    /// Sequences a `CREATE CONNECTION` plan.
    ///
    /// SSH connections first have their key pairs materialized into the secrets
    /// controller. If `VALIDATE` was requested, connection validation runs on a
    /// spawned task and completion is reported back to the coordinator via the
    /// internal command channel; otherwise the creation finishes synchronously.
    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let (connection_id, connection_gid) = match self.allocate_user_id().await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err)),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                // Both keys must have been generated by us (key pairs); callers
                // may not supply the public keys explicitly.
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                // Persist the key pair as a secret and drop any stale cached copy.
                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
                self.caching_secrets_reader.invalidate(connection_id);
            }
            _ => (),
        };

        if plan.validate {
            // Validate off-thread; the result is delivered back to the
            // coordinator as a `CreateConnectionValidationReady` message.
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                // Catch panics from validation so a buggy validator cannot take
                // down the task without reporting an error.
                let result = match std::panic::AssertUnwindSafe(
                    connection.validate(connection_id, &current_storage_parameters),
                )
                .ore_catch_unwind()
                .await
                {
                    Ok(Ok(())) => Ok(plan),
                    Ok(Err(err)) => Err(err.into()),
                    Err(_panic) => {
                        tracing::error!("connection validation panicked");
                        Err(AdapterError::Internal(
                            "connection validation panicked".into(),
                        ))
                    }
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            // No validation requested: finish creation synchronously.
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }
774
775    #[instrument]
776    pub(crate) async fn sequence_create_connection_stage_finish(
777        &mut self,
778        ctx: &mut ExecuteContext,
779        connection_id: CatalogItemId,
780        connection_gid: GlobalId,
781        plan: plan::CreateConnectionPlan,
782        resolved_ids: ResolvedIds,
783    ) -> Result<ExecuteResponse, AdapterError> {
784        let ops = vec![catalog::Op::CreateItem {
785            id: connection_id,
786            name: plan.name.clone(),
787            item: CatalogItem::Connection(Connection {
788                create_sql: plan.connection.create_sql,
789                global_id: connection_gid,
790                details: plan.connection.details.clone(),
791                resolved_ids,
792            }),
793            owner_id: *ctx.session().current_role_id(),
794        }];
795
796        // VPC endpoint creation for AWS PrivateLink connections is now handled
797        // in apply_catalog_implications.
798        let conn_id = ctx.session().conn_id().clone();
799        let transact_result = self
800            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
801            .await;
802
803        match transact_result {
804            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
805            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
806                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
807            })) if plan.if_not_exists => {
808                // Clean up SSH key material if it was persisted, since the
809                // catalog item was not created.
810                if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
811                    if let Err(e) = self.secrets_controller.delete(connection_id).await {
812                        tracing::warn!(
813                            "Dropping SSH secret for existing connection has encountered an error: {}",
814                            e
815                        );
816                    } else {
817                        self.caching_secrets_reader.invalidate(connection_id);
818                    }
819                }
820                ctx.session()
821                    .add_notice(AdapterNotice::ObjectAlreadyExists {
822                        name: plan.name.item,
823                        ty: "connection",
824                    });
825                Ok(ExecuteResponse::CreatedConnection)
826            }
827            Err(err) => {
828                // Clean up SSH key material if it was persisted, since the
829                // catalog item was not created.
830                if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
831                    if let Err(e) = self.secrets_controller.delete(connection_id).await {
832                        tracing::warn!(
833                            "Dropping SSH secret for failed connection has encountered an error: {}",
834                            e
835                        );
836                    } else {
837                        self.caching_secrets_reader.invalidate(connection_id);
838                    }
839                }
840                Err(err)
841            }
842        }
843    }
844
845    #[instrument]
846    pub(super) async fn sequence_create_database(
847        &mut self,
848        session: &Session,
849        plan: plan::CreateDatabasePlan,
850    ) -> Result<ExecuteResponse, AdapterError> {
851        let ops = vec![catalog::Op::CreateDatabase {
852            name: plan.name.clone(),
853            owner_id: *session.current_role_id(),
854        }];
855        match self.catalog_transact(Some(session), ops).await {
856            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
857            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
858                kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
859            })) if plan.if_not_exists => {
860                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
861                Ok(ExecuteResponse::CreatedDatabase)
862            }
863            Err(err) => Err(err),
864        }
865    }
866
867    #[instrument]
868    pub(super) async fn sequence_create_schema(
869        &mut self,
870        session: &Session,
871        plan: plan::CreateSchemaPlan,
872    ) -> Result<ExecuteResponse, AdapterError> {
873        let op = catalog::Op::CreateSchema {
874            database_id: plan.database_spec,
875            schema_name: plan.schema_name.clone(),
876            owner_id: *session.current_role_id(),
877        };
878        match self.catalog_transact(Some(session), vec![op]).await {
879            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
880            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
881                kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
882            })) if plan.if_not_exists => {
883                session.add_notice(AdapterNotice::SchemaAlreadyExists {
884                    name: plan.schema_name,
885                });
886                Ok(ExecuteResponse::CreatedSchema)
887            }
888            Err(err) => Err(err),
889        }
890    }
891
892    /// Validates the role attributes for a `CREATE ROLE` statement.
893    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
894        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
895            if attributes.superuser.is_some() || attributes.password.is_some() {
896                return Err(AdapterError::UnavailableFeature {
897                    feature: "SUPERUSER and PASSWORD attributes".to_string(),
898                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
899                });
900            }
901        }
902        Ok(())
903    }
904
905    #[instrument]
906    pub(super) async fn sequence_create_role(
907        &mut self,
908        conn_id: Option<&ConnectionId>,
909        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
910    ) -> Result<ExecuteResponse, AdapterError> {
911        self.validate_role_attributes(&attributes.clone())?;
912        let op = catalog::Op::CreateRole { name, attributes };
913        self.catalog_transact_with_context(conn_id, None, vec![op])
914            .await
915            .map(|_| ExecuteResponse::CreatedRole)
916    }
917
918    #[instrument]
919    pub(super) async fn sequence_create_network_policy(
920        &mut self,
921        session: &Session,
922        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
923    ) -> Result<ExecuteResponse, AdapterError> {
924        let op = catalog::Op::CreateNetworkPolicy {
925            rules,
926            name,
927            owner_id: *session.current_role_id(),
928        };
929        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
930            .await
931            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
932    }
933
934    #[instrument]
935    pub(super) async fn sequence_alter_network_policy(
936        &mut self,
937        session: &Session,
938        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
939    ) -> Result<ExecuteResponse, AdapterError> {
940        // TODO(network_policy): Consider role based network policies here.
941        let current_network_policy_name =
942            self.catalog().system_config().default_network_policy_name();
943        // Check if the way we're alerting the policy is still valid for the current connection.
944        if current_network_policy_name == name {
945            self.validate_alter_network_policy(session, &rules)?;
946        }
947
948        let op = catalog::Op::AlterNetworkPolicy {
949            id,
950            rules,
951            name,
952            owner_id: *session.current_role_id(),
953        };
954        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
955            .await
956            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
957    }
958
959    #[instrument]
960    pub(super) async fn sequence_create_table(
961        &mut self,
962        ctx: &mut ExecuteContext,
963        plan: plan::CreateTablePlan,
964        resolved_ids: ResolvedIds,
965    ) -> Result<ExecuteResponse, AdapterError> {
966        let plan::CreateTablePlan {
967            name,
968            table,
969            if_not_exists,
970        } = plan;
971
972        let conn_id = if table.temporary {
973            Some(ctx.session().conn_id())
974        } else {
975            None
976        };
977        let (table_id, global_id) = self.allocate_user_id().await?;
978        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
979
980        let data_source = match table.data_source {
981            plan::TableDataSource::TableWrites { defaults } => {
982                TableDataSource::TableWrites { defaults }
983            }
984            plan::TableDataSource::DataSource {
985                desc: data_source_plan,
986                timeline,
987            } => match data_source_plan {
988                plan::DataSourceDesc::IngestionExport {
989                    ingestion_id,
990                    external_reference,
991                    details,
992                    data_config,
993                } => TableDataSource::DataSource {
994                    desc: DataSourceDesc::IngestionExport {
995                        ingestion_id,
996                        external_reference,
997                        details,
998                        data_config,
999                    },
1000                    timeline,
1001                },
1002                plan::DataSourceDesc::Webhook {
1003                    validate_using,
1004                    body_format,
1005                    headers,
1006                    cluster_id,
1007                } => TableDataSource::DataSource {
1008                    desc: DataSourceDesc::Webhook {
1009                        validate_using,
1010                        body_format,
1011                        headers,
1012                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
1013                    },
1014                    timeline,
1015                },
1016                o => {
1017                    unreachable!("CREATE TABLE data source got {:?}", o)
1018                }
1019            },
1020        };
1021
1022        let is_webhook = if let TableDataSource::DataSource {
1023            desc: DataSourceDesc::Webhook { .. },
1024            timeline: _,
1025        } = &data_source
1026        {
1027            true
1028        } else {
1029            false
1030        };
1031
1032        let table = Table {
1033            create_sql: Some(table.create_sql),
1034            desc: table.desc,
1035            collections,
1036            conn_id: conn_id.cloned(),
1037            resolved_ids,
1038            custom_logical_compaction_window: table.compaction_window,
1039            is_retained_metrics_object: false,
1040            data_source,
1041        };
1042        let ops = vec![catalog::Op::CreateItem {
1043            id: table_id,
1044            name: name.clone(),
1045            item: CatalogItem::Table(table.clone()),
1046            owner_id: *ctx.session().current_role_id(),
1047        }];
1048
1049        let catalog_result = self
1050            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1051            .await;
1052
1053        if is_webhook {
1054            // try_get_webhook_url will make up a URL for things that are not
1055            // webhooks, so we guard against that here.
1056            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1057                ctx.session()
1058                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
1059            }
1060        }
1061
1062        match catalog_result {
1063            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1064            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1065                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1066            })) if if_not_exists => {
1067                ctx.session_mut()
1068                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1069                        name: name.item,
1070                        ty: "table",
1071                    });
1072                Ok(ExecuteResponse::CreatedTable)
1073            }
1074            Err(err) => Err(err),
1075        }
1076    }
1077
    /// Sequences a `CREATE SINK` statement: records the sink in the catalog,
    /// creates the corresponding storage export, and retires `ctx` with the
    /// final response.
    ///
    /// Takes `ctx` by value because every exit path must retire it exactly
    /// once. With `IF NOT EXISTS`, a name collision is downgraded to a notice
    /// and reported as success.
    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        // First try to allocate an ID and an OID. If either fails, we're done.
        let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);

        // Build the catalog representation of the sink from the plan.
        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
            commit_interval: sink.commit_interval,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            // `IF NOT EXISTS`: downgrade the collision to a notice and retire
            // successfully without creating a storage export.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        // The sink is now in the catalog; wire up the storage-side export.
        // Failure here is treated as unrecoverable.
        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        // Install the default read-hold policy for the new collection.
        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }
1146
1147    /// Validates that a view definition does not contain any expressions that may lead to
1148    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1149    ///
1150    /// We prevent these expressions so that we can add columns to system tables without
1151    /// changing the definition of the view.
1152    ///
1153    /// Here is a bit of a hand wavy proof as to why we only need to check the
1154    /// immediate view definition for system objects and ambiguous column
1155    /// references, and not the entire dependency tree:
1156    ///
1157    ///   - A view with no object references cannot have any ambiguous column
1158    ///   references to a system object, because it has no system objects.
1159    ///   - A view with a direct reference to a system object and a * or
1160    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1161    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1162    ///   any ambiguous column references to a system object, because all column
1163    ///   references are explicitly named.
1164    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1165    ///   system object cannot have any ambiguous column references to a system
1166    ///   object, because there are no system objects in the top level view and
1167    ///   all sub-views are guaranteed to have no ambiguous column references to
1168    ///   system objects.
1169    pub(super) fn validate_system_column_references(
1170        &self,
1171        uses_ambiguous_columns: bool,
1172        depends_on: &BTreeSet<GlobalId>,
1173    ) -> Result<(), AdapterError> {
1174        if uses_ambiguous_columns
1175            && depends_on
1176                .iter()
1177                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1178        {
1179            Err(AdapterError::AmbiguousSystemColumnReference)
1180        } else {
1181            Ok(())
1182        }
1183    }
1184
1185    #[instrument]
1186    pub(super) async fn sequence_create_type(
1187        &mut self,
1188        session: &Session,
1189        plan: plan::CreateTypePlan,
1190        resolved_ids: ResolvedIds,
1191    ) -> Result<ExecuteResponse, AdapterError> {
1192        let (item_id, global_id) = self.allocate_user_id().await?;
1193        // Validate the type definition (e.g., composite columns) before storing.
1194        plan.typ
1195            .inner
1196            .desc(&self.catalog().for_session(session))
1197            .map_err(AdapterError::from)?;
1198        let typ = Type {
1199            create_sql: Some(plan.typ.create_sql),
1200            global_id,
1201            details: CatalogTypeDetails {
1202                array_id: None,
1203                typ: plan.typ.inner,
1204                pg_metadata: None,
1205            },
1206            resolved_ids,
1207        };
1208        let op = catalog::Op::CreateItem {
1209            id: item_id,
1210            name: plan.name,
1211            item: CatalogItem::Type(typ),
1212            owner_id: *session.current_role_id(),
1213        };
1214        match self.catalog_transact(Some(session), vec![op]).await {
1215            Ok(()) => Ok(ExecuteResponse::CreatedType),
1216            Err(err) => Err(err),
1217        }
1218    }
1219
1220    #[instrument]
1221    pub(super) async fn sequence_comment_on(
1222        &mut self,
1223        session: &Session,
1224        plan: plan::CommentPlan,
1225    ) -> Result<ExecuteResponse, AdapterError> {
1226        let op = catalog::Op::Comment {
1227            object_id: plan.object_id,
1228            sub_component: plan.sub_component,
1229            comment: plan.comment,
1230        };
1231        self.catalog_transact(Some(session), vec![op]).await?;
1232        Ok(ExecuteResponse::Comment)
1233    }
1234
    /// Sequences a `DROP` statement for one or more objects: emits a cascade
    /// notice for objects dropped only as dependents, applies the drop ops to
    /// the catalog, invalidates expression-cache entries for the dropped
    /// items, and emits follow-up notices (dropped active database/cluster,
    /// dropped in-use indexes).
    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Objects in `drop_ids` but not in `referenced_ids` were not named in
        // the statement; they are being dropped via CASCADE, so tell the user.
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        // Collect GlobalIds for expression cache invalidation.
        // This must happen before the transact below, while the catalog
        // entries still exist.
        let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
            .iter()
            .filter_map(|id| match id {
                ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
                _ => None,
            })
            .flatten()
            .collect();

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        // Invalidate expression cache entries for dropped objects.
        if !expr_cache_invalidate_ids.is_empty() {
            // NOTE(review): the returned future is bound to `_fut` and
            // immediately dropped without being awaited or spawned —
            // presumably `update_expression_cache` performs its work eagerly
            // or spawns internally; confirm the invalidation actually runs.
            let _fut = self.catalog().update_expression_cache(
                Default::default(),
                Default::default(),
                expr_cache_invalidate_ids,
            );
        }

        fail::fail_point!("after_sequencer_drop_replica");

        // Warn the session when its own active database or cluster went away.
        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        // Surface each dropped in-use index as a notice and count it in the
        // optimization-notice metrics.
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }
1316
    /// Checks that none of the roles in `dropped_roles` still owns an object,
    /// holds privileges, has granted privileges, or is named in default
    /// privileges anywhere in the catalog (items, databases, schemas,
    /// clusters, replicas, system privileges).
    ///
    /// Returns `Err(AdapterError::DependentObject)` listing every dependency
    /// per role name if any are found; `Ok(())` otherwise.
    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        // Records, for each dropped role, any privilege on `object_id` where
        // the role appears as grantee or grantor.
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                // Role to be dropped is the grantee of this privilege.
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                // Role to be dropped is the grantor of this privilege.
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        // role name -> descriptions of objects that still depend on it.
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        // Catalog items: check ownership and privileges.
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        // Databases and their schemas: check ownership and privileges.
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        // Clusters and their replicas: check ownership and privileges
        // (replicas carry no privileges, only ownership).
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
            }
        }
        // System-wide privileges.
        privilege_check(
            self.catalog().system_privileges(),
            dropped_roles,
            &mut dependent_objects,
            &SystemObjectId::System,
            &catalog,
        );
        // Default privileges: a dropped role may appear as the defining role
        // or as a grantee of a default-privilege entry.
        for (default_privilege_object, default_privilege_acl_items) in
            self.catalog.default_privileges()
        {
            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!(
                        "default privileges on {}S created by {}",
                        default_privilege_object.object_type, role_name
                    ));
            }
            for default_privilege_acl_item in default_privilege_acl_items {
                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "default privileges on {}S granted to {}",
                            default_privilege_object.object_type, role_name
                        ));
                }
            }
        }

        if !dependent_objects.is_empty() {
            Err(AdapterError::DependentObject(dependent_objects))
        } else {
            Ok(())
        }
    }
1480
    /// Sequences a `DROP OWNED BY <role>...` statement.
    ///
    /// Revokes all privileges and default privileges granted to the given
    /// roles and drops all objects they own, applied in a single catalog
    /// transaction. Afterwards emits notices if the session's active database
    /// or cluster, or an index that still has dependants, was dropped.
    #[instrument]
    pub(super) async fn sequence_drop_owned(
        &mut self,
        session: &Session,
        plan: plan::DropOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Objects owned by reserved (system) roles can never be dropped.
        for role_id in &plan.role_ids {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let mut privilege_revokes = plan.privilege_revokes;

        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
        let session_catalog = self.catalog().for_session(session);
        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
            && !session.is_superuser()
        {
            // Obtain all roles that the current session is a member of.
            let role_membership =
                session_catalog.collect_role_membership(session.current_role_id());
            // Remove revokes whose grantor the session has no membership in;
            // each removal is surfaced to the user as a notice below.
            let invalid_revokes: BTreeSet<_> = privilege_revokes
                .extract_if(.., |(_, privilege)| {
                    !role_membership.contains(&privilege.grantor)
                })
                .map(|(object_id, _)| object_id)
                .collect();
            for invalid_revoke in invalid_revokes {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
                session.add_notice(AdapterNotice::CannotRevoke { object_description });
            }
        }

        // Ops revoking the remaining (permitted) privileges...
        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
            catalog::Op::UpdatePrivilege {
                target_id: object_id,
                privilege,
                variant: UpdatePrivilegeVariant::Revoke,
            }
        });
        // ...and the default privileges granted to the dropped roles.
        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant: UpdatePrivilegeVariant::Revoke,
            },
        );
        let DropOps {
            ops: drop_ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(session, plan.drop_ids)?;

        // Apply all revokes and drops atomically in one catalog transaction.
        let ops = privilege_revoke_ops
            .chain(default_privilege_revoke_ops)
            .chain(drop_ops.into_iter())
            .collect();

        self.catalog_transact(Some(session), ops).await?;

        if dropped_active_db {
            session.add_notice(AdapterNotice::DroppedActiveDatabase {
                name: session.vars().database().to_string(),
            });
        }
        if dropped_active_cluster {
            session.add_notice(AdapterNotice::DroppedActiveCluster {
                name: session.vars().cluster().to_string(),
            });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
        }
        Ok(ExecuteResponse::DroppedOwned)
    }
1557
    /// Builds the catalog ops common to object-dropping statements
    /// (`DROP ...`, `DROP OWNED BY ...`): the `DropObjects` op itself plus
    /// the role-membership and default-privilege revokes that the drops
    /// imply.
    ///
    /// Also reports whether the session's active database or cluster is among
    /// the dropped objects and which dropped indexes still have live
    /// dependants, so the caller can emit the corresponding notices after
    /// committing.
    fn sequence_drop_common(
        &self,
        session: &Session,
        ids: Vec<ObjectId>,
    ) -> Result<DropOps, AdapterError> {
        let mut dropped_active_db = false;
        let mut dropped_active_cluster = false;
        let mut dropped_in_use_indexes = Vec::new();
        // Maps dropped role id -> role name, for the ownership check below.
        let mut dropped_roles = BTreeMap::new();
        let mut dropped_databases = BTreeSet::new();
        let mut dropped_schemas = BTreeSet::new();
        // Dropping either the group role or the member role of a role membership will trigger a
        // revoke role. We use a Set for the revokes to avoid trying to attempt to revoke the same
        // role membership twice.
        let mut role_revokes = BTreeSet::new();
        // Dropping a database or a schema will revoke all default roles associated with that
        // database or schema.
        let mut default_privilege_revokes = BTreeSet::new();

        // Clusters we're dropping
        let mut clusters_to_drop = BTreeSet::new();

        // Set view of `ids` so we can cheaply test whether a dependant object
        // is itself also being dropped.
        let ids_set = ids.iter().collect::<BTreeSet<_>>();
        // First pass: record, per object kind, the facts needed for notices
        // and for the implied revokes computed below.
        for id in &ids {
            match id {
                ObjectId::Database(id) => {
                    let name = self.catalog().get_database(id).name();
                    if name == session.vars().database() {
                        dropped_active_db = true;
                    }
                    dropped_databases.insert(id);
                }
                ObjectId::Schema((_, spec)) => {
                    if let SchemaSpecifier::Id(id) = spec {
                        dropped_schemas.insert(id);
                    }
                }
                ObjectId::Cluster(id) => {
                    clusters_to_drop.insert(*id);
                    if let Some(active_id) = self
                        .catalog()
                        .active_cluster(session)
                        .ok()
                        .map(|cluster| cluster.id())
                    {
                        if id == &active_id {
                            dropped_active_cluster = true;
                        }
                    }
                }
                ObjectId::Role(id) => {
                    let role = self.catalog().get_role(id);
                    let name = role.name();
                    dropped_roles.insert(*id, name);
                    // We must revoke all role memberships that the dropped roles belongs to.
                    for (group_id, grantor_id) in &role.membership.map {
                        role_revokes.insert((*group_id, *id, *grantor_id));
                    }
                }
                ObjectId::Item(id) => {
                    // For a dropped index, collect any non-dropped compute
                    // collections that still depend on it so the caller can
                    // warn the user.
                    if let Some(index) = self.catalog().get_entry(id).index() {
                        let humanizer = self.catalog().for_session(session);
                        let dependants = self
                            .controller
                            .compute
                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
                            .ok()
                            .into_iter()
                            .flatten()
                            .filter(|dependant_id| {
                                // Transient Ids belong to Peeks. We are not interested for now in
                                // peeks depending on a dropped index.
                                // TODO: show a different notice in this case. Something like
                                // "There is an in-progress ad hoc SELECT that uses the dropped
                                // index. The resources used by the index will be freed when all
                                // such SELECTs complete."
                                if dependant_id.is_transient() {
                                    return false;
                                }
                                // The item should exist, but don't panic if it doesn't.
                                let Some(dependent_id) = humanizer
                                    .try_get_item_by_global_id(dependant_id)
                                    .map(|item| item.id())
                                else {
                                    return false;
                                };
                                // If the dependent object is also being dropped, then there is no
                                // problem, so we don't want a notice.
                                !ids_set.contains(&ObjectId::Item(dependent_id))
                            })
                            .flat_map(|dependant_id| {
                                // If we are not able to find a name for this ID it probably means
                                // we have already dropped the compute collection, in which case we
                                // can ignore it.
                                humanizer.humanize_id(dependant_id)
                            })
                            .collect_vec();
                        if !dependants.is_empty() {
                            dropped_in_use_indexes.push(DroppedInUseIndex {
                                index_name: humanizer
                                    .humanize_id(index.global_id())
                                    .unwrap_or_else(|| id.to_string()),
                                dependant_objects: dependants,
                            });
                        }
                    }
                }
                _ => {}
            }
        }

        for id in &ids {
            match id {
                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
                // unless they are internal replicas, which exist outside the scope
                // of managed clusters.
                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                    if !clusters_to_drop.contains(cluster_id) {
                        let cluster = self.catalog.get_cluster(*cluster_id);
                        if cluster.is_managed() {
                            let replica =
                                cluster.replica(*replica_id).expect("Catalog out of sync");
                            if !replica.config.location.internal() {
                                coord_bail!("cannot drop replica of managed cluster");
                            }
                        }
                    }
                }
                _ => {}
            }
        }

        // Reserved (system) roles may never be dropped.
        for role_id in dropped_roles.keys() {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }
        self.validate_dropped_role_ownership(session, &dropped_roles)?;
        // If any role is a member of a dropped role, then we must revoke that membership.
        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
        for role in self.catalog().user_roles() {
            for dropped_role_id in
                dropped_role_ids.intersection(&role.membership.map.keys().collect())
            {
                role_revokes.insert((
                    **dropped_role_id,
                    role.id(),
                    *role
                        .membership
                        .map
                        .get(*dropped_role_id)
                        .expect("included in keys above"),
                ));
            }
        }

        // Revoke default privileges scoped to any dropped database or schema.
        for (default_privilege_object, default_privilege_acls) in
            self.catalog().default_privileges()
        {
            if matches!(
                &default_privilege_object.database_id,
                Some(database_id) if dropped_databases.contains(database_id),
            ) || matches!(
                &default_privilege_object.schema_id,
                Some(schema_id) if dropped_schemas.contains(schema_id),
            ) {
                for default_privilege_acl in default_privilege_acls {
                    default_privilege_revokes.insert((
                        default_privilege_object.clone(),
                        default_privilege_acl.clone(),
                    ));
                }
            }
        }

        // Assemble the final op list: membership revokes, then default
        // privilege revokes, then the object drops themselves.
        let ops = role_revokes
            .into_iter()
            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            })
            .chain(default_privilege_revokes.into_iter().map(
                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                    privilege_object,
                    privilege_acl_item,
                    variant: UpdatePrivilegeVariant::Revoke,
                },
            ))
            .chain(iter::once(catalog::Op::DropObjects(
                ids.into_iter()
                    .map(DropObjectInfo::manual_drop_from_object_id)
                    .collect(),
            )))
            .collect();

        Ok(DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        })
    }
1759
1760    pub(super) fn sequence_explain_schema(
1761        &self,
1762        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1763    ) -> Result<ExecuteResponse, AdapterError> {
1764        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1765            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1766        })?;
1767
1768        let json_string = json_string(&json_value);
1769        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1770        Ok(Self::send_immediate_rows(row))
1771    }
1772
1773    pub(super) fn sequence_show_all_variables(
1774        &self,
1775        session: &Session,
1776    ) -> Result<ExecuteResponse, AdapterError> {
1777        let mut rows = viewable_variables(self.catalog().state(), session)
1778            .map(|v| (v.name(), v.value(), v.description()))
1779            .collect::<Vec<_>>();
1780        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1781
1782        // TODO(parkmycar): Pack all of these into a single RowCollection.
1783        let rows: Vec<_> = rows
1784            .into_iter()
1785            .map(|(name, val, desc)| {
1786                Row::pack_slice(&[
1787                    Datum::String(name),
1788                    Datum::String(&val),
1789                    Datum::String(desc),
1790                ])
1791            })
1792            .collect();
1793        Ok(Self::send_immediate_rows(rows))
1794    }
1795
1796    pub(super) fn sequence_show_variable(
1797        &self,
1798        session: &Session,
1799        plan: plan::ShowVariablePlan,
1800    ) -> Result<ExecuteResponse, AdapterError> {
1801        if &plan.name == SCHEMA_ALIAS {
1802            let schemas = self.catalog.resolve_search_path(session);
1803            let schema = schemas.first();
1804            return match schema {
1805                Some((database_spec, schema_spec)) => {
1806                    let schema_name = &self
1807                        .catalog
1808                        .get_schema(database_spec, schema_spec, session.conn_id())
1809                        .name()
1810                        .schema;
1811                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
1812                    Ok(Self::send_immediate_rows(row))
1813                }
1814                None => {
1815                    if session.vars().current_object_missing_warnings() {
1816                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1817                            search_path: session
1818                                .vars()
1819                                .search_path()
1820                                .into_iter()
1821                                .map(|schema| schema.to_string())
1822                                .collect(),
1823                        });
1824                    }
1825                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1826                }
1827            };
1828        }
1829
1830        let variable = session
1831            .vars()
1832            .get(self.catalog().system_config(), &plan.name)
1833            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1834
1835        // In lieu of plumbing the user to all system config functions, just check that the var is
1836        // visible.
1837        variable.visible(session.user(), self.catalog().system_config())?;
1838
1839        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1840        if variable.name() == vars::DATABASE.name()
1841            && matches!(
1842                self.catalog().resolve_database(&variable.value()),
1843                Err(CatalogError::UnknownDatabase(_))
1844            )
1845            && session.vars().current_object_missing_warnings()
1846        {
1847            let name = variable.value();
1848            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1849        } else if variable.name() == vars::CLUSTER.name()
1850            && matches!(
1851                self.catalog().resolve_cluster(&variable.value()),
1852                Err(CatalogError::UnknownCluster(_))
1853            )
1854            && session.vars().current_object_missing_warnings()
1855        {
1856            let name = variable.value();
1857            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1858        }
1859        Ok(Self::send_immediate_rows(row))
1860    }
1861
1862    #[instrument]
1863    pub(super) async fn sequence_inspect_shard(
1864        &self,
1865        session: &Session,
1866        plan: plan::InspectShardPlan,
1867    ) -> Result<ExecuteResponse, AdapterError> {
1868        // TODO: Not thrilled about this rbac special case here, but probably
1869        // sufficient for now.
1870        if !session.user().is_internal() {
1871            return Err(AdapterError::Unauthorized(
1872                rbac::UnauthorizedError::MzSystem {
1873                    action: "inspect".into(),
1874                },
1875            ));
1876        }
1877        let state = self
1878            .controller
1879            .storage
1880            .inspect_persist_state(plan.id)
1881            .await?;
1882        let jsonb = Jsonb::from_serde_json(state)?;
1883        Ok(Self::send_immediate_rows(jsonb.into_row()))
1884    }
1885
    /// Handles `SET <name> [= <values> | TO DEFAULT]` for session variables.
    ///
    /// Rejects isolation-level and cluster changes inside a transaction that
    /// already contains operations, applies the new value (or resets to the
    /// default when the plan's value is `Default`), and emits advisory
    /// notices for deprecated variables, unknown database/cluster values, and
    /// notable isolation levels.
    #[instrument]
    pub(super) fn sequence_set_variable(
        &self,
        session: &mut Session,
        plan: plan::SetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (name, local) = (plan.name, plan.local);
        // Changing these mid-transaction is not allowed.
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }

        let vars = session.vars_mut();
        // `SET ... TO DEFAULT` arrives as `VariableValue::Default` and is
        // handled as a reset below.
        let values = match plan.value {
            plan::VariableValue::Default => None,
            plan::VariableValue::Values(values) => Some(values),
        };

        match values {
            Some(values) => {
                vars.set(
                    self.catalog().system_config(),
                    &name,
                    VarInput::SqlSet(&values),
                    local,
                )?;

                let vars = session.vars();

                // Emit a warning when deprecated variables are used.
                // TODO(database-issues#8069) remove this after sufficient time has passed
                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if name == vars::CLUSTER.name()
                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
                {
                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
                }

                // Database or cluster value does not correspond to a catalog item.
                if name.as_str() == vars::DATABASE.name()
                    && matches!(
                        self.catalog().resolve_database(vars.database()),
                        Err(CatalogError::UnknownDatabase(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.database().to_string();
                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
                } else if name.as_str() == vars::CLUSTER.name()
                    && matches!(
                        self.catalog().resolve_cluster(vars.cluster()),
                        Err(CatalogError::UnknownCluster(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.cluster().to_string();
                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
                    let v = values.into_first().to_lowercase();
                    // Warn about isolation levels that are accepted but not
                    // implemented as such.
                    if v == IsolationLevel::ReadUncommitted.as_str()
                        || v == IsolationLevel::ReadCommitted.as_str()
                        || v == IsolationLevel::RepeatableRead.as_str()
                    {
                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
                            isolation_level: v,
                        });
                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
                        session.add_notice(AdapterNotice::StrongSessionSerializable);
                    }
                }
            }
            None => vars.reset(self.catalog().system_config(), &name, local)?,
        }

        Ok(ExecuteResponse::SetVariable { name, reset: false })
    }
1965
1966    pub(super) fn sequence_reset_variable(
1967        &self,
1968        session: &mut Session,
1969        plan: plan::ResetVariablePlan,
1970    ) -> Result<ExecuteResponse, AdapterError> {
1971        let name = plan.name;
1972        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1973            self.validate_set_isolation_level(session)?;
1974        }
1975        if &name == vars::CLUSTER.name() {
1976            self.validate_set_cluster(session)?;
1977        }
1978        session
1979            .vars_mut()
1980            .reset(self.catalog().system_config(), &name, false)?;
1981        Ok(ExecuteResponse::SetVariable { name, reset: true })
1982    }
1983
1984    pub(super) fn sequence_set_transaction(
1985        &self,
1986        session: &mut Session,
1987        plan: plan::SetTransactionPlan,
1988    ) -> Result<ExecuteResponse, AdapterError> {
1989        // TODO(jkosh44) Only supports isolation levels for now.
1990        for mode in plan.modes {
1991            match mode {
1992                TransactionMode::AccessMode(_) => {
1993                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1994                }
1995                TransactionMode::IsolationLevel(isolation_level) => {
1996                    self.validate_set_isolation_level(session)?;
1997
1998                    session.vars_mut().set(
1999                        self.catalog().system_config(),
2000                        TRANSACTION_ISOLATION_VAR_NAME,
2001                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
2002                        plan.local,
2003                    )?
2004                }
2005            }
2006        }
2007        Ok(ExecuteResponse::SetVariable {
2008            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2009            reset: false,
2010        })
2011    }
2012
2013    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2014        if session.transaction().contains_ops() {
2015            Err(AdapterError::InvalidSetIsolationLevel)
2016        } else {
2017            Ok(())
2018        }
2019    }
2020
2021    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2022        if session.transaction().contains_ops() {
2023            Err(AdapterError::InvalidSetCluster)
2024        } else {
2025            Ok(())
2026        }
2027    }
2028
    /// Commits or rolls back the session's current transaction, retiring
    /// `ctx` with the result.
    ///
    /// Some transaction kinds finish asynchronously — writes are handed to
    /// the group-commit machinery, strict-serializable linearized reads to
    /// `strict_serializable_reads_tx`, and single-statement transactions are
    /// re-dispatched as internal commands — in those cases this returns early
    /// and `ctx` is retired elsewhere.
    #[instrument]
    pub(super) async fn sequence_end_transaction(
        &mut self,
        mut ctx: ExecuteContext,
        mut action: EndTransactionAction,
    ) {
        // If the transaction has failed, we can only rollback.
        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
            (&action, ctx.session().transaction())
        {
            action = EndTransactionAction::Rollback;
        }
        let response = match action {
            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
                params: BTreeMap::new(),
            }),
            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
                params: BTreeMap::new(),
            }),
        };

        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;

        let (response, action) = match result {
            // An empty write set needs no group commit; respond immediately.
            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
                (response, action)
            }
            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
                // Make sure we have the correct set of write locks for this transaction.
                // Aggressively dropping partial sets of locks to prevent deadlocking separate
                // transactions.
                let validated_locks = match write_lock_guards {
                    None => None,
                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
                        Ok(locks) => Some(locks),
                        Err(missing) => {
                            tracing::error!(?missing, "programming error, missing write locks");
                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
                        }
                    },
                };

                // Group the row batches by target table id before submission.
                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
                for WriteOp { id, rows } in writes {
                    let total_rows = collected_writes.entry(id).or_default();
                    total_rows.push(rows);
                }

                // Hand the writes to the group-commit machinery; it retires
                // `ctx` when the write lands.
                self.submit_write(PendingWriteTxn::User {
                    span: Span::current(),
                    writes: collected_writes,
                    write_locks: validated_locks,
                    pending_txn: PendingTxn {
                        ctx,
                        response,
                        action,
                    },
                });
                return;
            }
            // Strict serializable reads requiring linearization are queued on
            // the dedicated reads channel, which retires `ctx` later.
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrictSerializable =>
            {
                let conn_id = ctx.session().conn_id().clone();
                let pending_read_txn = PendingReadTxn {
                    txn: PendingRead::Read {
                        txn: PendingTxn {
                            ctx,
                            response,
                            action,
                        },
                    },
                    timestamp_context: determination.timestamp_context,
                    created: Instant::now(),
                    num_requeues: 0,
                    otel_ctx: OpenTelemetryContext::obtain(),
                };
                self.strict_serializable_reads_tx
                    .send((conn_id, pending_read_txn))
                    .expect("sending to strict_serializable_reads_tx cannot fail");
                return;
            }
            // Under strong session serializable, apply the read timestamp to
            // the session-local timestamp oracle and respond directly.
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrongSessionSerializable =>
            {
                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
                    ctx.session_mut()
                        .ensure_timestamp_oracle(timeline.clone())
                        .apply_write(*ts);
                }
                (response, action)
            }
            // Single-statement transactions are re-dispatched as an internal
            // command; `ctx` is retired there.
            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
                self.internal_cmd_tx
                    .send(Message::ExecuteSingleStatementTransaction {
                        ctx,
                        otel_ctx: OpenTelemetryContext::obtain(),
                        stmt,
                        params,
                    })
                    .expect("must send");
                return;
            }
            Ok((_, _)) => (response, action),
            Err(err) => (Err(err), EndTransactionAction::Rollback),
        };
        let changed = ctx.session_mut().vars_mut().end_transaction(action);
        // Append any parameters that changed to the response.
        let response = response.map(|mut r| {
            r.extend_params(changed);
            ExecuteResponse::from(r)
        });

        ctx.retire(response);
    }
2158
    /// Clears the session's transaction and, on commit, validates its ops and
    /// returns them (plus any held write locks) for the caller to apply.
    ///
    /// Write ops are checked for still-existing target ids and emptied of
    /// no-op writes; DDL ops are committed here directly (after verifying
    /// that the catalog has not changed since the transaction started).
    /// Returns `(None, None)` for rollbacks and op-less transactions.
    #[instrument]
    async fn sequence_end_transaction_inner(
        &mut self,
        ctx: &mut ExecuteContext,
        action: EndTransactionAction,
    ) -> Result<(Option<TransactionOps>, Option<WriteLocks>), AdapterError> {
        let txn = self.clear_transaction(ctx.session_mut()).await;

        if let EndTransactionAction::Commit = action {
            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
                match &mut ops {
                    TransactionOps::Writes(writes) => {
                        for WriteOp { id, .. } in &mut writes.iter() {
                            // Re-verify this id exists.
                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
                                AdapterError::Catalog(mz_catalog::memory::error::Error {
                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
                                })
                            })?;
                        }

                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
                    }
                    TransactionOps::DDL {
                        ops,
                        state: _,
                        side_effects,
                        revision,
                        snapshot: _,
                    } => {
                        // Make sure our catalog hasn't changed.
                        if *revision != self.catalog().transient_revision() {
                            return Err(AdapterError::DDLTransactionRace);
                        }
                        // Commit all of our queued ops.
                        let ops = std::mem::take(ops);
                        let side_effects = std::mem::take(side_effects);
                        // Run the deferred side effects after the catalog
                        // transaction commits.
                        self.catalog_transact_with_side_effects(
                            Some(ctx),
                            ops,
                            move |a, mut ctx| {
                                Box::pin(async move {
                                    for side_effect in side_effects {
                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
                                    }
                                })
                            },
                        )
                        .await?;
                    }
                    _ => (),
                }
                return Ok((Some(ops), write_lock_guards));
            }
        }

        Ok((None, None))
    }
2218
2219    pub(super) async fn sequence_side_effecting_func(
2220        &mut self,
2221        ctx: ExecuteContext,
2222        plan: SideEffectingFunc,
2223    ) {
2224        match plan {
2225            SideEffectingFunc::PgCancelBackend { connection_id } => {
2226                if ctx.session().conn_id().unhandled() == connection_id {
2227                    // As a special case, if we're canceling ourselves, we send
2228                    // back a canceled resposne to the client issuing the query,
2229                    // and so we need to do no further processing of the cancel.
2230                    ctx.retire(Err(AdapterError::Canceled));
2231                    return;
2232                }
2233
2234                let res = if let Some((id_handle, _conn_meta)) =
2235                    self.active_conns.get_key_value(&connection_id)
2236                {
2237                    // check_plan already verified role membership.
2238                    self.handle_privileged_cancel(id_handle.clone()).await;
2239                    Datum::True
2240                } else {
2241                    Datum::False
2242                };
2243                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2244            }
2245        }
2246    }
2247
2248    /// Execute a side-effecting function from the frontend peek path.
2249    /// This is separate from `sequence_side_effecting_func` because
2250    /// - It doesn't have an ExecuteContext.
2251    /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
2252    ///   peek sequencing can't look at `active_conns`.
2253    ///
2254    /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
2255    /// sequencing.
2256    pub(crate) async fn execute_side_effecting_func(
2257        &mut self,
2258        plan: SideEffectingFunc,
2259        conn_id: ConnectionId,
2260        current_role: RoleId,
2261    ) -> Result<ExecuteResponse, AdapterError> {
2262        match plan {
2263            SideEffectingFunc::PgCancelBackend { connection_id } => {
2264                if conn_id.unhandled() == connection_id {
2265                    // As a special case, if we're canceling ourselves, we return
2266                    // a canceled response to the client issuing the query,
2267                    // and so we need to do no further processing of the cancel.
2268                    return Err(AdapterError::Canceled);
2269                }
2270
2271                // Perform RBAC check: the current user must be a member of the role
2272                // that owns the connection being cancelled.
2273                if let Some((_id_handle, conn_meta)) =
2274                    self.active_conns.get_key_value(&connection_id)
2275                {
2276                    let target_role = *conn_meta.authenticated_role_id();
2277                    let role_membership = self
2278                        .catalog()
2279                        .state()
2280                        .collect_role_membership(&current_role);
2281                    if !role_membership.contains(&target_role) {
2282                        let target_role_name = self
2283                            .catalog()
2284                            .try_get_role(&target_role)
2285                            .map(|role| role.name().to_string())
2286                            .unwrap_or_else(|| target_role.to_string());
2287                        return Err(AdapterError::Unauthorized(
2288                            rbac::UnauthorizedError::RoleMembership {
2289                                role_names: vec![target_role_name],
2290                            },
2291                        ));
2292                    }
2293
2294                    // RBAC check passed, proceed with cancellation.
2295                    let id_handle = self
2296                        .active_conns
2297                        .get_key_value(&connection_id)
2298                        .map(|(id, _)| id.clone())
2299                        .expect("checked above");
2300                    self.handle_privileged_cancel(id_handle).await;
2301                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2302                } else {
2303                    // Connection not found, return false.
2304                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2305                }
2306            }
2307        }
2308    }
2309
    /// Inner method that performs the actual real-time recency (RTR) timestamp
    /// determination.
    ///
    /// Resolves each `GlobalId` in `source_ids` to its catalog item, walks the
    /// transitive dependency closure of the user objects among them, and asks
    /// the storage controller for an RTR timestamp over all resulting
    /// collections.
    ///
    /// Returns `Ok(None)` when none of the inputs are user objects (system
    /// objects are never RTR sources, so no timestamp is needed), and
    /// `Err(AdapterError::RtrDropFailure)` when a `GlobalId` can no longer be
    /// resolved, i.e. the object was concurrently dropped.
    ///
    /// NOTE(review): the previous doc said this is reached "via
    /// `determine_real_time_recent_timestamp`" — that is this function itself;
    /// presumably the `_if_needed` wrapper below was meant. Confirm against
    /// callers.
    pub(crate) async fn determine_real_time_recent_timestamp(
        &self,
        source_ids: impl Iterator<Item = GlobalId>,
        real_time_recency_timeout: Duration,
    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
        // Resolve every `GlobalId` to an item id; any failure means the object
        // was dropped out from under us and the whole request errors.
        let item_ids = source_ids
            .map(|gid| {
                self.catalog
                    .try_resolve_item_id(&gid)
                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Find all dependencies transitively because we need to ensure that
        // RTR queries determine the timestamp from the sources' (i.e.
        // storage objects that ingest data from external systems) remap
        // data. We "cheat" a little bit and filter out any IDs that aren't
        // user objects because we know they are not a RTR source.
        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
        // If none of the sources are user objects, we don't need to provide
        // a RTR timestamp.
        if to_visit.is_empty() {
            return Ok(None);
        }

        let mut timestamp_objects = BTreeSet::new();

        // Breadth-first traversal; `timestamp_objects` doubles as the
        // visited set so each item is expanded at most once.
        while let Some(id) = to_visit.pop_front() {
            timestamp_objects.insert(id);
            to_visit.extend(
                self.catalog()
                    .get_entry(&id)
                    .uses()
                    .into_iter()
                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
            );
        }
        // The controller works in terms of `GlobalId`s, so expand each item to
        // all of its collections.
        let timestamp_objects = timestamp_objects
            .into_iter()
            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
            .collect();

        let r = self
            .controller
            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
            .await?;

        Ok(Some(r))
    }
2362
2363    /// Checks to see if the session needs a real time recency timestamp and if so returns
2364    /// a future that will return the timestamp.
2365    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2366        &self,
2367        session: &Session,
2368        source_ids: impl Iterator<Item = GlobalId>,
2369    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2370        let vars = session.vars();
2371
2372        if vars.real_time_recency()
2373            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2374            && !session.contains_read_timestamp()
2375        {
2376            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2377                .await
2378        } else {
2379            Ok(None)
2380        }
2381    }
2382
    /// Sequences an `EXPLAIN ... PLAN` statement by dispatching on the
    /// explainee.
    ///
    /// Statement explainees are handed off to the corresponding `explain_*`
    /// staged machinery, which retires `ctx` itself; catalog-object
    /// explainees (`View`/`MaterializedView`/`Index`) are explained
    /// synchronously and retired here; `Replan*` explainees go through their
    /// own async paths.
    #[instrument]
    pub(super) async fn sequence_explain_plan(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::ExplainPlanPlan,
        target_cluster: TargetCluster,
    ) {
        match &plan.explainee {
            // EXPLAIN of a statement: each statement kind has its own staged
            // explain pipeline, which takes ownership of `ctx`.
            plan::Explainee::Statement(stmt) => match stmt {
                plan::ExplaineeStatement::CreateView { .. } => {
                    self.explain_create_view(ctx, plan).await;
                }
                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
                    self.explain_create_materialized_view(ctx, plan).await;
                }
                plan::ExplaineeStatement::CreateIndex { .. } => {
                    self.explain_create_index(ctx, plan).await;
                }
                plan::ExplaineeStatement::Select { .. } => {
                    self.explain_peek(ctx, plan, target_cluster).await;
                }
                plan::ExplaineeStatement::Subscribe { .. } => {
                    self.explain_subscribe(ctx, plan, target_cluster).await;
                }
            },
            // EXPLAIN of an existing catalog object: computed synchronously,
            // so we retire the context with the result here.
            plan::Explainee::View(_) => {
                let result = self.explain_view(&ctx, plan);
                ctx.retire(result);
            }
            plan::Explainee::MaterializedView(_) => {
                let result = self.explain_materialized_view(&ctx, plan);
                ctx.retire(result);
            }
            plan::Explainee::Index(_) => {
                let result = self.explain_index(&ctx, plan);
                ctx.retire(result);
            }
            // EXPLAIN REPLAN: re-runs the optimizer for an existing object.
            plan::Explainee::ReplanView(_) => {
                self.explain_replan_view(ctx, plan).await;
            }
            plan::Explainee::ReplanMaterializedView(_) => {
                self.explain_replan_materialized_view(ctx, plan).await;
            }
            plan::Explainee::ReplanIndex(_) => {
                self.explain_replan_index(ctx, plan).await;
            }
        };
    }
2431
    /// Sequences an `EXPLAIN FILTER PUSHDOWN` statement.
    ///
    /// Only non-broken `SELECT` statements and materialized views are
    /// supported; every other explainee type retires with an `Unsupported`
    /// error.
    pub(super) async fn sequence_explain_pushdown(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::ExplainPushdownPlan,
        target_cluster: TargetCluster,
    ) {
        match plan.explainee {
            // A SELECT statement: validate it as a peek (with the Pushdown
            // explain context) and run it through the staged peek machinery.
            Explainee::Statement(ExplaineeStatement::Select {
                broken: false,
                plan,
                desc: _,
            }) => {
                // `return_if_err!` retires `ctx` with the error on failure.
                let stage = return_if_err!(
                    self.peek_validate(
                        ctx.session(),
                        plan,
                        target_cluster,
                        None,
                        ExplainContext::Pushdown,
                        Some(ctx.session().vars().max_query_result_size()),
                    ),
                    ctx
                );
                self.sequence_staged(ctx, Span::current(), stage).await;
            }
            Explainee::MaterializedView(item_id) => {
                self.explain_pushdown_materialized_view(ctx, item_id).await;
            }
            _ => {
                ctx.retire(Err(AdapterError::Unsupported(
                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
                )));
            }
        };
    }
2467
2468    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2469    async fn execute_explain_pushdown_with_read_holds(
2470        &self,
2471        ctx: ExecuteContext,
2472        as_of: Antichain<Timestamp>,
2473        mz_now: ResultSpec<'static>,
2474        read_holds: Option<ReadHolds>,
2475        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2476    ) {
2477        let fut = self
2478            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2479            .await;
2480        task::spawn(|| "render explain pushdown", async move {
2481            // Transfer the necessary read holds over to the background task
2482            let _read_holds = read_holds;
2483            let res = fut.await;
2484            ctx.retire(res);
2485        });
2486    }
2487
    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
    ///
    /// The `use<I>` capture bound means the returned future captures only
    /// `I`, not the `&self`/`session` borrows — which is what allows callers
    /// to poll it from a background task.
    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
        &self,
        session: &Session,
        as_of: Antichain<Timestamp>,
        mz_now: ResultSpec<'static>,
        imports: I,
    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
        // Get the needed Coordinator stuff and call the freestanding, shared helper.
        super::explain_pushdown_future_inner(
            session,
            &self.catalog,
            &self.controller.storage_collections,
            as_of,
            mz_now,
            imports,
        )
        .await
    }
2507
2508    #[instrument]
2509    pub(super) async fn sequence_insert(
2510        &mut self,
2511        mut ctx: ExecuteContext,
2512        plan: plan::InsertPlan,
2513    ) {
2514        // Normally, this would get checked when trying to add "write ops" to
2515        // the transaction but we go down diverging paths below, based on
2516        // whether the INSERT is only constant values or not.
2517        //
2518        // For the non-constant case we sequence an implicit read-then-write,
2519        // which messes with the transaction ops and would allow an implicit
2520        // read-then-write to sneak into a read-only transaction.
2521        if !ctx.session_mut().transaction().allows_writes() {
2522            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2523            return;
2524        }
2525
2526        // The structure of this code originates from a time where
2527        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2528        // optimized `MirRelationExpr`.
2529        //
2530        // Ideally, we would like to make the `selection.as_const().is_some()`
2531        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2532        // are planned as a Wrap($n, $vals) call, so until we can reduce
2533        // HirRelationExpr this will always returns false.
2534        //
2535        // Unfortunately, hitting the default path of the match below also
2536        // causes a lot of tests to fail, so we opted to go with the extra
2537        // `plan.values.clone()` statements when producing the `optimized_mir`
2538        // and re-optimize the values in the `sequence_read_then_write` call.
2539        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2540            // We don't perform any optimizations on an expression that is already
2541            // a constant for writes, as we want to maximize bulk-insert throughput.
2542            let expr = return_if_err!(
2543                plan.values
2544                    .clone()
2545                    .lower(self.catalog().system_config(), None),
2546                ctx
2547            );
2548            OptimizedMirRelationExpr(expr)
2549        } else {
2550            // Collect optimizer parameters.
2551            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2552
2553            // (`optimize::view::Optimizer` has a special case for constant queries.)
2554            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2555
2556            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2557            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2558        };
2559
2560        match optimized_mir.into_inner() {
2561            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2562                let catalog = self.owned_catalog();
2563                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2564                    let result =
2565                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2566                    ctx.retire(result);
2567                });
2568            }
2569            // All non-constant values must be planned as read-then-writes.
2570            _ => {
2571                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2572                    Some(table) => {
2573                        // Inserts always occur at the latest version of the table.
2574                        let desc = table.relation_desc_latest().expect("table has a desc");
2575                        desc.arity()
2576                    }
2577                    None => {
2578                        ctx.retire(Err(AdapterError::Catalog(
2579                            mz_catalog::memory::error::Error {
2580                                kind: ErrorKind::Sql(CatalogError::UnknownItem(
2581                                    plan.id.to_string(),
2582                                )),
2583                            },
2584                        )));
2585                        return;
2586                    }
2587                };
2588
2589                let finishing = RowSetFinishing {
2590                    order_by: vec![],
2591                    limit: None,
2592                    offset: 0,
2593                    project: (0..desc_arity).collect(),
2594                };
2595
2596                let read_then_write_plan = plan::ReadThenWritePlan {
2597                    id: plan.id,
2598                    selection: plan.values,
2599                    finishing,
2600                    assignments: BTreeMap::new(),
2601                    kind: MutationKind::Insert,
2602                    returning: plan.returning,
2603                };
2604
2605                self.sequence_read_then_write(ctx, read_then_write_plan)
2606                    .await;
2607            }
2608        }
2609    }
2610
2611    /// ReadThenWrite is a plan whose writes depend on the results of a
2612    /// read. This works by doing a Peek then queuing a SendDiffs. No writes
2613    /// or read-then-writes can occur between the Peek and SendDiff otherwise a
2614    /// serializability violation could occur.
2615    #[instrument]
2616    pub(super) async fn sequence_read_then_write(
2617        &mut self,
2618        mut ctx: ExecuteContext,
2619        plan: plan::ReadThenWritePlan,
2620    ) {
2621        let mut source_ids: BTreeSet<_> = plan
2622            .selection
2623            .depends_on()
2624            .into_iter()
2625            .map(|gid| self.catalog().resolve_item_id(&gid))
2626            .collect();
2627        source_ids.insert(plan.id);
2628
2629        // If the transaction doesn't already have write locks, acquire them.
2630        if ctx.session().transaction().write_locks().is_none() {
2631            // Pre-define all of the locks we need.
2632            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2633
2634            // Try acquiring all of our locks.
2635            for id in &source_ids {
2636                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2637                    write_locks.insert_lock(*id, lock);
2638                }
2639            }
2640
2641            // See if we acquired all of the neccessary locks.
2642            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2643                Ok(locks) => locks,
2644                Err(missing) => {
2645                    // Defer our write if we couldn't acquire all of the locks.
2646                    let role_metadata = ctx.session().role_metadata().clone();
2647                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2648                    let plan = DeferredPlan {
2649                        ctx,
2650                        plan: Plan::ReadThenWrite(plan),
2651                        validity: PlanValidity::new(
2652                            self.catalog.transient_revision(),
2653                            source_ids.clone(),
2654                            None,
2655                            None,
2656                            role_metadata,
2657                        ),
2658                        requires_locks: source_ids,
2659                    };
2660                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2661                }
2662            };
2663
2664            ctx.session_mut()
2665                .try_grant_write_locks(write_locks)
2666                .expect("session has already been granted write locks");
2667        }
2668
2669        let plan::ReadThenWritePlan {
2670            id,
2671            kind,
2672            selection,
2673            mut assignments,
2674            finishing,
2675            mut returning,
2676        } = plan;
2677
2678        // Read then writes can be queued, so re-verify the id exists.
2679        let desc = match self.catalog().try_get_entry(&id) {
2680            Some(table) => {
2681                // Inserts always occur at the latest version of the table.
2682                table
2683                    .relation_desc_latest()
2684                    .expect("table has a desc")
2685                    .into_owned()
2686            }
2687            None => {
2688                ctx.retire(Err(AdapterError::Catalog(
2689                    mz_catalog::memory::error::Error {
2690                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2691                    },
2692                )));
2693                return;
2694            }
2695        };
2696
2697        // Disallow mz_now in any position because read time and write time differ.
2698        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2699            || assignments.values().any(|e| e.contains_temporal())
2700            || returning.iter().any(|e| e.contains_temporal());
2701        if contains_temporal {
2702            ctx.retire(Err(AdapterError::Unsupported(
2703                "calls to mz_now in write statements",
2704            )));
2705            return;
2706        }
2707
        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
        //
        // - They do not refer to any objects whose notion of time moves differently than that of
        // user tables. This limitation is meant to ensure no writes occur between this read and the
        // subsequent write.
        // - They do not use mz_now(), whose time produced during read will differ from the write
        //   timestamp.
        //
        // Validation recurses through `uses()` for composing objects (views,
        // materialized views, funcs, continual tasks); any invalid dependency
        // anywhere in the closure fails the whole statement.
        fn validate_read_dependencies(
            catalog: &Catalog,
            id: &CatalogItemId,
        ) -> Result<(), AdapterError> {
            use CatalogItemType::*;
            use mz_catalog::memory::objects;
            // Dependencies to recurse into once this item has been judged.
            let mut ids_to_check = Vec::new();
            let valid = match catalog.try_get_entry(id) {
                Some(entry) => {
                    // (Materialized) views with a locally optimized expression
                    // are rejected outright if that expression calls mz_now().
                    if let CatalogItem::View(objects::View {
                        locally_optimized_expr: optimized_expr,
                        ..
                    })
                    | CatalogItem::MaterializedView(objects::MaterializedView {
                        locally_optimized_expr: optimized_expr,
                        ..
                    }) = entry.item()
                    {
                        if optimized_expr.contains_temporal() {
                            return Err(AdapterError::Unsupported(
                                "calls to mz_now in write statements",
                            ));
                        }
                    }
                    match entry.item().typ() {
                        // Composing objects: recurse into their dependencies.
                        // Only user objects are valid, except funcs, which are
                        // allowed regardless of namespace.
                        typ @ (Func | View | MaterializedView | ContinualTask) => {
                            ids_to_check.extend(entry.uses());
                            let valid_id = id.is_user() || matches!(typ, Func);
                            valid_id
                        }
                        Source | Secret | Connection => false,
                        // Cannot select from sinks or indexes.
                        Sink | Index => unreachable!(),
                        Table => {
                            if !id.is_user() {
                                // We can't read from non-user tables
                                false
                            } else {
                                // We can't read from tables that are source-exports
                                entry.source_export_details().is_none()
                            }
                        }
                        Type => true,
                    }
                }
                None => false,
            };
            if !valid {
                // Build a user-facing (name, type) pair for the error. A second
                // catalog lookup is fine here: this is the cold error path.
                let (object_name, object_type) = match catalog.try_get_entry(id) {
                    Some(entry) => {
                        let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
                        let object_type = match entry.item().typ() {
                            // We only need the disallowed types here; the allowed types are handled above.
                            Source => "source",
                            Secret => "secret",
                            Connection => "connection",
                            Table => {
                                if !id.is_user() {
                                    "system table"
                                } else {
                                    "source-export table"
                                }
                            }
                            View => "system view",
                            MaterializedView => "system materialized view",
                            ContinualTask => "system task",
                            _ => "invalid dependency",
                        };
                        (object_name, object_type.to_string())
                    }
                    None => (id.to_string(), "unknown".to_string()),
                };
                return Err(AdapterError::InvalidTableMutationSelection {
                    object_name,
                    object_type,
                });
            }
            // Recurse into collected dependencies, propagating the first error.
            for id in ids_to_check {
                validate_read_dependencies(catalog, &id)?;
            }
            Ok(())
        }
2797
2798        for gid in selection.depends_on() {
2799            let item_id = self.catalog().resolve_item_id(&gid);
2800            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2801                ctx.retire(Err(err));
2802                return;
2803            }
2804        }
2805
2806        let (peek_tx, peek_rx) = oneshot::channel();
2807        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2808        let (tx, _, session, extra) = ctx.into_parts();
2809        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2810        // execution context, because this peek does not directly correspond to an execute,
2811        // and so we don't need to take any action on its retirement.
2812        // TODO[btv]: we might consider extending statement logging to log the inner
2813        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2814        // and mint a new "real" execution context here. We'd also have to add some logic to
2815        // make sure such "sub-statements" are always sampled when the top-level statement is
2816        //
2817        // It's debatable whether this makes sense conceptually,
2818        // because the inner fragment here is not actually a
2819        // "statement" in its own right.
2820        let peek_ctx = ExecuteContext::from_parts(
2821            peek_client_tx,
2822            self.internal_cmd_tx.clone(),
2823            session,
2824            Default::default(),
2825        );
2826
2827        self.sequence_peek(
2828            peek_ctx,
2829            plan::SelectPlan {
2830                select: None,
2831                source: selection,
2832                when: QueryWhen::FreshestTableWrite,
2833                finishing,
2834                copy_to: None,
2835            },
2836            TargetCluster::Active,
2837            None,
2838        )
2839        .await;
2840
2841        let internal_cmd_tx = self.internal_cmd_tx.clone();
2842        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2843        let catalog = self.owned_catalog();
2844        let max_result_size = self.catalog().system_config().max_result_size();
2845
2846        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2847            let (peek_response, session) = match peek_rx.await {
2848                Ok(Response {
2849                    result: Ok(resp),
2850                    session,
2851                    otel_ctx,
2852                }) => {
2853                    otel_ctx.attach_as_parent();
2854                    (resp, session)
2855                }
2856                Ok(Response {
2857                    result: Err(e),
2858                    session,
2859                    otel_ctx,
2860                }) => {
2861                    let ctx =
2862                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2863                    otel_ctx.attach_as_parent();
2864                    ctx.retire(Err(e));
2865                    return;
2866                }
2867                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2868                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2869            };
2870            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2871            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2872
2873            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2874            if timeout_dur == Duration::ZERO {
2875                timeout_dur = Duration::MAX;
2876            }
2877
2878            let style = ExprPrepOneShot {
2879                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2880                session: ctx.session(),
2881                catalog_state: catalog.state(),
2882            };
2883            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2884                return_if_err!(style.prep_scalar_expr(expr), ctx);
2885            }
2886
2887            let make_diffs = move |mut rows: Box<dyn RowIterator>|
2888                  -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2889                    let arena = RowArena::new();
2890                    let mut diffs = Vec::new();
2891                    let mut datum_vec = mz_repr::DatumVec::new();
2892
2893                    while let Some(row) = rows.next() {
2894                        if !assignments.is_empty() {
2895                            assert!(
2896                                matches!(kind, MutationKind::Update),
2897                                "only updates support assignments"
2898                            );
2899                            let mut datums = datum_vec.borrow_with(row);
2900                            let mut updates = vec![];
2901                            for (idx, expr) in &assignments {
2902                                let updated = match expr.eval(&datums, &arena) {
2903                                    Ok(updated) => updated,
2904                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2905                                };
2906                                updates.push((*idx, updated));
2907                            }
2908                            for (idx, new_value) in updates {
2909                                datums[idx] = new_value;
2910                            }
2911                            let updated = Row::pack_slice(&datums);
2912                            diffs.push((updated, Diff::ONE));
2913                        }
2914                        match kind {
2915                            // Updates and deletes always remove the
2916                            // current row. Updates will also add an
2917                            // updated value.
2918                            MutationKind::Update | MutationKind::Delete => {
2919                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2920                            }
2921                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2922                        }
2923                    }
2924
2925                    // Sum of all the rows' byte size, for checking if we go
2926                    // above the max_result_size threshold.
2927                    let mut byte_size: u64 = 0;
2928                    for (row, diff) in &diffs {
2929                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2930                        if diff.is_positive() {
2931                            for (idx, datum) in row.iter().enumerate() {
2932                                desc.constraints_met(idx, &datum)?;
2933                            }
2934                        }
2935                    }
2936                    Ok((diffs, byte_size))
2937                };
2938
2939            let diffs = match peek_response {
2940                ExecuteResponse::SendingRowsStreaming {
2941                    rows: mut rows_stream,
2942                    ..
2943                } => {
2944                    let mut byte_size: u64 = 0;
2945                    let mut diffs = Vec::new();
2946                    let result = loop {
2947                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2948                            Ok(Some(res)) => match res {
2949                                PeekResponseUnary::Rows(new_rows) => {
2950                                    match make_diffs(new_rows) {
2951                                        Ok((mut new_diffs, new_byte_size)) => {
2952                                            byte_size = byte_size.saturating_add(new_byte_size);
2953                                            if byte_size > max_result_size {
2954                                                break Err(AdapterError::ResultSize(format!(
2955                                                    "result exceeds max size of {max_result_size}"
2956                                                )));
2957                                            }
2958                                            diffs.append(&mut new_diffs)
2959                                        }
2960                                        Err(e) => break Err(e),
2961                                    };
2962                                }
2963                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2964                                PeekResponseUnary::Error(e) => {
2965                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2966                                }
2967                            },
2968                            Ok(None) => break Ok(diffs),
2969                            Err(_) => {
2970                                // We timed out, so remove the pending peek. This is
2971                                // best-effort and doesn't guarantee we won't
2972                                // receive a response.
2973                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2974                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2975                                    conn_id: ctx.session().conn_id().clone(),
2976                                });
2977                                if let Err(e) = result {
2978                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2979                                }
2980                                break Err(AdapterError::StatementTimeout);
2981                            }
2982                        }
2983                    };
2984
2985                    result
2986                }
2987                ExecuteResponse::SendingRowsImmediate { rows } => {
2988                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2989                }
2990                resp => Err(AdapterError::Unstructured(anyhow!(
2991                    "unexpected peek response: {resp:?}"
2992                ))),
2993            };
2994
2995            let mut returning_rows = Vec::new();
2996            let mut diff_err: Option<AdapterError> = None;
2997            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2998                let arena = RowArena::new();
2999                for (row, diff) in diffs {
3000                    if !diff.is_positive() {
3001                        continue;
3002                    }
3003                    let mut returning_row = Row::with_capacity(returning.len());
3004                    let mut packer = returning_row.packer();
3005                    for expr in &returning {
3006                        let datums: Vec<_> = row.iter().collect();
3007                        match expr.eval(&datums, &arena) {
3008                            Ok(datum) => {
3009                                packer.push(datum);
3010                            }
3011                            Err(err) => {
3012                                diff_err = Some(err.into());
3013                                break;
3014                            }
3015                        }
3016                    }
3017                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3018                    let diff = match NonZeroUsize::try_from(diff) {
3019                        Ok(diff) => diff,
3020                        Err(err) => {
3021                            diff_err = Some(err.into());
3022                            break;
3023                        }
3024                    };
3025                    returning_rows.push((returning_row, diff));
3026                    if diff_err.is_some() {
3027                        break;
3028                    }
3029                }
3030            }
3031            let diffs = if let Some(err) = diff_err {
3032                Err(err)
3033            } else {
3034                diffs
3035            };
3036
3037            // We need to clear out the timestamp context so the write doesn't fail due to a
3038            // read only transaction.
3039            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3040            // No matter what isolation level the client is using, we must linearize this
3041            // read. The write will be performed right after this, as part of a single
3042            // transaction, so the write must have a timestamp greater than or equal to the
3043            // read.
3044            //
3045            // Note: It's only OK for the write to have a greater timestamp than the read
3046            // because the write lock prevents any other writes from happening in between
3047            // the read and write.
3048            if let Some(timestamp_context) = timestamp_context {
3049                let (tx, rx) = tokio::sync::oneshot::channel();
3050                let conn_id = ctx.session().conn_id().clone();
3051                let pending_read_txn = PendingReadTxn {
3052                    txn: PendingRead::ReadThenWrite { ctx, tx },
3053                    timestamp_context,
3054                    created: Instant::now(),
3055                    num_requeues: 0,
3056                    otel_ctx: OpenTelemetryContext::obtain(),
3057                };
3058                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3059                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3060                if let Err(e) = result {
3061                    warn!(
3062                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3063                        e
3064                    );
3065                    return;
3066                }
3067                let result = rx.await;
3068                // It is not an error for these results to be ready after `tx` has been dropped.
3069                ctx = match result {
3070                    Ok(Some(ctx)) => ctx,
3071                    Ok(None) => {
3072                        // Coordinator took our context and will handle responding to the client.
3073                        // This usually indicates that our transaction was aborted.
3074                        return;
3075                    }
3076                    Err(e) => {
3077                        warn!(
3078                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3079                            e
3080                        );
3081                        return;
3082                    }
3083                };
3084            }
3085
3086            match diffs {
3087                Ok(diffs) => {
3088                    let result = Self::send_diffs(
3089                        ctx.session_mut(),
3090                        plan::SendDiffsPlan {
3091                            id,
3092                            updates: diffs,
3093                            kind,
3094                            returning: returning_rows,
3095                            max_result_size,
3096                        },
3097                    );
3098                    ctx.retire(result);
3099                }
3100                Err(e) => {
3101                    ctx.retire(Err(e));
3102                }
3103            }
3104        });
3105    }
3106
3107    #[instrument]
3108    pub(super) async fn sequence_alter_item_rename(
3109        &mut self,
3110        ctx: &mut ExecuteContext,
3111        plan: plan::AlterItemRenamePlan,
3112    ) -> Result<ExecuteResponse, AdapterError> {
3113        let op = catalog::Op::RenameItem {
3114            id: plan.id,
3115            current_full_name: plan.current_full_name,
3116            to_name: plan.to_name,
3117        };
3118        match self
3119            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3120            .await
3121        {
3122            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3123            Err(err) => Err(err),
3124        }
3125    }
3126
3127    #[instrument]
3128    pub(super) async fn sequence_alter_retain_history(
3129        &mut self,
3130        ctx: &mut ExecuteContext,
3131        plan: plan::AlterRetainHistoryPlan,
3132    ) -> Result<ExecuteResponse, AdapterError> {
3133        soft_assert_or_log!(
3134            !matches!(
3135                self.catalog().get_entry(&plan.id).item(),
3136                CatalogItem::ContinualTask(_)
3137            ),
3138            "RETAIN HISTORY is not supported on continual tasks"
3139        );
3140        let ops = vec![catalog::Op::AlterRetainHistory {
3141            id: plan.id,
3142            value: plan.value,
3143            window: plan.window,
3144        }];
3145        self.catalog_transact_with_context(None, Some(ctx), ops)
3146            .await?;
3147        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3148    }
3149
3150    #[instrument]
3151    pub(super) async fn sequence_alter_source_timestamp_interval(
3152        &mut self,
3153        ctx: &mut ExecuteContext,
3154        plan: plan::AlterSourceTimestampIntervalPlan,
3155    ) -> Result<ExecuteResponse, AdapterError> {
3156        let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3157            id: plan.id,
3158            value: plan.value,
3159            interval: plan.interval,
3160        }];
3161        self.catalog_transact_with_context(None, Some(ctx), ops)
3162            .await?;
3163        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3164    }
3165
3166    #[instrument]
3167    pub(super) async fn sequence_alter_schema_rename(
3168        &mut self,
3169        ctx: &mut ExecuteContext,
3170        plan: plan::AlterSchemaRenamePlan,
3171    ) -> Result<ExecuteResponse, AdapterError> {
3172        let (database_spec, schema_spec) = plan.cur_schema_spec;
3173        let op = catalog::Op::RenameSchema {
3174            database_spec,
3175            schema_spec,
3176            new_name: plan.new_schema_name,
3177            check_reserved_names: true,
3178        };
3179        match self
3180            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3181            .await
3182        {
3183            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3184            Err(err) => Err(err),
3185        }
3186    }
3187
3188    #[instrument]
3189    pub(super) async fn sequence_alter_schema_swap(
3190        &mut self,
3191        ctx: &mut ExecuteContext,
3192        plan: plan::AlterSchemaSwapPlan,
3193    ) -> Result<ExecuteResponse, AdapterError> {
3194        let plan::AlterSchemaSwapPlan {
3195            schema_a_spec: (schema_a_db, schema_a),
3196            schema_a_name,
3197            schema_b_spec: (schema_b_db, schema_b),
3198            schema_b_name,
3199            name_temp,
3200        } = plan;
3201
3202        let op_a = catalog::Op::RenameSchema {
3203            database_spec: schema_a_db,
3204            schema_spec: schema_a,
3205            new_name: name_temp,
3206            check_reserved_names: false,
3207        };
3208        let op_b = catalog::Op::RenameSchema {
3209            database_spec: schema_b_db,
3210            schema_spec: schema_b,
3211            new_name: schema_a_name,
3212            check_reserved_names: false,
3213        };
3214        let op_c = catalog::Op::RenameSchema {
3215            database_spec: schema_a_db,
3216            schema_spec: schema_a,
3217            new_name: schema_b_name,
3218            check_reserved_names: false,
3219        };
3220
3221        match self
3222            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3223                Box::pin(async {})
3224            })
3225            .await
3226        {
3227            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3228            Err(err) => Err(err),
3229        }
3230    }
3231
    /// Sequences an `ALTER ROLE` statement.
    ///
    /// Depending on the planned option this either updates the role's
    /// attributes (inherit, password, superuser, login, ...) or updates /
    /// resets one of the role's persisted session-variable defaults. On
    /// success, any notices queued along the way (RBAC hints, deprecation
    /// warnings, "reconnect to see the change" notes) are delivered to the
    /// session.
    #[instrument]
    pub(super) async fn sequence_alter_role(
        &mut self,
        session: &Session,
        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let role = catalog.get_role(&id);

        // We'll send these notices to the user, if the operation is successful.
        let mut notices = vec![];

        // Get the attributes and variables from the role, as they currently are.
        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
        let mut vars = role.vars().clone();

        // Whether to set the password to NULL. This is a special case since the existing
        // password is not stored in the role attributes.
        let mut nopassword = false;

        // Apply our updates.
        match option {
            PlannedAlterRoleOption::Attributes(attrs) => {
                self.validate_role_attributes(&attrs.clone().into())?;

                // Each attribute in the plan is optional: only overwrite the
                // fields the user explicitly set.
                if let Some(inherit) = attrs.inherit {
                    attributes.inherit = inherit;
                }

                if let Some(password) = attrs.password {
                    attributes.password = Some(password);
                    // Record the SCRAM iteration count in effect when this
                    // password was set, alongside the password itself.
                    attributes.scram_iterations =
                        Some(self.catalog().system_config().scram_iterations())
                }

                if let Some(superuser) = attrs.superuser {
                    attributes.superuser = Some(superuser);
                }

                if let Some(login) = attrs.login {
                    attributes.login = Some(login);
                }

                if attrs.nopassword.unwrap_or(false) {
                    nopassword = true;
                }

                if let Some(notice) = self.should_emit_rbac_notice(session) {
                    notices.push(notice);
                }
            }
            PlannedAlterRoleOption::Variable(variable) => {
                // Get the variable to make sure it's valid and visible.
                let session_var = session.vars().inspect(variable.name())?;
                // Return early if it's not visible.
                session_var.visible(session.user(), catalog.system_vars())?;

                // Emit a warning when deprecated variables are used.
                // TODO(database-issues#8069) remove this after sufficient time has passed
                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if let PlannedRoleVariable::Set {
                    name,
                    value: VariableValue::Values(vals),
                } = &variable
                {
                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
                        notices.push(AdapterNotice::IntrospectionClusterUsage);
                    }
                }

                let var_name = match variable {
                    PlannedRoleVariable::Set { name, value } => {
                        // Update our persisted set.
                        match &value {
                            VariableValue::Default => {
                                // `SET ... TO DEFAULT` clears the persisted
                                // override rather than storing a value.
                                vars.remove(&name);
                            }
                            VariableValue::Values(vals) => {
                                let var = match &vals[..] {
                                    [val] => OwnedVarInput::Flat(val.clone()),
                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
                                };
                                // Make sure the input is valid.
                                session_var.check(var.borrow())?;

                                vars.insert(name.clone(), var);
                            }
                        };
                        name
                    }
                    PlannedRoleVariable::Reset { name } => {
                        // Remove it from our persisted values.
                        vars.remove(&name);
                        name
                    }
                };

                // Emit a notice that they need to reconnect to see the change take effect.
                notices.push(AdapterNotice::VarDefaultUpdated {
                    role: Some(name.clone()),
                    var_name: Some(var_name),
                });
            }
        }

        // Commit everything to the catalog in a single op.
        let op = catalog::Op::AlterRole {
            id,
            name,
            attributes,
            nopassword,
            vars: RoleVars { map: vars },
        };
        let response = self
            .catalog_transact(Some(session), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredRole)?;

        // Send all of our queued notices.
        session.add_notices(notices);

        Ok(response)
    }
3355
    /// Begins sequencing an `ALTER SINK ... SET FROM` statement.
    ///
    /// Takes a read hold on the new `from` collection and installs a storage
    /// watch set at that collection's least valid read time. The catalog and
    /// storage changes are deferred until the watch set fires with
    /// `WatchSetResponse::AlterSinkReady`, which carries the execute context,
    /// plan, plan-validity token, and read hold forward to
    /// `sequence_alter_sink_finish`.
    #[instrument]
    pub(super) async fn sequence_alter_sink_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterSinkPlan,
    ) {
        // Put a read hold on the new relation
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from_iter([plan.sink.from]),
            compute_ids: BTreeMap::new(),
        };
        let read_hold = self.acquire_read_holds(&id_bundle);

        // If the hold yields no valid read time, the new collection cannot be
        // read from, so the command fails immediately.
        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
            return;
        };

        let otel_ctx = OpenTelemetryContext::obtain();
        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);

        // Since the finish step runs later (and without the DDL lock), record
        // a validity token over both the sink and the new `from` item so the
        // finish step can detect concurrent catalog changes.
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([plan.item_id, from_item_id]),
            Some(plan.in_cluster),
            None,
            ctx.session().role_metadata().clone(),
        );

        info!(
            "preparing alter sink for {}: frontiers={:?} export={:?}",
            plan.global_id,
            self.controller
                .storage_collections
                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
            self.controller.storage.export(plan.global_id)
        );

        // Now we must wait for the sink to make enough progress such that there is overlap between
        // the new `from` collection's read hold and the sink's write frontier.
        //
        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
        // the watch set never completes and neither does the `ALTER SINK` command.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([plan.global_id]),
            read_ts,
            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
                ctx: Some(ctx),
                otel_ctx,
                plan,
                plan_validity,
                read_hold,
            }),
        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
    }
3412
    /// Completes an `ALTER SINK ... SET FROM` once the watch set installed by
    /// `sequence_alter_sink_prepare` fires.
    ///
    /// Re-validates the plan (no DDL lock is held while waiting), rewrites
    /// the sink's `create_sql` to reference the new `from` relation and bump
    /// the sink version, commits the updated catalog item, and finally tells
    /// the storage controller to alter the running export.
    #[instrument]
    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
        ctx.otel_ctx.attach_as_parent();

        let plan::AlterSinkPlan {
            item_id,
            global_id,
            sink: sink_plan,
            with_snapshot,
            in_cluster,
        } = ctx.plan.clone();

        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
        // arbitrarily changed since we performed planning, and we must re-assert that it still
        // matches our requirements.
        //
        // The `PlanValidity` check ensures that both the sink and the new source relation still
        // exist. Apart from that we have to ensure that nobody else altered the sink in the mean
        // time, which we do by comparing the catalog sink version to the one in the plan.
        match ctx.plan_validity.check(self.catalog()) {
            Ok(()) => {}
            Err(err) => {
                ctx.retire(Err(err));
                return;
            }
        }

        let entry = self.catalog().get_entry(&item_id);
        let CatalogItem::Sink(old_sink) = entry.item() else {
            panic!("invalid item kind for `AlterSinkPlan`");
        };

        // The plan carries `old version + 1`; any other relationship means a
        // concurrent `ALTER SINK` won the race.
        if sink_plan.version != old_sink.version + 1 {
            ctx.retire(Err(AdapterError::ChangedPlan(
                "sink was altered concurrently".into(),
            )));
            return;
        }

        info!(
            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
            self.controller
                .storage_collections
                .collections_frontiers(vec![global_id, sink_plan.from]),
            self.controller.storage.export(global_id),
        );

        // Assert that we can recover the updates that happened at the timestamps of the write
        // frontier. This must be true in this call.
        let write_frontier = &self
            .controller
            .storage
            .export(global_id)
            .expect("sink known to exist")
            .write_frontier;
        let as_of = ctx.read_hold.least_valid_read();
        assert!(
            write_frontier.iter().all(|t| as_of.less_than(t)),
            "{:?} should be strictly less than {:?}",
            &*as_of,
            &**write_frontier
        );

        // Parse the `create_sql` so we can update it to the new sink definition.
        //
        // Note that we need to use the `create_sql` from the catalog here, not the one from the
        // sink plan. Even though we ensure that the sink version didn't change since planning, the
        // names in the `create_sql` may have changed, for example due to a schema swap.
        let create_sql = &old_sink.create_sql;
        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
            unreachable!("invalid statement kind for sink");
        };

        // Update the sink version.
        stmt.with_options
            .retain(|o| o.name != CreateSinkOptionName::Version);
        stmt.with_options.push(CreateSinkOption {
            name: CreateSinkOptionName::Version,
            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
                sink_plan.version.to_string(),
            ))),
        });

        let conn_catalog = self.catalog().for_system_session();
        let (mut stmt, resolved_ids) =
            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");

        // Update the `from` relation.
        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
        stmt.from = ResolvedItemName::Item {
            id: from_entry.id(),
            qualifiers: from_entry.name.qualifiers.clone(),
            full_name,
            print_id: true,
            version: from_entry.version,
        };

        // Assemble the new in-memory catalog representation of the sink from
        // the rewritten statement and the plan.
        let new_sink = Sink {
            create_sql: stmt.to_ast_string_stable(),
            global_id,
            from: sink_plan.from,
            connection: sink_plan.connection.clone(),
            envelope: sink_plan.envelope,
            version: sink_plan.version,
            with_snapshot,
            resolved_ids: resolved_ids.clone(),
            cluster_id: in_cluster,
            commit_interval: sink_plan.commit_interval,
        };

        let ops = vec![catalog::Op::UpdateItem {
            id: item_id,
            name: entry.name().clone(),
            to_item: CatalogItem::Sink(new_sink),
        }];

        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => {}
            Err(err) => {
                ctx.retire(Err(err));
                return;
            }
        }

        // Build the storage-level description of the altered export, anchored
        // at the read hold's `as_of` validated above.
        let storage_sink_desc = StorageSinkDesc {
            from: sink_plan.from,
            from_desc: from_entry
                .relation_desc()
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink_plan
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink_plan.envelope,
            as_of,
            with_snapshot,
            version: sink_plan.version,
            from_storage_metadata: (),
            to_storage_metadata: (),
            commit_interval: sink_plan.commit_interval,
        };

        self.controller
            .storage
            .alter_export(
                global_id,
                ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: in_cluster,
                },
            )
            .await
            // NOTE(review): this expect message says "source desc" but the
            // operation alters a sink export — likely copy-pasted; confirm.
            .unwrap_or_terminate("cannot fail to alter source desc");

        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
    }
3576
3577    #[instrument]
3578    pub(super) async fn sequence_alter_connection(
3579        &mut self,
3580        ctx: ExecuteContext,
3581        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3582    ) {
3583        match action {
3584            AlterConnectionAction::RotateKeys => {
3585                self.sequence_rotate_keys(ctx, id).await;
3586            }
3587            AlterConnectionAction::AlterOptions {
3588                set_options,
3589                drop_options,
3590                validate,
3591            } => {
3592                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3593                    .await
3594            }
3595        }
3596    }
3597
3598    #[instrument]
3599    async fn sequence_alter_connection_options(
3600        &mut self,
3601        mut ctx: ExecuteContext,
3602        id: CatalogItemId,
3603        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3604        drop_options: BTreeSet<ConnectionOptionName>,
3605        validate: bool,
3606    ) {
3607        let cur_entry = self.catalog().get_entry(&id);
3608        let cur_conn = cur_entry.connection().expect("known to be connection");
3609        let connection_gid = cur_conn.global_id();
3610
3611        let inner = || -> Result<Connection, AdapterError> {
3612            // Parse statement.
3613            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3614                .expect("invalid create sql persisted to catalog")
3615                .into_element()
3616                .ast
3617            {
3618                Statement::CreateConnection(stmt) => stmt,
3619                _ => unreachable!("proved type is source"),
3620            };
3621
3622            let catalog = self.catalog().for_system_session();
3623
3624            // Resolve items in statement
3625            let (mut create_conn_stmt, resolved_ids) =
3626                mz_sql::names::resolve(&catalog, create_conn_stmt)
3627                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3628
3629            // Retain options that are neither set nor dropped.
3630            create_conn_stmt
3631                .values
3632                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3633
3634            // Set new values
3635            create_conn_stmt.values.extend(
3636                set_options
3637                    .into_iter()
3638                    .map(|(name, value)| ConnectionOption { name, value }),
3639            );
3640
3641            // Open a new catalog, which we will use to re-plan our
3642            // statement with the desired config.
3643            let mut catalog = self.catalog().for_system_session();
3644            catalog.mark_id_unresolvable_for_replanning(id);
3645
3646            // Re-define our source in terms of the amended statement
3647            let plan = match mz_sql::plan::plan(
3648                None,
3649                &catalog,
3650                Statement::CreateConnection(create_conn_stmt),
3651                &Params::empty(),
3652                &resolved_ids,
3653            )
3654            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3655            {
3656                Plan::CreateConnection(plan) => plan,
3657                _ => unreachable!("create source plan is only valid response"),
3658            };
3659
3660            // Parse statement.
3661            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3662                .expect("invalid create sql persisted to catalog")
3663                .into_element()
3664                .ast
3665            {
3666                Statement::CreateConnection(stmt) => stmt,
3667                _ => unreachable!("proved type is source"),
3668            };
3669
3670            let catalog = self.catalog().for_system_session();
3671
3672            // Resolve items in statement
3673            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3674                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3675
3676            Ok(Connection {
3677                create_sql: plan.connection.create_sql,
3678                global_id: cur_conn.global_id,
3679                details: plan.connection.details,
3680                resolved_ids: new_deps,
3681            })
3682        };
3683
3684        let conn = match inner() {
3685            Ok(conn) => conn,
3686            Err(e) => {
3687                return ctx.retire(Err(e));
3688            }
3689        };
3690
3691        if validate {
3692            let connection = conn
3693                .details
3694                .to_connection()
3695                .into_inline_connection(self.catalog().state());
3696
3697            let internal_cmd_tx = self.internal_cmd_tx.clone();
3698            let transient_revision = self.catalog().transient_revision();
3699            let conn_id = ctx.session().conn_id().clone();
3700            let otel_ctx = OpenTelemetryContext::obtain();
3701            let role_metadata = ctx.session().role_metadata().clone();
3702            let current_storage_parameters = self.controller.storage.config().clone();
3703
3704            task::spawn(
3705                || format!("validate_alter_connection:{conn_id}"),
3706                async move {
3707                    let resolved_ids = conn.resolved_ids.clone();
3708                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3709                    let result = match std::panic::AssertUnwindSafe(
3710                        connection.validate(id, &current_storage_parameters),
3711                    )
3712                    .ore_catch_unwind()
3713                    .await
3714                    {
3715                        Ok(Ok(())) => Ok(conn),
3716                        Ok(Err(err)) => Err(err.into()),
3717                        Err(_panic) => {
3718                            tracing::error!("alter connection validation panicked");
3719                            Err(AdapterError::Internal(
3720                                "connection validation panicked".into(),
3721                            ))
3722                        }
3723                    };
3724
3725                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3726                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3727                        AlterConnectionValidationReady {
3728                            ctx,
3729                            result,
3730                            connection_id: id,
3731                            connection_gid,
3732                            plan_validity: PlanValidity::new(
3733                                transient_revision,
3734                                dependency_ids.clone(),
3735                                None,
3736                                None,
3737                                role_metadata,
3738                            ),
3739                            otel_ctx,
3740                            resolved_ids,
3741                        },
3742                    ));
3743                    if let Err(e) = result {
3744                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3745                    }
3746                },
3747            );
3748        } else {
3749            let result = self
3750                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3751                .await;
3752            ctx.retire(result);
3753        }
3754    }
3755
3756    #[instrument]
3757    pub(crate) async fn sequence_alter_connection_stage_finish(
3758        &mut self,
3759        session: &Session,
3760        id: CatalogItemId,
3761        connection: Connection,
3762    ) -> Result<ExecuteResponse, AdapterError> {
3763        match self.catalog.get_entry(&id).item() {
3764            CatalogItem::Connection(curr_conn) => {
3765                curr_conn
3766                    .details
3767                    .to_connection()
3768                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3769                    .map_err(StorageError::from)?;
3770            }
3771            _ => unreachable!("known to be a connection"),
3772        };
3773
3774        let ops = vec![catalog::Op::UpdateItem {
3775            id,
3776            name: self.catalog.get_entry(&id).name().clone(),
3777            to_item: CatalogItem::Connection(connection.clone()),
3778        }];
3779
3780        self.catalog_transact(Some(session), ops).await?;
3781
3782        // NOTE: The rest of the alter connection logic (updating VPC endpoints
3783        // and propagating connection changes to dependent sources, sinks, and
3784        // tables) is handled in `apply_catalog_implications` via
3785        // `handle_alter_connection`. The catalog transact above triggers that
3786        // code path.
3787
3788        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3789    }
3790
    /// Sequences `ALTER SOURCE`, covering two actions:
    ///
    /// * `AddSubsourceExports`: re-plans the source's `CREATE SOURCE`
    ///   statement with merged `TEXT COLUMNS` / `EXCLUDE COLUMNS` options,
    ///   verifies the new ingestion description with the storage controller,
    ///   and creates the new subsources in the same catalog transaction.
    /// * `RefreshReferences`: replaces the source's stored set of upstream
    ///   references.
    #[instrument]
    pub(super) async fn sequence_alter_source(
        &mut self,
        session: &Session,
        plan::AlterSourcePlan {
            item_id,
            ingestion_id,
            action,
        }: plan::AlterSourcePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let cur_entry = self.catalog().get_entry(&item_id);
        let cur_source = cur_entry.source().expect("known to be source");

        // Helper: parse a persisted `CREATE SOURCE` statement and resolve the
        // items it references against the current catalog.
        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
            // Parse statement.
            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateSource(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = coord.catalog().for_system_session();

            // Resolve items in statement
            mz_sql::names::resolve(&catalog, create_source_stmt)
                .map_err(|e| AdapterError::internal(err_cx, e))
        };

        match action {
            plan::AlterSourceAction::AddSubsourceExports {
                subsources,
                options,
            } => {
                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                    text_columns: mut new_text_columns,
                    exclude_columns: mut new_exclude_columns,
                    ..
                } = options.try_into()?;

                // Resolve items in statement
                let (mut create_source_stmt, resolved_ids) =
                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

                // Get all currently referred-to items: the external references
                // of every existing subsource of this source.
                let catalog = self.catalog();
                let curr_references: BTreeSet<_> = catalog
                    .get_entry(&item_id)
                    .used_by()
                    .into_iter()
                    .filter_map(|subsource| {
                        catalog
                            .get_entry(subsource)
                            .subsource_details()
                            .map(|(_id, reference, _details)| reference)
                    })
                    .collect();

                // We are doing a lot of unwrapping, so just make an error to reference; all of
                // these invariants are guaranteed to be true because of how we plan subsources.
                let purification_err =
                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
                // we remove support for implicitly created subsources from a `CREATE SOURCE`
                // statement.
                match &mut create_source_stmt.connection {
                    CreateSourceConnection::Postgres {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::PgConfigOptionExtracted {
                            mut text_columns, ..
                        } = curr_options.clone().try_into()?;

                        // Drop text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));

                        // Drop all text columns that are not currently referred to.
                        // Postgres TEXT COLUMNS are 4-part column references;
                        // truncating to 3 parts yields the table reference.
                        text_columns.retain(|column_qualified_reference| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                4,
                                "all TEXT COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(3);
                            curr_references.contains(&table)
                        });

                        // Merge the current text columns into the new text columns.
                        new_text_columns.extend(text_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(PgConfigOption {
                                name: PgConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::MySql {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::MySqlConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both ignore and text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                MySqlConfigOptionName::TextColumns
                                    | MySqlConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        // MySQL column references are 3-part; truncating to 2
                        // parts yields the table reference.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::SqlServer {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::SqlServerConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both ignore and text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                SqlServerConfigOptionName::TextColumns
                                    | SqlServerConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        // SQL Server text/exclude column refs are 3-part (schema.table.col),
                        // which truncate to 2-part (schema.table). But external references
                        // are 3-part (database.schema.table). Use suffix matching since
                        // a SQL Server source connects to a single database.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.iter().any(|r| r.0.ends_with(&table.0))
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    // Other source kinds cannot have subsources added here.
                    _ => return Err(purification_err()),
                };

                let mut catalog = self.catalog().for_system_session();
                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

                // Re-define our source in terms of the amended statement
                let planned = mz_sql::plan::plan(
                    None,
                    &catalog,
                    Statement::CreateSource(create_source_stmt),
                    &Params::empty(),
                    &resolved_ids,
                )
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
                let plan = match planned {
                    Plan::CreateSource(plan) => plan,
                    _ => unreachable!("create source plan is only valid response"),
                };

                // Asserting that we've done the right thing with dependencies
                // here requires mocking out objects in the catalog, which is a
                // large task for an operation we have to cover in tests anyway.
                let source = Source::new(
                    plan,
                    cur_source.global_id,
                    resolved_ids,
                    cur_source.custom_logical_compaction_window,
                    cur_source.is_retained_metrics_object,
                );

                // Get new ingestion description for storage.
                let desc = match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                        desc.clone().into_inline_connection(self.catalog().state())
                    }
                    _ => unreachable!("already verified of type ingestion"),
                };

                // Ask the storage controller to confirm the alteration is
                // applicable before touching the catalog.
                self.controller
                    .storage
                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

                // Redefine source. This must be done before we create any new
                // subsources so that it has the right ingestion.
                let mut ops = vec![catalog::Op::UpdateItem {
                    id: item_id,
                    // Look this up again so we don't have to hold an immutable reference to the
                    // entry for so long.
                    name: self.catalog.get_entry(&item_id).name().clone(),
                    to_item: CatalogItem::Source(source),
                }];

                let CreateSourceInner {
                    ops: new_ops,
                    sources: _,
                    if_not_exists_ids,
                } = self.create_source_inner(session, subsources).await?;

                ops.extend(new_ops.into_iter());

                assert!(
                    if_not_exists_ids.is_empty(),
                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
                );

                // Commit the redefined source and the new subsources atomically.
                self.catalog_transact(Some(session), ops).await?;
            }
            plan::AlterSourceAction::RefreshReferences { references } => {
                self.catalog_transact(
                    Some(session),
                    vec![catalog::Op::UpdateSourceReferences {
                        source_id: item_id,
                        references: references.into(),
                    }],
                )
                .await?;
            }
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
    }
4123
4124    #[instrument]
4125    pub(super) async fn sequence_alter_system_set(
4126        &mut self,
4127        session: &Session,
4128        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4129    ) -> Result<ExecuteResponse, AdapterError> {
4130        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4131        // We want to ensure that the network policy we're switching too actually exists.
4132        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4133            self.validate_alter_system_network_policy(session, &value)?;
4134        }
4135
4136        let op = match value {
4137            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4138                name: name.clone(),
4139                value: OwnedVarInput::SqlSet(values),
4140            },
4141            plan::VariableValue::Default => {
4142                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4143            }
4144        };
4145        self.catalog_transact(Some(session), vec![op]).await?;
4146
4147        session.add_notice(AdapterNotice::VarDefaultUpdated {
4148            role: None,
4149            var_name: Some(name),
4150        });
4151        Ok(ExecuteResponse::AlteredSystemConfiguration)
4152    }
4153
4154    #[instrument]
4155    pub(super) async fn sequence_alter_system_reset(
4156        &mut self,
4157        session: &Session,
4158        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4159    ) -> Result<ExecuteResponse, AdapterError> {
4160        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4161        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4162        self.catalog_transact(Some(session), vec![op]).await?;
4163        session.add_notice(AdapterNotice::VarDefaultUpdated {
4164            role: None,
4165            var_name: Some(name),
4166        });
4167        Ok(ExecuteResponse::AlteredSystemConfiguration)
4168    }
4169
4170    #[instrument]
4171    pub(super) async fn sequence_alter_system_reset_all(
4172        &mut self,
4173        session: &Session,
4174        _: plan::AlterSystemResetAllPlan,
4175    ) -> Result<ExecuteResponse, AdapterError> {
4176        self.is_user_allowed_to_alter_system(session, None)?;
4177        let op = catalog::Op::ResetAllSystemConfiguration;
4178        self.catalog_transact(Some(session), vec![op]).await?;
4179        session.add_notice(AdapterNotice::VarDefaultUpdated {
4180            role: None,
4181            var_name: None,
4182        });
4183        Ok(ExecuteResponse::AlteredSystemConfiguration)
4184    }
4185
4186    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
4187    fn is_user_allowed_to_alter_system(
4188        &self,
4189        session: &Session,
4190        var_name: Option<&str>,
4191    ) -> Result<(), AdapterError> {
4192        match (session.user().kind(), var_name) {
4193            // Only internal superusers can reset all system variables.
4194            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4195            // Whether or not a variable can be modified depends if we're an internal superuser.
4196            (UserKind::Superuser, Some(name))
4197                if session.user().is_internal()
4198                    || self.catalog().system_config().user_modifiable(name) =>
4199            {
4200                // In lieu of plumbing the user to all system config functions, just check that
4201                // the var is visible.
4202                let var = self.catalog().system_config().get(name)?;
4203                var.visible(session.user(), self.catalog().system_config())?;
4204                Ok(())
4205            }
4206            // If we're not a superuser, but the variable is user modifiable, indicate they can use
4207            // session variables.
4208            (UserKind::Regular, Some(name))
4209                if self.catalog().system_config().user_modifiable(name) =>
4210            {
4211                Err(AdapterError::Unauthorized(
4212                    rbac::UnauthorizedError::Superuser {
4213                        action: format!("toggle the '{name}' system configuration parameter"),
4214                    },
4215                ))
4216            }
4217            _ => Err(AdapterError::Unauthorized(
4218                rbac::UnauthorizedError::MzSystem {
4219                    action: "alter system".into(),
4220                },
4221            )),
4222        }
4223    }
4224
4225    fn validate_alter_system_network_policy(
4226        &self,
4227        session: &Session,
4228        policy_value: &plan::VariableValue,
4229    ) -> Result<(), AdapterError> {
4230        let policy_name = match &policy_value {
4231            // Make sure the compiled in default still exists.
4232            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4233            plan::VariableValue::Values(values) if values.len() == 1 => {
4234                values.iter().next().cloned()
4235            }
4236            plan::VariableValue::Values(values) => {
4237                tracing::warn!(?values, "can't set multiple network policies at once");
4238                None
4239            }
4240        };
4241        let maybe_network_policy = policy_name
4242            .as_ref()
4243            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4244        let Some(network_policy) = maybe_network_policy else {
4245            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4246                VarError::InvalidParameterValue {
4247                    name: NETWORK_POLICY.name(),
4248                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4249                    reason: "no network policy with such name exists".to_string(),
4250                },
4251            )));
4252        };
4253        self.validate_alter_network_policy(session, &network_policy.rules)
4254    }
4255
4256    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4257    ///
4258    /// This helps prevent users from modifying network policies in a way that would lock out their
4259    /// current connection.
4260    fn validate_alter_network_policy(
4261        &self,
4262        session: &Session,
4263        policy_rules: &Vec<NetworkPolicyRule>,
4264    ) -> Result<(), AdapterError> {
4265        // If the user is not an internal user attempt to protect them from
4266        // blocking themselves.
4267        if session.user().is_internal() {
4268            return Ok(());
4269        }
4270        if let Some(ip) = session.meta().client_ip() {
4271            validate_ip_with_policy_rules(ip, policy_rules)
4272                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4273        } else {
4274            // Sessions without IPs are only temporarily constructed for default values
4275            // they should not be permitted here.
4276            return Err(AdapterError::NetworkPolicyDenied(
4277                NetworkPolicyError::MissingIp,
4278            ));
4279        }
4280        Ok(())
4281    }
4282
4283    // Returns the name of the portal to execute.
4284    #[instrument]
4285    pub(super) fn sequence_execute(
4286        &self,
4287        session: &mut Session,
4288        plan: plan::ExecutePlan,
4289    ) -> Result<String, AdapterError> {
4290        // Verify the stmt is still valid.
4291        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4292        let ps = session
4293            .get_prepared_statement_unverified(&plan.name)
4294            .expect("known to exist");
4295        let stmt = ps.stmt().cloned();
4296        let desc = ps.desc().clone();
4297        let state_revision = ps.state_revision;
4298        let logging = Arc::clone(ps.logging());
4299        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4300    }
4301
    /// Handles `GRANT <privileges> ON <objects> TO <grantees>`.
    ///
    /// Thin wrapper that forwards to `sequence_update_privileges` with the
    /// `Grant` variant; all validation and op construction happens there.
    #[instrument]
    pub(super) async fn sequence_grant_privileges(
        &mut self,
        session: &Session,
        plan::GrantPrivilegesPlan {
            update_privileges,
            grantees,
        }: plan::GrantPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            grantees,
            UpdatePrivilegeVariant::Grant,
        )
        .await
    }
4319
    /// Handles `REVOKE <privileges> ON <objects> FROM <revokees>`.
    ///
    /// Thin wrapper that forwards to `sequence_update_privileges` with the
    /// `Revoke` variant; all validation and op construction happens there.
    #[instrument]
    pub(super) async fn sequence_revoke_privileges(
        &mut self,
        session: &Session,
        plan::RevokePrivilegesPlan {
            update_privileges,
            revokees,
        }: plan::RevokePrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            revokees,
            UpdatePrivilegeVariant::Revoke,
        )
        .await
    }
4337
    /// Shared implementation of GRANT and REVOKE of privileges.
    ///
    /// For every (object, grantee) pair, emits a catalog op only when the op would actually
    /// change the existing privilege bits; otherwise the pair is a no-op. Privilege types that
    /// don't apply to the target object type produce a notice rather than an error. All ops are
    /// committed in a single catalog transaction, and warnings are only surfaced to the session
    /// if the transaction succeeds (or if there was nothing to do).
    #[instrument]
    async fn sequence_update_privileges(
        &mut self,
        session: &Session,
        update_privileges: Vec<UpdatePrivilege>,
        grantees: Vec<RoleId>,
        variant: UpdatePrivilegeVariant,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
        let mut warnings = Vec::new();
        let catalog = self.catalog().for_session(session);

        for UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        } in update_privileges
        {
            let actual_object_type = catalog.get_system_object_type(&target_id);
            // For all relations we allow all applicable table privileges, but send a warning if the
            // privilege isn't actually applicable to the object type.
            if actual_object_type.is_relation() {
                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
                if !non_applicable_privileges.is_empty() {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
                        non_applicable_privileges,
                        object_description,
                    })
                }
            }

            // Reserved objects (e.g. system schemas) may not have their privileges changed.
            if let SystemObjectId::Object(object_id) = &target_id {
                self.catalog()
                    .ensure_not_reserved_object(object_id, session.conn_id())?;
            }

            let privileges = self
                .catalog()
                .get_privileges(&target_id, session.conn_id())
                // Should be unreachable since the parser will refuse to parse grant/revoke
                // statements on objects without privileges.
                .ok_or(AdapterError::Unsupported(
                    "GRANTs/REVOKEs on an object type with no privileges",
                ))?;

            for grantee in &grantees {
                self.catalog().ensure_not_system_role(grantee)?;
                self.catalog().ensure_not_predefined_role(grantee)?;
                // Look up the current (grantee, grantor) ACL item; if none exists, use an
                // empty one so the change checks below behave uniformly.
                let existing_privilege = privileges
                    .get_acl_item(grantee, &grantor)
                    .map(Cow::Borrowed)
                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));

                // Only emit an op if it would actually change the privilege bits:
                // a grant of already-held bits or a revoke of absent bits is a no-op.
                match variant {
                    UpdatePrivilegeVariant::Grant
                        if !existing_privilege.acl_mode.contains(acl_mode) =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    UpdatePrivilegeVariant::Revoke
                        if !existing_privilege
                            .acl_mode
                            .intersection(acl_mode)
                            .is_empty() =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    // no-op
                    _ => {}
                }
            }
        }

        // Nothing would change: report success (and any warnings) without a transaction.
        if ops.is_empty() {
            session.add_notices(warnings);
            return Ok(variant.into());
        }

        let res = self
            .catalog_transact(Some(session), ops)
            .await
            .map(|_| match variant {
                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
            });
        // Only surface warnings if the transaction actually succeeded.
        if res.is_ok() {
            session.add_notices(warnings);
        }
        res
    }
4447
4448    #[instrument]
4449    pub(super) async fn sequence_alter_default_privileges(
4450        &mut self,
4451        session: &Session,
4452        plan::AlterDefaultPrivilegesPlan {
4453            privilege_objects,
4454            privilege_acl_items,
4455            is_grant,
4456        }: plan::AlterDefaultPrivilegesPlan,
4457    ) -> Result<ExecuteResponse, AdapterError> {
4458        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4459        let variant = if is_grant {
4460            UpdatePrivilegeVariant::Grant
4461        } else {
4462            UpdatePrivilegeVariant::Revoke
4463        };
4464        for privilege_object in &privilege_objects {
4465            self.catalog()
4466                .ensure_not_system_role(&privilege_object.role_id)?;
4467            self.catalog()
4468                .ensure_not_predefined_role(&privilege_object.role_id)?;
4469            if let Some(database_id) = privilege_object.database_id {
4470                self.catalog()
4471                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4472            }
4473            if let Some(schema_id) = privilege_object.schema_id {
4474                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4475                let schema_spec: SchemaSpecifier = schema_id.into();
4476
4477                self.catalog().ensure_not_reserved_object(
4478                    &(database_spec, schema_spec).into(),
4479                    session.conn_id(),
4480                )?;
4481            }
4482            for privilege_acl_item in &privilege_acl_items {
4483                self.catalog()
4484                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4485                self.catalog()
4486                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4487                ops.push(catalog::Op::UpdateDefaultPrivilege {
4488                    privilege_object: privilege_object.clone(),
4489                    privilege_acl_item: privilege_acl_item.clone(),
4490                    variant,
4491                })
4492            }
4493        }
4494
4495        self.catalog_transact(Some(session), ops).await?;
4496        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4497    }
4498
4499    #[instrument]
4500    pub(super) async fn sequence_grant_role(
4501        &mut self,
4502        session: &Session,
4503        plan::GrantRolePlan {
4504            role_ids,
4505            member_ids,
4506            grantor_id,
4507        }: plan::GrantRolePlan,
4508    ) -> Result<ExecuteResponse, AdapterError> {
4509        let catalog = self.catalog();
4510        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4511        for role_id in role_ids {
4512            for member_id in &member_ids {
4513                let member_membership: BTreeSet<_> =
4514                    catalog.get_role(member_id).membership().keys().collect();
4515                if member_membership.contains(&role_id) {
4516                    let role_name = catalog.get_role(&role_id).name().to_string();
4517                    let member_name = catalog.get_role(member_id).name().to_string();
4518                    // We need this check so we don't accidentally return a success on a reserved role.
4519                    catalog.ensure_not_reserved_role(member_id)?;
4520                    catalog.ensure_grantable_role(&role_id)?;
4521                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4522                        role_name,
4523                        member_name,
4524                    });
4525                } else {
4526                    ops.push(catalog::Op::GrantRole {
4527                        role_id,
4528                        member_id: *member_id,
4529                        grantor_id,
4530                    });
4531                }
4532            }
4533        }
4534
4535        if ops.is_empty() {
4536            return Ok(ExecuteResponse::GrantedRole);
4537        }
4538
4539        self.catalog_transact(Some(session), ops)
4540            .await
4541            .map(|_| ExecuteResponse::GrantedRole)
4542    }
4543
4544    #[instrument]
4545    pub(super) async fn sequence_revoke_role(
4546        &mut self,
4547        session: &Session,
4548        plan::RevokeRolePlan {
4549            role_ids,
4550            member_ids,
4551            grantor_id,
4552        }: plan::RevokeRolePlan,
4553    ) -> Result<ExecuteResponse, AdapterError> {
4554        let catalog = self.catalog();
4555        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4556        for role_id in role_ids {
4557            for member_id in &member_ids {
4558                let member_membership: BTreeSet<_> =
4559                    catalog.get_role(member_id).membership().keys().collect();
4560                if !member_membership.contains(&role_id) {
4561                    let role_name = catalog.get_role(&role_id).name().to_string();
4562                    let member_name = catalog.get_role(member_id).name().to_string();
4563                    // We need this check so we don't accidentally return a success on a reserved role.
4564                    catalog.ensure_not_reserved_role(member_id)?;
4565                    catalog.ensure_grantable_role(&role_id)?;
4566                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4567                        role_name,
4568                        member_name,
4569                    });
4570                } else {
4571                    ops.push(catalog::Op::RevokeRole {
4572                        role_id,
4573                        member_id: *member_id,
4574                        grantor_id,
4575                    });
4576                }
4577            }
4578        }
4579
4580        if ops.is_empty() {
4581            return Ok(ExecuteResponse::RevokedRole);
4582        }
4583
4584        self.catalog_transact(Some(session), ops)
4585            .await
4586            .map(|_| ExecuteResponse::RevokedRole)
4587    }
4588
4589    #[instrument]
4590    pub(super) async fn sequence_alter_owner(
4591        &mut self,
4592        session: &Session,
4593        plan::AlterOwnerPlan {
4594            id,
4595            object_type,
4596            new_owner,
4597        }: plan::AlterOwnerPlan,
4598    ) -> Result<ExecuteResponse, AdapterError> {
4599        let mut ops = vec![catalog::Op::UpdateOwner {
4600            id: id.clone(),
4601            new_owner,
4602        }];
4603
4604        match &id {
4605            ObjectId::Item(global_id) => {
4606                let entry = self.catalog().get_entry(global_id);
4607
4608                // Cannot directly change the owner of an index.
4609                if entry.is_index() {
4610                    let name = self
4611                        .catalog()
4612                        .resolve_full_name(entry.name(), Some(session.conn_id()))
4613                        .to_string();
4614                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
4615                    return Ok(ExecuteResponse::AlteredObject(object_type));
4616                }
4617
4618                // Alter owner cascades down to dependent indexes.
4619                let dependent_index_ops = entry
4620                    .used_by()
4621                    .into_iter()
4622                    .filter(|id| self.catalog().get_entry(id).is_index())
4623                    .map(|id| catalog::Op::UpdateOwner {
4624                        id: ObjectId::Item(*id),
4625                        new_owner,
4626                    });
4627                ops.extend(dependent_index_ops);
4628
4629                // Alter owner cascades down to progress collections.
4630                let dependent_subsources =
4631                    entry
4632                        .progress_id()
4633                        .into_iter()
4634                        .map(|item_id| catalog::Op::UpdateOwner {
4635                            id: ObjectId::Item(item_id),
4636                            new_owner,
4637                        });
4638                ops.extend(dependent_subsources);
4639            }
4640            ObjectId::Cluster(cluster_id) => {
4641                let cluster = self.catalog().get_cluster(*cluster_id);
4642                // Alter owner cascades down to cluster replicas.
4643                let managed_cluster_replica_ops =
4644                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4645                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4646                        new_owner,
4647                    });
4648                ops.extend(managed_cluster_replica_ops);
4649            }
4650            _ => {}
4651        }
4652
4653        self.catalog_transact(Some(session), ops)
4654            .await
4655            .map(|_| ExecuteResponse::AlteredObject(object_type))
4656    }
4657
4658    #[instrument]
4659    pub(super) async fn sequence_reassign_owned(
4660        &mut self,
4661        session: &Session,
4662        plan::ReassignOwnedPlan {
4663            old_roles,
4664            new_role,
4665            reassign_ids,
4666        }: plan::ReassignOwnedPlan,
4667    ) -> Result<ExecuteResponse, AdapterError> {
4668        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4669            self.catalog().ensure_not_reserved_role(role_id)?;
4670        }
4671
4672        let ops = reassign_ids
4673            .into_iter()
4674            .map(|id| catalog::Op::UpdateOwner {
4675                id,
4676                new_owner: new_role,
4677            })
4678            .collect();
4679
4680        self.catalog_transact(Some(session), ops)
4681            .await
4682            .map(|_| ExecuteResponse::ReassignOwned)
4683    }
4684
4685    #[instrument]
4686    pub(crate) async fn handle_deferred_statement(&mut self) {
4687        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
4688        // was processed, removing the single element from deferred_statements, so it is expected
4689        // that this is sometimes empty.
4690        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4691            return;
4692        };
4693        match ps {
4694            crate::coord::PlanStatement::Statement { stmt, params } => {
4695                self.handle_execute_inner(stmt, params, ctx).await;
4696            }
4697            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4698                self.sequence_plan(ctx, plan, resolved_ids).await;
4699            }
4700        }
4701    }
4702
4703    #[instrument]
4704    // TODO(parkmycar): Remove this once we have an actual implementation.
4705    #[allow(clippy::unused_async)]
4706    pub(super) async fn sequence_alter_table(
4707        &mut self,
4708        ctx: &mut ExecuteContext,
4709        plan: plan::AlterTablePlan,
4710    ) -> Result<ExecuteResponse, AdapterError> {
4711        let plan::AlterTablePlan {
4712            relation_id,
4713            column_name,
4714            column_type,
4715            raw_sql_type,
4716        } = plan;
4717
4718        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4719        let (_, new_global_id) = self.allocate_user_id().await?;
4720        let ops = vec![catalog::Op::AlterAddColumn {
4721            id: relation_id,
4722            new_global_id,
4723            name: column_name,
4724            typ: column_type,
4725            sql: raw_sql_type,
4726        }];
4727
4728        self.catalog_transact_with_context(None, Some(ctx), ops)
4729            .await?;
4730
4731        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4732    }
4733
    /// Prepares to apply a replacement materialized view.
    ///
    /// Rather than applying the replacement immediately, this installs a watch set that fires
    /// once the target MV's write frontier has caught up to the replacement's; the finish step
    /// (`sequence_alter_materialized_view_apply_replacement_finish`) then completes the command.
    /// If the replacement is already sealed, `ctx` is retired with an error instead.
    #[instrument]
    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: AlterMaterializedViewApplyReplacementPlan,
    ) {
        // To ensure there is no time gap in the output, we can only apply a replacement if the
        // target's write frontier has caught up to the replacement dataflow's write frontier. This
        // might not be the case initially, so we have to wait. To this end, we install a watch set
        // waiting for the target MV's write frontier to advance sufficiently.
        //
        // Note that the replacement's dataflow is not performing any writes, so it can only be
        // ahead of the target initially due to as-of selection. Once the target has caught up, the
        // replacement's write frontier is always <= the target's.

        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();

        // Capture a validity token now; the finish step re-checks it after the frontier wait to
        // detect concurrent catalog changes.
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([id, replacement_id]),
            None,
            None,
            ctx.session().role_metadata().clone(),
        );

        let target = self.catalog.get_entry(&id);
        let target_gid = target.latest_global_id();

        let replacement = self.catalog.get_entry(&replacement_id);
        let replacement_gid = replacement.latest_global_id();

        // Fetch both write frontiers: the target's from storage, the replacement's from compute.
        let target_upper = self
            .controller
            .storage_collections
            .collection_frontiers(target_gid)
            .expect("target MV exists")
            .write_frontier;
        let replacement_upper = self
            .controller
            .compute
            .collection_frontiers(replacement_gid, replacement.cluster_id())
            .expect("replacement MV exists")
            .write_frontier;

        info!(
            %id, %replacement_id, ?target_upper, ?replacement_upper,
            "preparing materialized view replacement application",
        );

        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
            // A replacement's write frontier can only become empty if the target's write frontier
            // has advanced to the empty frontier. In this case the MV is sealed for all times and
            // applying the replacement wouldn't have any effect. We use this opportunity to alert
            // the user by returning an error, rather than applying the useless replacement.
            //
            // Note that we can't assert on `target_upper` being empty here, because the reporting
            // of the target's frontier might be delayed. We'd have to fetch the current frontier
            // from persist, which we cannot do without incurring I/O.
            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
                name: target.name().item.clone(),
            }));
            return;
        };

        // A watch set resolves when the watched objects' frontier becomes _greater_ than the
        // specified timestamp. Since we only need to wait until the target frontier is >= the
        // replacement's frontier, we can step back the timestamp.
        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);

        // TODO(database-issues#9820): If the target MV is dropped while we are waiting for
        // progress, the watch set never completes and neither does the `ALTER MATERIALIZED VIEW`
        // command.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([target_gid]),
            replacement_upper_ts,
            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
                ctx: Some(ctx),
                otel_ctx: OpenTelemetryContext::obtain(),
                plan,
                plan_validity,
            }),
        )
        .expect("target collection exists");
    }
4820
4821    /// Finishes applying a replacement materialized view after the frontier wait completed.
4822    #[instrument]
4823    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4824        &mut self,
4825        mut ctx: AlterMaterializedViewReadyContext,
4826    ) {
4827        ctx.otel_ctx.attach_as_parent();
4828
4829        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4830
4831        // We avoid taking the DDL lock for `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`
4832        // commands, see `Coordinator::must_serialize_ddl`. We therefore must assume that the
4833        // world has arbitrarily changed since we performed planning, and we must re-assert
4834        // that it still matches our requirements.
4835        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4836            ctx.retire(Err(err));
4837            return;
4838        }
4839
4840        info!(
4841            %id, %replacement_id,
4842            "finishing materialized view replacement application",
4843        );
4844
4845        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4846        match self
4847            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4848            .await
4849        {
4850            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4851                ObjectType::MaterializedView,
4852            ))),
4853            Err(err) => ctx.retire(Err(err)),
4854        }
4855    }
4856
    /// Builds a [`mz_transform::StatisticsOracle`] for the given source collections at
    /// `query_as_of`.
    ///
    /// Thin wrapper that forwards to the free function `super::statistics_oracle`, supplying
    /// the system config and storage collections from this coordinator. `is_oneshot` is passed
    /// through unchanged — presumably it distinguishes one-off queries from long-lived
    /// dataflows; confirm against `super::statistics_oracle`.
    pub(super) async fn statistics_oracle(
        &self,
        session: &Session,
        source_ids: &BTreeSet<GlobalId>,
        query_as_of: &Antichain<Timestamp>,
        is_oneshot: bool,
    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
        super::statistics_oracle(
            session,
            source_ids,
            query_as_of,
            is_oneshot,
            self.catalog().system_config(),
            self.controller.storage_collections.as_ref(),
        )
        .await
    }
4874}
4875
4876impl Coordinator {
4877    /// Process the metainfo from a newly created non-transient dataflow.
4878    async fn process_dataflow_metainfo(
4879        &mut self,
4880        df_meta: DataflowMetainfo,
4881        export_id: GlobalId,
4882        ctx: Option<&mut ExecuteContext>,
4883        notice_ids: Vec<GlobalId>,
4884    ) -> Option<BuiltinTableAppendNotify> {
4885        // Emit raw notices to the user.
4886        if let Some(ctx) = ctx {
4887            emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4888        }
4889
4890        // Create a metainfo with rendered notices.
4891        let df_meta = self
4892            .catalog()
4893            .render_notices(df_meta, notice_ids, Some(export_id));
4894
4895        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
4896        // in-memory state.
4897        if self.catalog().state().system_config().enable_mz_notices()
4898            && !df_meta.optimizer_notices.is_empty()
4899        {
4900            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4901            self.catalog().state().pack_optimizer_notices(
4902                &mut builtin_table_updates,
4903                df_meta.optimizer_notices.iter(),
4904                Diff::ONE,
4905            );
4906
4907            // Save the metainfo.
4908            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4909
4910            Some(
4911                self.builtin_table_update()
4912                    .execute(builtin_table_updates)
4913                    .await
4914                    .0,
4915            )
4916        } else {
4917            // Save the metainfo.
4918            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4919
4920            None
4921        }
4922    }
4923}