Skip to main content

mz_adapter/coord/sequencer/
inner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29    CollectionPlan, Eval, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::future::OreFutureExt;
34use mz_ore::task::{self, JoinHandle, spawn};
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{assert_none, instrument};
37use mz_repr::adt::jsonb::Jsonb;
38use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
39use mz_repr::explain::ExprHumanizer;
40use mz_repr::explain::json::json_string;
41use mz_repr::role_id::RoleId;
42use mz_repr::{
43    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
44    RowIterator, Timestamp,
45};
46use mz_sql::ast::{
47    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
48    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
49    SqlServerConfigOptionName,
50};
51use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
52use mz_sql::catalog::{
53    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
54    CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, CatalogTypeDetails,
55    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
56};
57use mz_sql::names::{
58    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
59    SchemaSpecifier, SystemObjectId,
60};
61use mz_sql::plan::{
62    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
63    StatementContext,
64};
65use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
66use mz_storage_types::sinks::StorageSinkDesc;
67// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
68use mz_sql::plan::{
69    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84    WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use mz_transform::notice::{OptimizerNotice, RawOptimizerNotice};
93use smallvec::SmallVec;
94use timely::progress::Antichain;
95use tokio::sync::{oneshot, watch};
96use tracing::{Instrument, Span, info, warn};
97
98use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
99use crate::command::{ExecuteResponse, Response};
100use crate::coord::appends::{
101    BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn, UserWriteResponder,
102};
103use crate::coord::read_then_write::validate_read_then_write_dependencies;
104use crate::coord::sequencer::emit_optimizer_notices;
105use crate::coord::{
106    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
107    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
108    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
109    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
110    WatchSetResponse, validate_ip_with_policy_rules,
111};
112use crate::error::AdapterError;
113use crate::notice::{AdapterNotice, DroppedInUseIndex};
114use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
115use crate::optimize::{self, Optimize};
116use crate::session::{
117    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
118    WriteLocks, WriteOp,
119};
120use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
121use crate::{PeekResponseUnary, ReadHolds};
122
123/// A future that resolves to a real-time recency timestamp.
124type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError>>;
125
126mod cluster;
127mod copy_from;
128mod create_index;
129mod create_materialized_view;
130mod create_view;
131mod explain_timestamp;
132mod peek;
133mod secret;
134mod subscribe;
135
136/// Attempts to evaluate an expression. If an error is returned then the error is sent
137/// to the client and the function is exited.
138macro_rules! return_if_err {
139    ($expr:expr, $ctx:expr) => {
140        match $expr {
141            Ok(v) => v,
142            Err(e) => return $ctx.retire(Err(e.into())),
143        }
144    };
145}
146
147pub(super) use return_if_err;
148
149struct DropOps {
150    ops: Vec<catalog::Op>,
151    dropped_active_db: bool,
152    dropped_active_cluster: bool,
153    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
154}
155
156// A bundle of values returned from create_source_inner
157struct CreateSourceInner {
158    ops: Vec<catalog::Op>,
159    sources: Vec<(CatalogItemId, Source)>,
160    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
161}
162
163impl Coordinator {
164    /// Sequences a [Staged] plan.
165    ///
166    /// This is designed for plans that execute both on and off the coordinator
167    /// thread. Stages can either produce another stage to execute or a final
168    /// response. Maintains the connection-scoped cancel watch in
169    /// `connection_cancel_watches` while a stage is cancelable.
170    pub(crate) async fn sequence_staged<S>(
171        &mut self,
172        mut ctx: S::Ctx,
173        parent_span: Span,
174        mut stage: S,
175    ) where
176        S: Staged + 'static,
177        S::Ctx: Send + 'static,
178    {
179        return_if_err!(stage.validity().check(self.catalog()), ctx);
180        loop {
181            let mut cancel_enabled = stage.cancel_enabled();
182            if let Some(session) = ctx.session() {
183                if cancel_enabled {
184                    // Channel to await cancellation. Insert a new channel, but check if the previous one
185                    // was already canceled.
186                    if let Some((_prev_tx, prev_rx)) = self
187                        .connection_cancel_watches
188                        .insert(session.conn_id().clone(), watch::channel(false))
189                    {
190                        let was_canceled = *prev_rx.borrow();
191                        if was_canceled {
192                            ctx.retire(Err(AdapterError::Canceled));
193                            return;
194                        }
195                    }
196                } else {
197                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
198                    // when cancel_enabled may have been true on an earlier stage.
199                    self.connection_cancel_watches.remove(session.conn_id());
200                }
201            } else {
202                cancel_enabled = false
203            };
204            let next = stage
205                .stage(self, &mut ctx)
206                .instrument(parent_span.clone())
207                .await;
208            let res = return_if_err!(next, ctx);
209            stage = match res {
210                StageResult::Handle(handle) => {
211                    let internal_cmd_tx = self.internal_cmd_tx.clone();
212                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
213                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
214                    });
215                    return;
216                }
217                StageResult::HandleRetire(handle) => {
218                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
219                        ctx.retire(Ok(resp));
220                    });
221                    return;
222                }
223                StageResult::Response(resp) => {
224                    ctx.retire(Ok(resp));
225                    return;
226                }
227                StageResult::Immediate(stage) => *stage,
228            }
229        }
230    }
231
232    /// Waits for either the spawned stage work to complete or cancellation to
233    /// be signaled through the connection-scoped cancel watch.
234    fn handle_spawn<C, T, F>(
235        &self,
236        ctx: C,
237        handle: JoinHandle<Result<T, AdapterError>>,
238        cancel_enabled: bool,
239        f: F,
240    ) where
241        C: StagedContext + Send + 'static,
242        T: Send + 'static,
243        F: FnOnce(C, T) + Send + 'static,
244    {
245        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
246            .session()
247            .and_then(|session| self.connection_cancel_watches.get(session.conn_id()))
248        {
249            let mut rx = rx.clone();
250            Box::pin(async move {
251                // Wait for true or dropped sender.
252                let _ = rx.wait_for(|v| *v).await;
253                ()
254            })
255        } else {
256            Box::pin(future::pending())
257        };
258        spawn(|| "sequence_staged", async move {
259            tokio::select! {
260                res = handle => {
261                    let next = return_if_err!(res, ctx);
262                    f(ctx, next);
263                }
264                _ = rx, if cancel_enabled => {
265                    ctx.retire(Err(AdapterError::Canceled));
266                }
267            }
268        });
269    }
270
271    async fn create_source_inner(
272        &self,
273        session: &Session,
274        plans: Vec<plan::CreateSourcePlanBundle>,
275    ) -> Result<CreateSourceInner, AdapterError> {
276        let mut ops = vec![];
277        let mut sources = vec![];
278
279        let if_not_exists_ids = plans
280            .iter()
281            .filter_map(
282                |plan::CreateSourcePlanBundle {
283                     item_id,
284                     global_id: _,
285                     plan,
286                     resolved_ids: _,
287                     available_source_references: _,
288                 }| {
289                    if plan.if_not_exists {
290                        Some((*item_id, plan.name.clone()))
291                    } else {
292                        None
293                    }
294                },
295            )
296            .collect::<BTreeMap<_, _>>();
297
298        for plan::CreateSourcePlanBundle {
299            item_id,
300            global_id,
301            mut plan,
302            resolved_ids,
303            available_source_references,
304        } in plans
305        {
306            let name = plan.name.clone();
307
308            // Attempt to reduce the `CHECK` expression, we timeout if this takes too long.
309            if let mz_sql::plan::DataSourceDesc::Webhook {
310                validate_using: Some(validate),
311                ..
312            } = &mut plan.source.data_source
313            {
314                if let Err(reason) = validate.reduce_expression().await {
315                    self.metrics
316                        .webhook_validation_reduce_failures
317                        .with_label_values(&[reason])
318                        .inc();
319                    return Err(AdapterError::Internal(format!(
320                        "failed to reduce check expression, {reason}"
321                    )));
322                }
323            }
324
325            // If this source contained a set of available source references, update the
326            // source references catalog table.
327            let mut reference_ops = vec![];
328            if let Some(references) = &available_source_references {
329                reference_ops.push(catalog::Op::UpdateSourceReferences {
330                    source_id: item_id,
331                    references: references.clone().into(),
332                });
333            }
334
335            let source = Source::new(plan, global_id, resolved_ids, None, false);
336            ops.push(catalog::Op::CreateItem {
337                id: item_id,
338                name,
339                item: CatalogItem::Source(source.clone()),
340                owner_id: *session.current_role_id(),
341            });
342            sources.push((item_id, source));
343            // These operations must be executed after the source is added to the catalog.
344            ops.extend(reference_ops);
345        }
346
347        Ok(CreateSourceInner {
348            ops,
349            sources,
350            if_not_exists_ids,
351        })
352    }
353
354    /// Subsources are planned differently from other statements because they
355    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
356    /// Because of this, we have usually "missed" the opportunity to plan them
357    /// through the normal statement execution life cycle (the exception being
358    /// during bootstrapping).
359    ///
360    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
361    pub(crate) fn plan_subsource(
362        &self,
363        session: &Session,
364        params: &mz_sql::plan::Params,
365        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
366        item_id: CatalogItemId,
367        global_id: GlobalId,
368    ) -> Result<CreateSourcePlanBundle, AdapterError> {
369        let catalog = self.catalog().for_session(session);
370        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
371
372        let (plan, _sql_impl_ids) = self.plan_statement(
373            session,
374            Statement::CreateSubsource(subsource_stmt),
375            params,
376            &resolved_ids,
377        )?;
378        let plan = match plan {
379            Plan::CreateSource(plan) => plan,
380            _ => unreachable!(),
381        };
382        Ok(CreateSourcePlanBundle {
383            item_id,
384            global_id,
385            plan,
386            resolved_ids,
387            available_source_references: None,
388        })
389    }
390
391    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
392    pub(crate) async fn plan_purified_alter_source_add_subsource(
393        &mut self,
394        session: &Session,
395        params: Params,
396        source_name: ResolvedItemName,
397        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
398        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
399    ) -> Result<(Plan, ResolvedIds), AdapterError> {
400        let mut subsource_plans = Vec::with_capacity(subsources.len());
401
402        // Generate subsource statements
403        let conn_catalog = self.catalog().for_system_session();
404        let pcx = plan::PlanContext::zero();
405        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
406
407        let entry = self.catalog().get_entry(source_name.item_id());
408        let source = entry.source().ok_or_else(|| {
409            AdapterError::internal(
410                "plan alter source",
411                format!("expected Source found {entry:?}"),
412            )
413        })?;
414
415        let item_id = entry.id();
416        let ingestion_id = source.global_id();
417        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
418
419        let ids = self
420            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
421            .await?;
422        for (subsource_stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
423            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
424            subsource_plans.push(s);
425        }
426
427        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
428            subsources: subsource_plans,
429            options,
430        };
431
432        Ok((
433            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
434                item_id,
435                ingestion_id,
436                action,
437            }),
438            ResolvedIds::empty(),
439        ))
440    }
441
442    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
443    pub(crate) fn plan_purified_alter_source_refresh_references(
444        &self,
445        _session: &Session,
446        _params: Params,
447        source_name: ResolvedItemName,
448        available_source_references: plan::SourceReferences,
449    ) -> Result<(Plan, ResolvedIds), AdapterError> {
450        let entry = self.catalog().get_entry(source_name.item_id());
451        let source = entry.source().ok_or_else(|| {
452            AdapterError::internal(
453                "plan alter source",
454                format!("expected Source found {entry:?}"),
455            )
456        })?;
457        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
458            references: available_source_references,
459        };
460
461        Ok((
462            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
463                item_id: entry.id(),
464                ingestion_id: source.global_id(),
465                action,
466            }),
467            ResolvedIds::empty(),
468        ))
469    }
470
471    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
472    /// the primary source, and any ingestion export subsources (e.g. PG
473    /// tables).
474    pub(crate) async fn plan_purified_create_source(
475        &mut self,
476        ctx: &ExecuteContext,
477        params: Params,
478        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
479        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
480        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
481        available_source_references: plan::SourceReferences,
482    ) -> Result<(Plan, ResolvedIds), AdapterError> {
483        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
484
485        // 1. First plan the progress subsource, if any.
486        if let Some(progress_stmt) = progress_stmt {
487            // The primary source depends on this subsource because the primary
488            // source needs to know its shard ID, and the easiest way of
489            // guaranteeing that the shard ID is discoverable is to create this
490            // collection first.
491            assert_none!(progress_stmt.of_source);
492            let (item_id, global_id) = self.allocate_user_id().await?;
493            let progress_plan =
494                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
495            let progress_full_name = self
496                .catalog()
497                .resolve_full_name(&progress_plan.plan.name, None);
498            let progress_subsource = ResolvedItemName::Item {
499                id: progress_plan.item_id,
500                qualifiers: progress_plan.plan.name.qualifiers.clone(),
501                full_name: progress_full_name,
502                print_id: true,
503                version: RelationVersionSelector::Latest,
504            };
505
506            create_source_plans.push(progress_plan);
507
508            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
509        }
510
511        let catalog = self.catalog().for_session(ctx.session());
512        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
513
514        let propagated_with_options: Vec<_> = source_stmt
515            .with_options
516            .iter()
517            .filter_map(|opt| match opt.name {
518                CreateSourceOptionName::TimestampInterval => None,
519                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
520                    name: CreateSubsourceOptionName::RetainHistory,
521                    value: opt.value.clone(),
522                }),
523            })
524            .collect();
525
526        // 2. Then plan the main source.
527        let source_plan = match self.plan_statement(
528            ctx.session(),
529            Statement::CreateSource(source_stmt),
530            &params,
531            &resolved_ids,
532        )? {
533            (Plan::CreateSource(plan), _sql_impl_ids) => plan,
534            (p, _) => unreachable!("s must be CreateSourcePlan but got {:?}", p),
535        };
536
537        let (item_id, global_id) = self.allocate_user_id().await?;
538
539        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
540        let of_source = ResolvedItemName::Item {
541            id: item_id,
542            qualifiers: source_plan.name.qualifiers.clone(),
543            full_name: source_full_name,
544            print_id: true,
545            version: RelationVersionSelector::Latest,
546        };
547
548        // Generate subsource statements
549        let conn_catalog = self.catalog().for_system_session();
550        let pcx = plan::PlanContext::zero();
551        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
552
553        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
554
555        for subsource_stmt in subsource_stmts.iter_mut() {
556            subsource_stmt
557                .with_options
558                .extend(propagated_with_options.iter().cloned())
559        }
560
561        create_source_plans.push(CreateSourcePlanBundle {
562            item_id,
563            global_id,
564            plan: source_plan,
565            resolved_ids: resolved_ids.clone(),
566            available_source_references: Some(available_source_references),
567        });
568
569        // 3. Finally, plan all the subsources
570        let ids = self
571            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
572            .await?;
573        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
574            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
575            create_source_plans.push(plan);
576        }
577
578        Ok((
579            Plan::CreateSources(create_source_plans),
580            ResolvedIds::empty(),
581        ))
582    }
583
584    #[instrument]
585    pub(super) async fn sequence_create_source(
586        &mut self,
587        ctx: &mut ExecuteContext,
588        plans: Vec<plan::CreateSourcePlanBundle>,
589    ) -> Result<ExecuteResponse, AdapterError> {
590        let CreateSourceInner {
591            ops,
592            sources,
593            if_not_exists_ids,
594        } = self.create_source_inner(ctx.session(), plans).await?;
595
596        let transact_result = self
597            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
598            .await;
599
600        // Check if any sources are webhook sources and report them as created.
601        for (item_id, source) in &sources {
602            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
603                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
604                    ctx.session()
605                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
606                }
607            }
608        }
609
610        match transact_result {
611            Ok(()) => Ok(ExecuteResponse::CreatedSource),
612            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
613                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
614            })) if if_not_exists_ids.contains_key(&id) => {
615                ctx.session()
616                    .add_notice(AdapterNotice::ObjectAlreadyExists {
617                        name: if_not_exists_ids[&id].item.clone(),
618                        ty: "source",
619                    });
620                Ok(ExecuteResponse::CreatedSource)
621            }
622            Err(err) => Err(err),
623        }
624    }
625
626    #[instrument]
627    pub(super) async fn sequence_create_connection(
628        &mut self,
629        mut ctx: ExecuteContext,
630        plan: plan::CreateConnectionPlan,
631        resolved_ids: ResolvedIds,
632    ) {
633        let (connection_id, connection_gid) = match self.allocate_user_id().await {
634            Ok(item_id) => item_id,
635            Err(err) => return ctx.retire(Err(err)),
636        };
637
638        match &plan.connection.details {
639            ConnectionDetails::Ssh { key_1, key_2, .. } => {
640                let key_1 = match key_1.as_key_pair() {
641                    Some(key_1) => key_1.clone(),
642                    None => {
643                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
644                            "the PUBLIC KEY 1 option cannot be explicitly specified"
645                        ))));
646                    }
647                };
648
649                let key_2 = match key_2.as_key_pair() {
650                    Some(key_2) => key_2.clone(),
651                    None => {
652                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
653                            "the PUBLIC KEY 2 option cannot be explicitly specified"
654                        ))));
655                    }
656                };
657
658                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
659                let secret = key_set.to_bytes();
660                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
661                    return ctx.retire(Err(err.into()));
662                }
663                self.caching_secrets_reader.invalidate(connection_id);
664            }
665            _ => (),
666        };
667
668        if plan.validate {
669            let internal_cmd_tx = self.internal_cmd_tx.clone();
670            let transient_revision = self.catalog().transient_revision();
671            let conn_id = ctx.session().conn_id().clone();
672            let otel_ctx = OpenTelemetryContext::obtain();
673            let role_metadata = ctx.session().role_metadata().clone();
674
675            let connection = plan
676                .connection
677                .details
678                .to_connection()
679                .into_inline_connection(self.catalog().state());
680
681            let current_storage_parameters = self.controller.storage.config().clone();
682            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
683                let result = match std::panic::AssertUnwindSafe(
684                    connection.validate(connection_id, &current_storage_parameters),
685                )
686                .ore_catch_unwind()
687                .await
688                {
689                    Ok(Ok(())) => Ok(plan),
690                    Ok(Err(err)) => Err(err.into()),
691                    Err(_panic) => {
692                        tracing::error!("connection validation panicked");
693                        Err(AdapterError::Internal(
694                            "connection validation panicked".into(),
695                        ))
696                    }
697                };
698
699                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
700                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
701                    CreateConnectionValidationReady {
702                        ctx,
703                        result,
704                        connection_id,
705                        connection_gid,
706                        plan_validity: PlanValidity::new(
707                            transient_revision,
708                            resolved_ids.items().copied().collect(),
709                            None,
710                            None,
711                            role_metadata,
712                        ),
713                        otel_ctx,
714                        resolved_ids: resolved_ids.clone(),
715                    },
716                ));
717                if let Err(e) = result {
718                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
719                }
720            });
721        } else {
722            let result = self
723                .sequence_create_connection_stage_finish(
724                    &mut ctx,
725                    connection_id,
726                    connection_gid,
727                    plan,
728                    resolved_ids,
729                )
730                .await;
731            ctx.retire(result);
732        }
733    }
734
735    #[instrument]
736    pub(crate) async fn sequence_create_connection_stage_finish(
737        &mut self,
738        ctx: &mut ExecuteContext,
739        connection_id: CatalogItemId,
740        connection_gid: GlobalId,
741        plan: plan::CreateConnectionPlan,
742        resolved_ids: ResolvedIds,
743    ) -> Result<ExecuteResponse, AdapterError> {
744        let ops = vec![catalog::Op::CreateItem {
745            id: connection_id,
746            name: plan.name.clone(),
747            item: CatalogItem::Connection(Connection {
748                create_sql: plan.connection.create_sql,
749                global_id: connection_gid,
750                details: plan.connection.details.clone(),
751                resolved_ids,
752            }),
753            owner_id: *ctx.session().current_role_id(),
754        }];
755
756        // VPC endpoint creation for AWS PrivateLink connections is now handled
757        // in apply_catalog_implications.
758        let conn_id = ctx.session().conn_id().clone();
759        let transact_result = self
760            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
761            .await;
762
763        match transact_result {
764            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
765            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
766                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
767            })) if plan.if_not_exists => {
768                // Clean up SSH key material if it was persisted, since the
769                // catalog item was not created.
770                if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
771                    if let Err(e) = self.secrets_controller.delete(connection_id).await {
772                        tracing::warn!(
773                            "Dropping SSH secret for existing connection has encountered an error: {}",
774                            e
775                        );
776                    } else {
777                        self.caching_secrets_reader.invalidate(connection_id);
778                    }
779                }
780                ctx.session()
781                    .add_notice(AdapterNotice::ObjectAlreadyExists {
782                        name: plan.name.item,
783                        ty: "connection",
784                    });
785                Ok(ExecuteResponse::CreatedConnection)
786            }
787            Err(err) => {
788                // Clean up SSH key material if it was persisted, since the
789                // catalog item was not created.
790                if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
791                    if let Err(e) = self.secrets_controller.delete(connection_id).await {
792                        tracing::warn!(
793                            "Dropping SSH secret for failed connection has encountered an error: {}",
794                            e
795                        );
796                    } else {
797                        self.caching_secrets_reader.invalidate(connection_id);
798                    }
799                }
800                Err(err)
801            }
802        }
803    }
804
805    #[instrument]
806    pub(super) async fn sequence_create_database(
807        &mut self,
808        session: &Session,
809        plan: plan::CreateDatabasePlan,
810    ) -> Result<ExecuteResponse, AdapterError> {
811        let ops = vec![catalog::Op::CreateDatabase {
812            name: plan.name.clone(),
813            owner_id: *session.current_role_id(),
814        }];
815        match self.catalog_transact(Some(session), ops).await {
816            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
817            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
818                kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
819            })) if plan.if_not_exists => {
820                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
821                Ok(ExecuteResponse::CreatedDatabase)
822            }
823            Err(err) => Err(err),
824        }
825    }
826
827    #[instrument]
828    pub(super) async fn sequence_create_schema(
829        &mut self,
830        session: &Session,
831        plan: plan::CreateSchemaPlan,
832    ) -> Result<ExecuteResponse, AdapterError> {
833        let op = catalog::Op::CreateSchema {
834            database_id: plan.database_spec,
835            schema_name: plan.schema_name.clone(),
836            owner_id: *session.current_role_id(),
837        };
838        match self.catalog_transact(Some(session), vec![op]).await {
839            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
840            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
841                kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
842            })) if plan.if_not_exists => {
843                session.add_notice(AdapterNotice::SchemaAlreadyExists {
844                    name: plan.schema_name,
845                });
846                Ok(ExecuteResponse::CreatedSchema)
847            }
848            Err(err) => Err(err),
849        }
850    }
851
852    /// Validates the role attributes for a `CREATE ROLE` statement.
853    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
854        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
855            if attributes.superuser.is_some() || attributes.password.is_some() {
856                return Err(AdapterError::UnavailableFeature {
857                    feature: "SUPERUSER and PASSWORD attributes".to_string(),
858                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
859                });
860            }
861        }
862        Ok(())
863    }
864
865    #[instrument]
866    pub(super) async fn sequence_create_role(
867        &mut self,
868        conn_id: Option<&ConnectionId>,
869        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
870    ) -> Result<ExecuteResponse, AdapterError> {
871        self.validate_role_attributes(&attributes.clone())?;
872        let op = catalog::Op::CreateRole { name, attributes };
873        self.catalog_transact_with_context(conn_id, None, vec![op])
874            .await
875            .map(|_| ExecuteResponse::CreatedRole)
876    }
877
878    #[instrument]
879    pub(super) async fn sequence_create_network_policy(
880        &mut self,
881        session: &Session,
882        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
883    ) -> Result<ExecuteResponse, AdapterError> {
884        let op = catalog::Op::CreateNetworkPolicy {
885            rules,
886            name,
887            owner_id: *session.current_role_id(),
888        };
889        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
890            .await
891            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
892    }
893
894    #[instrument]
895    pub(super) async fn sequence_alter_network_policy(
896        &mut self,
897        session: &Session,
898        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
899    ) -> Result<ExecuteResponse, AdapterError> {
900        // TODO(network_policy): Consider role based network policies here.
901        let current_network_policy_name =
902            self.catalog().system_config().default_network_policy_name();
903        // Check if the way we're alerting the policy is still valid for the current connection.
904        if current_network_policy_name == name {
905            self.validate_alter_network_policy(session, &rules)?;
906        }
907
908        let op = catalog::Op::AlterNetworkPolicy {
909            id,
910            rules,
911            name,
912            owner_id: *session.current_role_id(),
913        };
914        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
915            .await
916            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
917    }
918
919    #[instrument]
920    pub(super) async fn sequence_create_table(
921        &mut self,
922        ctx: &mut ExecuteContext,
923        plan: plan::CreateTablePlan,
924        resolved_ids: ResolvedIds,
925    ) -> Result<ExecuteResponse, AdapterError> {
926        let plan::CreateTablePlan {
927            name,
928            table,
929            if_not_exists,
930        } = plan;
931
932        let conn_id = if table.temporary {
933            Some(ctx.session().conn_id())
934        } else {
935            None
936        };
937        let (table_id, global_id) = self.allocate_user_id().await?;
938        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
939
940        let data_source = match table.data_source {
941            plan::TableDataSource::TableWrites { defaults } => {
942                TableDataSource::TableWrites { defaults }
943            }
944            plan::TableDataSource::DataSource {
945                desc: data_source_plan,
946                timeline,
947            } => match data_source_plan {
948                plan::DataSourceDesc::IngestionExport {
949                    ingestion_id,
950                    external_reference,
951                    details,
952                    data_config,
953                } => TableDataSource::DataSource {
954                    desc: DataSourceDesc::IngestionExport {
955                        ingestion_id,
956                        external_reference,
957                        details,
958                        data_config,
959                    },
960                    timeline,
961                },
962                plan::DataSourceDesc::Webhook {
963                    validate_using,
964                    body_format,
965                    headers,
966                    cluster_id,
967                } => TableDataSource::DataSource {
968                    desc: DataSourceDesc::Webhook {
969                        validate_using,
970                        body_format,
971                        headers,
972                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
973                    },
974                    timeline,
975                },
976                o => {
977                    unreachable!("CREATE TABLE data source got {:?}", o)
978                }
979            },
980        };
981
982        let is_webhook = if let TableDataSource::DataSource {
983            desc: DataSourceDesc::Webhook { .. },
984            timeline: _,
985        } = &data_source
986        {
987            true
988        } else {
989            false
990        };
991
992        let table = Table {
993            create_sql: Some(table.create_sql),
994            desc: table.desc,
995            collections,
996            conn_id: conn_id.cloned(),
997            resolved_ids,
998            custom_logical_compaction_window: table.compaction_window,
999            is_retained_metrics_object: false,
1000            data_source,
1001        };
1002        let ops = vec![catalog::Op::CreateItem {
1003            id: table_id,
1004            name: name.clone(),
1005            item: CatalogItem::Table(table.clone()),
1006            owner_id: *ctx.session().current_role_id(),
1007        }];
1008
1009        let catalog_result = self
1010            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1011            .await;
1012
1013        if is_webhook {
1014            // try_get_webhook_url will make up a URL for things that are not
1015            // webhooks, so we guard against that here.
1016            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1017                ctx.session()
1018                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
1019            }
1020        }
1021
1022        match catalog_result {
1023            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1024            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1025                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1026            })) if if_not_exists => {
1027                ctx.session_mut()
1028                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1029                        name: name.item,
1030                        ty: "table",
1031                    });
1032                Ok(ExecuteResponse::CreatedTable)
1033            }
1034            Err(err) => Err(err),
1035        }
1036    }
1037
1038    #[instrument]
1039    pub(super) async fn sequence_create_sink(
1040        &mut self,
1041        ctx: ExecuteContext,
1042        plan: plan::CreateSinkPlan,
1043        resolved_ids: ResolvedIds,
1044    ) {
1045        let plan::CreateSinkPlan {
1046            name,
1047            sink,
1048            with_snapshot,
1049            if_not_exists,
1050            in_cluster,
1051        } = plan;
1052
1053        // First try to allocate an ID and an OID. If either fails, we're done.
1054        let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
1055
1056        let catalog_sink = Sink {
1057            create_sql: sink.create_sql,
1058            global_id,
1059            from: sink.from,
1060            connection: sink.connection,
1061            envelope: sink.envelope,
1062            version: sink.version,
1063            with_snapshot,
1064            resolved_ids,
1065            cluster_id: in_cluster,
1066            commit_interval: sink.commit_interval,
1067        };
1068
1069        let ops = vec![catalog::Op::CreateItem {
1070            id: item_id,
1071            name: name.clone(),
1072            item: CatalogItem::Sink(catalog_sink.clone()),
1073            owner_id: *ctx.session().current_role_id(),
1074        }];
1075
1076        let result = self.catalog_transact(Some(ctx.session()), ops).await;
1077
1078        match result {
1079            Ok(()) => {}
1080            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1081                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1082            })) if if_not_exists => {
1083                ctx.session()
1084                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1085                        name: name.item,
1086                        ty: "sink",
1087                    });
1088                ctx.retire(Ok(ExecuteResponse::CreatedSink));
1089                return;
1090            }
1091            Err(e) => {
1092                ctx.retire(Err(e));
1093                return;
1094            }
1095        };
1096
1097        self.create_storage_export(global_id, &catalog_sink)
1098            .await
1099            .unwrap_or_terminate("cannot fail to create exports");
1100
1101        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1102            .await;
1103
1104        ctx.retire(Ok(ExecuteResponse::CreatedSink))
1105    }
1106
1107    /// Validates that a view definition does not contain any expressions that may lead to
1108    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1109    ///
1110    /// We prevent these expressions so that we can add columns to system tables without
1111    /// changing the definition of the view.
1112    ///
1113    /// Here is a bit of a hand wavy proof as to why we only need to check the
1114    /// immediate view definition for system objects and ambiguous column
1115    /// references, and not the entire dependency tree:
1116    ///
1117    ///   - A view with no object references cannot have any ambiguous column
1118    ///   references to a system object, because it has no system objects.
1119    ///   - A view with a direct reference to a system object and a * or
1120    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1121    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1122    ///   any ambiguous column references to a system object, because all column
1123    ///   references are explicitly named.
1124    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1125    ///   system object cannot have any ambiguous column references to a system
1126    ///   object, because there are no system objects in the top level view and
1127    ///   all sub-views are guaranteed to have no ambiguous column references to
1128    ///   system objects.
1129    pub(super) fn validate_system_column_references(
1130        &self,
1131        uses_ambiguous_columns: bool,
1132        depends_on: &BTreeSet<GlobalId>,
1133    ) -> Result<(), AdapterError> {
1134        if uses_ambiguous_columns
1135            && depends_on
1136                .iter()
1137                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1138        {
1139            Err(AdapterError::AmbiguousSystemColumnReference)
1140        } else {
1141            Ok(())
1142        }
1143    }
1144
1145    #[instrument]
1146    pub(super) async fn sequence_create_type(
1147        &mut self,
1148        session: &Session,
1149        plan: plan::CreateTypePlan,
1150        resolved_ids: ResolvedIds,
1151    ) -> Result<ExecuteResponse, AdapterError> {
1152        let (item_id, global_id) = self.allocate_user_id().await?;
1153        // Validate the type definition (e.g., composite columns) before storing.
1154        plan.typ
1155            .inner
1156            .desc(&self.catalog().for_session(session))
1157            .map_err(AdapterError::from)?;
1158        let typ = Type {
1159            create_sql: Some(plan.typ.create_sql),
1160            global_id,
1161            details: CatalogTypeDetails {
1162                array_id: None,
1163                typ: plan.typ.inner,
1164                pg_metadata: None,
1165            },
1166            resolved_ids,
1167        };
1168        let op = catalog::Op::CreateItem {
1169            id: item_id,
1170            name: plan.name,
1171            item: CatalogItem::Type(typ),
1172            owner_id: *session.current_role_id(),
1173        };
1174        match self.catalog_transact(Some(session), vec![op]).await {
1175            Ok(()) => Ok(ExecuteResponse::CreatedType),
1176            Err(err) => Err(err),
1177        }
1178    }
1179
1180    #[instrument]
1181    pub(super) async fn sequence_comment_on(
1182        &mut self,
1183        session: &Session,
1184        plan: plan::CommentPlan,
1185    ) -> Result<ExecuteResponse, AdapterError> {
1186        let op = catalog::Op::Comment {
1187            object_id: plan.object_id,
1188            sub_component: plan.sub_component,
1189            comment: plan.comment,
1190        };
1191        self.catalog_transact(Some(session), vec![op]).await?;
1192        Ok(ExecuteResponse::Comment)
1193    }
1194
1195    #[instrument]
1196    pub(super) async fn sequence_drop_objects(
1197        &mut self,
1198        ctx: &mut ExecuteContext,
1199        plan::DropObjectsPlan {
1200            drop_ids,
1201            object_type,
1202            referenced_ids,
1203        }: plan::DropObjectsPlan,
1204    ) -> Result<ExecuteResponse, AdapterError> {
1205        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1206        let mut objects = Vec::new();
1207        for obj_id in &drop_ids {
1208            if !referenced_ids_hashset.contains(obj_id) {
1209                let object_info = ErrorMessageObjectDescription::from_id(
1210                    obj_id,
1211                    &self.catalog().for_session(ctx.session()),
1212                )
1213                .to_string();
1214                objects.push(object_info);
1215            }
1216        }
1217
1218        if !objects.is_empty() {
1219            ctx.session()
1220                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1221        }
1222
1223        // Collect GlobalIds for expression cache invalidation.
1224        let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
1225            .iter()
1226            .filter_map(|id| match id {
1227                ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
1228                _ => None,
1229            })
1230            .flatten()
1231            .collect();
1232
1233        let DropOps {
1234            ops,
1235            dropped_active_db,
1236            dropped_active_cluster,
1237            dropped_in_use_indexes,
1238        } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1239
1240        self.catalog_transact_with_context(None, Some(ctx), ops)
1241            .await?;
1242
1243        // Invalidate expression cache entries for dropped objects.
1244        if !expr_cache_invalidate_ids.is_empty() {
1245            let _fut = self.catalog().update_expression_cache(
1246                Default::default(),
1247                Default::default(),
1248                expr_cache_invalidate_ids,
1249            );
1250        }
1251
1252        fail::fail_point!("after_sequencer_drop_replica");
1253
1254        if dropped_active_db {
1255            ctx.session()
1256                .add_notice(AdapterNotice::DroppedActiveDatabase {
1257                    name: ctx.session().vars().database().to_string(),
1258                });
1259        }
1260        if dropped_active_cluster {
1261            ctx.session()
1262                .add_notice(AdapterNotice::DroppedActiveCluster {
1263                    name: ctx.session().vars().cluster().to_string(),
1264                });
1265        }
1266        for dropped_in_use_index in dropped_in_use_indexes {
1267            ctx.session()
1268                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1269            self.metrics
1270                .optimization_notices
1271                .with_label_values(&["DroppedInUseIndex"])
1272                .inc_by(1);
1273        }
1274        Ok(ExecuteResponse::DroppedObject(object_type))
1275    }
1276
1277    fn validate_dropped_role_ownership(
1278        &self,
1279        session: &Session,
1280        dropped_roles: &BTreeMap<RoleId, &str>,
1281    ) -> Result<(), AdapterError> {
1282        fn privilege_check(
1283            privileges: &PrivilegeMap,
1284            dropped_roles: &BTreeMap<RoleId, &str>,
1285            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1286            object_id: &SystemObjectId,
1287            catalog: &ConnCatalog,
1288        ) {
1289            for privilege in privileges.all_values() {
1290                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1291                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1292                    let object_description =
1293                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1294                    dependent_objects
1295                        .entry(role_name.to_string())
1296                        .or_default()
1297                        .push(format!(
1298                            "privileges on {object_description} granted by {grantor_name}",
1299                        ));
1300                }
1301                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1302                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1303                    let object_description =
1304                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1305                    dependent_objects
1306                        .entry(role_name.to_string())
1307                        .or_default()
1308                        .push(format!(
1309                            "privileges granted on {object_description} to {grantee_name}"
1310                        ));
1311                }
1312            }
1313        }
1314
1315        let catalog = self.catalog().for_session(session);
1316        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1317        for entry in self.catalog.entries() {
1318            let id = SystemObjectId::Object(entry.id().into());
1319            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1320                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1321                dependent_objects
1322                    .entry(role_name.to_string())
1323                    .or_default()
1324                    .push(format!("owner of {object_description}"));
1325            }
1326            privilege_check(
1327                entry.privileges(),
1328                dropped_roles,
1329                &mut dependent_objects,
1330                &id,
1331                &catalog,
1332            );
1333        }
1334        for database in self.catalog.databases() {
1335            let database_id = SystemObjectId::Object(database.id().into());
1336            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1337                let object_description =
1338                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1339                dependent_objects
1340                    .entry(role_name.to_string())
1341                    .or_default()
1342                    .push(format!("owner of {object_description}"));
1343            }
1344            privilege_check(
1345                &database.privileges,
1346                dropped_roles,
1347                &mut dependent_objects,
1348                &database_id,
1349                &catalog,
1350            );
1351            for schema in database.schemas_by_id.values() {
1352                let schema_id = SystemObjectId::Object(
1353                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1354                );
1355                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1356                    let object_description =
1357                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1358                    dependent_objects
1359                        .entry(role_name.to_string())
1360                        .or_default()
1361                        .push(format!("owner of {object_description}"));
1362                }
1363                privilege_check(
1364                    &schema.privileges,
1365                    dropped_roles,
1366                    &mut dependent_objects,
1367                    &schema_id,
1368                    &catalog,
1369                );
1370            }
1371        }
1372        for cluster in self.catalog.clusters() {
1373            let cluster_id = SystemObjectId::Object(cluster.id().into());
1374            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1375                let object_description =
1376                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1377                dependent_objects
1378                    .entry(role_name.to_string())
1379                    .or_default()
1380                    .push(format!("owner of {object_description}"));
1381            }
1382            privilege_check(
1383                &cluster.privileges,
1384                dropped_roles,
1385                &mut dependent_objects,
1386                &cluster_id,
1387                &catalog,
1388            );
1389            for replica in cluster.replicas() {
1390                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1391                    let replica_id =
1392                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1393                    let object_description =
1394                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1395                    dependent_objects
1396                        .entry(role_name.to_string())
1397                        .or_default()
1398                        .push(format!("owner of {object_description}"));
1399                }
1400            }
1401        }
1402        privilege_check(
1403            self.catalog().system_privileges(),
1404            dropped_roles,
1405            &mut dependent_objects,
1406            &SystemObjectId::System,
1407            &catalog,
1408        );
1409        for (default_privilege_object, default_privilege_acl_items) in
1410            self.catalog.default_privileges()
1411        {
1412            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1413                dependent_objects
1414                    .entry(role_name.to_string())
1415                    .or_default()
1416                    .push(format!(
1417                        "default privileges on {}S created by {}",
1418                        default_privilege_object.object_type, role_name
1419                    ));
1420            }
1421            for default_privilege_acl_item in default_privilege_acl_items {
1422                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1423                    dependent_objects
1424                        .entry(role_name.to_string())
1425                        .or_default()
1426                        .push(format!(
1427                            "default privileges on {}S granted to {}",
1428                            default_privilege_object.object_type, role_name
1429                        ));
1430                }
1431            }
1432        }
1433
1434        if !dependent_objects.is_empty() {
1435            Err(AdapterError::DependentObject(dependent_objects))
1436        } else {
1437            Ok(())
1438        }
1439    }
1440
1441    #[instrument]
1442    pub(super) async fn sequence_drop_owned(
1443        &mut self,
1444        session: &Session,
1445        plan: plan::DropOwnedPlan,
1446    ) -> Result<ExecuteResponse, AdapterError> {
1447        for role_id in &plan.role_ids {
1448            self.catalog().ensure_not_reserved_role(role_id)?;
1449        }
1450
1451        let mut privilege_revokes = plan.privilege_revokes;
1452
1453        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1454        let session_catalog = self.catalog().for_session(session);
1455        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1456            && !session.is_superuser()
1457        {
1458            // Obtain all roles that the current session is a member of.
1459            let role_membership =
1460                session_catalog.collect_role_membership(session.current_role_id());
1461            let invalid_revokes: BTreeSet<_> = privilege_revokes
1462                .extract_if(.., |(_, privilege)| {
1463                    !role_membership.contains(&privilege.grantor)
1464                })
1465                .map(|(object_id, _)| object_id)
1466                .collect();
1467            for invalid_revoke in invalid_revokes {
1468                let object_description =
1469                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1470                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1471            }
1472        }
1473
1474        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1475            catalog::Op::UpdatePrivilege {
1476                target_id: object_id,
1477                privilege,
1478                variant: UpdatePrivilegeVariant::Revoke,
1479            }
1480        });
1481        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1482            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1483                privilege_object,
1484                privilege_acl_item,
1485                variant: UpdatePrivilegeVariant::Revoke,
1486            },
1487        );
1488        let DropOps {
1489            ops: drop_ops,
1490            dropped_active_db,
1491            dropped_active_cluster,
1492            dropped_in_use_indexes,
1493        } = self.sequence_drop_common(session, plan.drop_ids)?;
1494
1495        let ops = privilege_revoke_ops
1496            .chain(default_privilege_revoke_ops)
1497            .chain(drop_ops.into_iter())
1498            .collect();
1499
1500        self.catalog_transact(Some(session), ops).await?;
1501
1502        if dropped_active_db {
1503            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1504                name: session.vars().database().to_string(),
1505            });
1506        }
1507        if dropped_active_cluster {
1508            session.add_notice(AdapterNotice::DroppedActiveCluster {
1509                name: session.vars().cluster().to_string(),
1510            });
1511        }
1512        for dropped_in_use_index in dropped_in_use_indexes {
1513            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1514        }
1515        Ok(ExecuteResponse::DroppedOwned)
1516    }
1517
1518    fn sequence_drop_common(
1519        &self,
1520        session: &Session,
1521        ids: Vec<ObjectId>,
1522    ) -> Result<DropOps, AdapterError> {
1523        let mut dropped_active_db = false;
1524        let mut dropped_active_cluster = false;
1525        let mut dropped_in_use_indexes = Vec::new();
1526        let mut dropped_roles = BTreeMap::new();
1527        let mut dropped_databases = BTreeSet::new();
1528        let mut dropped_schemas = BTreeSet::new();
1529        // Dropping either the group role or the member role of a role membership will trigger a
1530        // revoke role. We use a Set for the revokes to avoid trying to attempt to revoke the same
1531        // role membership twice.
1532        let mut role_revokes = BTreeSet::new();
1533        // Dropping a database or a schema will revoke all default roles associated with that
1534        // database or schema.
1535        let mut default_privilege_revokes = BTreeSet::new();
1536
1537        // Clusters we're dropping
1538        let mut clusters_to_drop = BTreeSet::new();
1539
1540        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1541        for id in &ids {
1542            match id {
1543                ObjectId::Database(id) => {
1544                    let name = self.catalog().get_database(id).name();
1545                    if name == session.vars().database() {
1546                        dropped_active_db = true;
1547                    }
1548                    dropped_databases.insert(id);
1549                }
1550                ObjectId::Schema((_, spec)) => {
1551                    if let SchemaSpecifier::Id(id) = spec {
1552                        dropped_schemas.insert(id);
1553                    }
1554                }
1555                ObjectId::Cluster(id) => {
1556                    clusters_to_drop.insert(*id);
1557                    if let Some(active_id) = self
1558                        .catalog()
1559                        .active_cluster(session)
1560                        .ok()
1561                        .map(|cluster| cluster.id())
1562                    {
1563                        if id == &active_id {
1564                            dropped_active_cluster = true;
1565                        }
1566                    }
1567                }
1568                ObjectId::Role(id) => {
1569                    let role = self.catalog().get_role(id);
1570                    let name = role.name();
1571                    dropped_roles.insert(*id, name);
1572                    // We must revoke all role memberships that the dropped roles belongs to.
1573                    for (group_id, grantor_id) in &role.membership.map {
1574                        role_revokes.insert((*group_id, *id, *grantor_id));
1575                    }
1576                }
1577                ObjectId::Item(id) => {
1578                    if let Some(index) = self.catalog().get_entry(id).index() {
1579                        let humanizer = self.catalog().for_session(session);
1580                        let dependants = self
1581                            .controller
1582                            .compute
1583                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1584                            .ok()
1585                            .into_iter()
1586                            .flatten()
1587                            .filter(|dependant_id| {
1588                                // Transient Ids belong to Peeks. We are not interested for now in
1589                                // peeks depending on a dropped index.
1590                                // TODO: show a different notice in this case. Something like
1591                                // "There is an in-progress ad hoc SELECT that uses the dropped
1592                                // index. The resources used by the index will be freed when all
1593                                // such SELECTs complete."
1594                                if dependant_id.is_transient() {
1595                                    return false;
1596                                }
1597                                // The item should exist, but don't panic if it doesn't.
1598                                let Some(dependent_id) = humanizer
1599                                    .try_get_item_by_global_id(dependant_id)
1600                                    .map(|item| item.id())
1601                                else {
1602                                    return false;
1603                                };
1604                                // If the dependent object is also being dropped, then there is no
1605                                // problem, so we don't want a notice.
1606                                !ids_set.contains(&ObjectId::Item(dependent_id))
1607                            })
1608                            .flat_map(|dependant_id| {
1609                                // If we are not able to find a name for this ID it probably means
1610                                // we have already dropped the compute collection, in which case we
1611                                // can ignore it.
1612                                humanizer.humanize_id(dependant_id)
1613                            })
1614                            .collect_vec();
1615                        if !dependants.is_empty() {
1616                            dropped_in_use_indexes.push(DroppedInUseIndex {
1617                                index_name: humanizer
1618                                    .humanize_id(index.global_id())
1619                                    .unwrap_or_else(|| id.to_string()),
1620                                dependant_objects: dependants,
1621                            });
1622                        }
1623                    }
1624                }
1625                _ => {}
1626            }
1627        }
1628
1629        for id in &ids {
1630            match id {
1631                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1632                // unless they are internal replicas, which exist outside the scope
1633                // of managed clusters.
1634                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1635                    if !clusters_to_drop.contains(cluster_id) {
1636                        let cluster = self.catalog.get_cluster(*cluster_id);
1637                        if cluster.is_managed() {
1638                            let replica =
1639                                cluster.replica(*replica_id).expect("Catalog out of sync");
1640                            if !replica.config.location.internal() {
1641                                coord_bail!("cannot drop replica of managed cluster");
1642                            }
1643                        }
1644                    }
1645                }
1646                _ => {}
1647            }
1648        }
1649
1650        for role_id in dropped_roles.keys() {
1651            self.catalog().ensure_not_reserved_role(role_id)?;
1652        }
1653        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1654        // If any role is a member of a dropped role, then we must revoke that membership.
1655        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1656        for role in self.catalog().user_roles() {
1657            for dropped_role_id in
1658                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1659            {
1660                role_revokes.insert((
1661                    **dropped_role_id,
1662                    role.id(),
1663                    *role
1664                        .membership
1665                        .map
1666                        .get(*dropped_role_id)
1667                        .expect("included in keys above"),
1668                ));
1669            }
1670        }
1671
1672        for (default_privilege_object, default_privilege_acls) in
1673            self.catalog().default_privileges()
1674        {
1675            if matches!(
1676                &default_privilege_object.database_id,
1677                Some(database_id) if dropped_databases.contains(database_id),
1678            ) || matches!(
1679                &default_privilege_object.schema_id,
1680                Some(schema_id) if dropped_schemas.contains(schema_id),
1681            ) {
1682                for default_privilege_acl in default_privilege_acls {
1683                    default_privilege_revokes.insert((
1684                        default_privilege_object.clone(),
1685                        default_privilege_acl.clone(),
1686                    ));
1687                }
1688            }
1689        }
1690
1691        let ops = role_revokes
1692            .into_iter()
1693            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1694                role_id,
1695                member_id,
1696                grantor_id,
1697            })
1698            .chain(default_privilege_revokes.into_iter().map(
1699                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1700                    privilege_object,
1701                    privilege_acl_item,
1702                    variant: UpdatePrivilegeVariant::Revoke,
1703                },
1704            ))
1705            .chain(iter::once(catalog::Op::DropObjects(
1706                ids.into_iter()
1707                    .map(DropObjectInfo::manual_drop_from_object_id)
1708                    .collect(),
1709            )))
1710            .collect();
1711
1712        Ok(DropOps {
1713            ops,
1714            dropped_active_db,
1715            dropped_active_cluster,
1716            dropped_in_use_indexes,
1717        })
1718    }
1719
1720    pub(super) fn sequence_explain_schema(
1721        &self,
1722        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1723    ) -> Result<ExecuteResponse, AdapterError> {
1724        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1725            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1726        })?;
1727
1728        let json_string = json_string(&json_value);
1729        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1730        Ok(Self::send_immediate_rows(row))
1731    }
1732
1733    pub(super) fn sequence_show_all_variables(
1734        &self,
1735        session: &Session,
1736    ) -> Result<ExecuteResponse, AdapterError> {
1737        let mut rows = viewable_variables(self.catalog().state(), session)
1738            .map(|v| (v.name(), v.value(), v.description()))
1739            .collect::<Vec<_>>();
1740        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1741
1742        // TODO(parkmycar): Pack all of these into a single RowCollection.
1743        let rows: Vec<_> = rows
1744            .into_iter()
1745            .map(|(name, val, desc)| {
1746                Row::pack_slice(&[
1747                    Datum::String(name),
1748                    Datum::String(&val),
1749                    Datum::String(desc),
1750                ])
1751            })
1752            .collect();
1753        Ok(Self::send_immediate_rows(rows))
1754    }
1755
1756    pub(super) fn sequence_show_variable(
1757        &self,
1758        session: &Session,
1759        plan: plan::ShowVariablePlan,
1760    ) -> Result<ExecuteResponse, AdapterError> {
1761        if &plan.name == SCHEMA_ALIAS {
1762            let schemas = self.catalog.resolve_search_path(session);
1763            let schema = schemas.first();
1764            return match schema {
1765                Some((database_spec, schema_spec)) => {
1766                    let schema_name = &self
1767                        .catalog
1768                        .get_schema(database_spec, schema_spec, session.conn_id())
1769                        .name()
1770                        .schema;
1771                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
1772                    Ok(Self::send_immediate_rows(row))
1773                }
1774                None => {
1775                    if session.vars().current_object_missing_warnings() {
1776                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1777                            search_path: session
1778                                .vars()
1779                                .search_path()
1780                                .into_iter()
1781                                .map(|schema| schema.to_string())
1782                                .collect(),
1783                        });
1784                    }
1785                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1786                }
1787            };
1788        }
1789
1790        let variable = session
1791            .vars()
1792            .get(self.catalog().system_config(), &plan.name)
1793            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1794
1795        // In lieu of plumbing the user to all system config functions, just check that the var is
1796        // visible.
1797        variable.visible(session.user(), self.catalog().system_config())?;
1798
1799        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1800        if variable.name() == vars::DATABASE.name()
1801            && matches!(
1802                self.catalog().resolve_database(&variable.value()),
1803                Err(CatalogError::UnknownDatabase(_))
1804            )
1805            && session.vars().current_object_missing_warnings()
1806        {
1807            let name = variable.value();
1808            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1809        } else if variable.name() == vars::CLUSTER.name()
1810            && matches!(
1811                self.catalog().resolve_cluster(&variable.value()),
1812                Err(CatalogError::UnknownCluster(_))
1813            )
1814            && session.vars().current_object_missing_warnings()
1815        {
1816            let name = variable.value();
1817            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1818        }
1819        Ok(Self::send_immediate_rows(row))
1820    }
1821
1822    #[instrument]
1823    pub(super) async fn sequence_inspect_shard(
1824        &self,
1825        session: &Session,
1826        plan: plan::InspectShardPlan,
1827    ) -> Result<ExecuteResponse, AdapterError> {
1828        // TODO: Not thrilled about this rbac special case here, but probably
1829        // sufficient for now.
1830        if !session.user().is_internal() {
1831            return Err(AdapterError::Unauthorized(
1832                rbac::UnauthorizedError::MzSystem {
1833                    action: "inspect".into(),
1834                },
1835            ));
1836        }
1837        let state = self
1838            .controller
1839            .storage
1840            .inspect_persist_state(plan.id)
1841            .await?;
1842        let jsonb = Jsonb::from_serde_json(state)?;
1843        Ok(Self::send_immediate_rows(jsonb.into_row()))
1844    }
1845
1846    #[instrument]
1847    pub(super) fn sequence_set_variable(
1848        &self,
1849        session: &mut Session,
1850        plan: plan::SetVariablePlan,
1851    ) -> Result<ExecuteResponse, AdapterError> {
1852        let (name, local) = (plan.name, plan.local);
1853        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1854            self.validate_set_isolation_level(session)?;
1855        }
1856        if &name == vars::CLUSTER.name() {
1857            self.validate_set_cluster(session)?;
1858        }
1859
1860        let vars = session.vars_mut();
1861        let values = match plan.value {
1862            plan::VariableValue::Default => None,
1863            plan::VariableValue::Values(values) => Some(values),
1864        };
1865
1866        match values {
1867            Some(values) => {
1868                vars.set(
1869                    self.catalog().system_config(),
1870                    &name,
1871                    VarInput::SqlSet(&values),
1872                    local,
1873                )?;
1874
1875                let vars = session.vars();
1876
1877                // Emit a warning when deprecated variables are used.
1878                // TODO(database-issues#8069) remove this after sufficient time has passed
1879                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1880                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1881                } else if name == vars::CLUSTER.name()
1882                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1883                {
1884                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1885                }
1886
1887                // Database or cluster value does not correspond to a catalog item.
1888                if name.as_str() == vars::DATABASE.name()
1889                    && matches!(
1890                        self.catalog().resolve_database(vars.database()),
1891                        Err(CatalogError::UnknownDatabase(_))
1892                    )
1893                    && session.vars().current_object_missing_warnings()
1894                {
1895                    let name = vars.database().to_string();
1896                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1897                } else if name.as_str() == vars::CLUSTER.name()
1898                    && matches!(
1899                        self.catalog().resolve_cluster(vars.cluster()),
1900                        Err(CatalogError::UnknownCluster(_))
1901                    )
1902                    && session.vars().current_object_missing_warnings()
1903                {
1904                    let name = vars.cluster().to_string();
1905                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1906                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1907                    let v = values.into_first().to_lowercase();
1908                    if v == IsolationLevel::ReadUncommitted.as_str()
1909                        || v == IsolationLevel::ReadCommitted.as_str()
1910                        || v == IsolationLevel::RepeatableRead.as_str()
1911                    {
1912                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1913                            isolation_level: v,
1914                        });
1915                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1916                        session.add_notice(AdapterNotice::StrongSessionSerializable);
1917                    }
1918                }
1919            }
1920            None => vars.reset(self.catalog().system_config(), &name, local)?,
1921        }
1922
1923        Ok(ExecuteResponse::SetVariable { name, reset: false })
1924    }
1925
1926    pub(super) fn sequence_reset_variable(
1927        &self,
1928        session: &mut Session,
1929        plan: plan::ResetVariablePlan,
1930    ) -> Result<ExecuteResponse, AdapterError> {
1931        let name = plan.name;
1932        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1933            self.validate_set_isolation_level(session)?;
1934        }
1935        if &name == vars::CLUSTER.name() {
1936            self.validate_set_cluster(session)?;
1937        }
1938        session
1939            .vars_mut()
1940            .reset(self.catalog().system_config(), &name, false)?;
1941        Ok(ExecuteResponse::SetVariable { name, reset: true })
1942    }
1943
1944    pub(super) fn sequence_set_transaction(
1945        &self,
1946        session: &mut Session,
1947        plan: plan::SetTransactionPlan,
1948    ) -> Result<ExecuteResponse, AdapterError> {
1949        // TODO(jkosh44) Only supports isolation levels for now.
1950        for mode in plan.modes {
1951            match mode {
1952                TransactionMode::AccessMode(_) => {
1953                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1954                }
1955                TransactionMode::IsolationLevel(isolation_level) => {
1956                    self.validate_set_isolation_level(session)?;
1957
1958                    session.vars_mut().set(
1959                        self.catalog().system_config(),
1960                        TRANSACTION_ISOLATION_VAR_NAME,
1961                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
1962                        plan.local,
1963                    )?
1964                }
1965            }
1966        }
1967        Ok(ExecuteResponse::SetVariable {
1968            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1969            reset: false,
1970        })
1971    }
1972
1973    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1974        if session.transaction().contains_ops() {
1975            Err(AdapterError::InvalidSetIsolationLevel)
1976        } else {
1977            Ok(())
1978        }
1979    }
1980
1981    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1982        if session.transaction().contains_ops() {
1983            Err(AdapterError::InvalidSetCluster)
1984        } else {
1985            Ok(())
1986        }
1987    }
1988
1989    #[instrument]
1990    pub(super) async fn sequence_end_transaction(
1991        &mut self,
1992        mut ctx: ExecuteContext,
1993        mut action: EndTransactionAction,
1994    ) {
1995        // If the transaction has failed, we can only rollback.
1996        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
1997            (&action, ctx.session().transaction())
1998        {
1999            action = EndTransactionAction::Rollback;
2000        }
2001        let response = match action {
2002            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2003                params: BTreeMap::new(),
2004            }),
2005            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2006                params: BTreeMap::new(),
2007            }),
2008        };
2009
2010        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2011
2012        let (response, action) = match result {
2013            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2014                (response, action)
2015            }
2016            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2017                // Make sure we have the correct set of write locks for this transaction.
2018                // Aggressively dropping partial sets of locks to prevent deadlocking separate
2019                // transactions.
2020                let validated_locks = match write_lock_guards {
2021                    None => None,
2022                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2023                        Ok(locks) => Some(locks),
2024                        Err(missing) => {
2025                            tracing::error!(?missing, "programming error, missing write locks");
2026                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2027                        }
2028                    },
2029                };
2030
2031                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2032                for WriteOp { id, rows } in writes {
2033                    let total_rows = collected_writes.entry(id).or_default();
2034                    total_rows.push(rows);
2035                }
2036
2037                self.submit_write(PendingWriteTxn::User {
2038                    span: Span::current(),
2039                    writes: collected_writes,
2040                    write_locks: validated_locks,
2041                    responder: UserWriteResponder::Session(PendingTxn {
2042                        ctx,
2043                        response,
2044                        action,
2045                    }),
2046                });
2047                return;
2048            }
2049            Ok((
2050                Some(TransactionOps::Peeks {
2051                    determination,
2052                    requires_linearization: RequireLinearization::Required,
2053                    ..
2054                }),
2055                _,
2056            )) if ctx.session().vars().transaction_isolation()
2057                == &IsolationLevel::StrictSerializable =>
2058            {
2059                let conn_id = ctx.session().conn_id().clone();
2060                let pending_read_txn = PendingReadTxn {
2061                    txn: PendingRead::Read {
2062                        txn: PendingTxn {
2063                            ctx,
2064                            response,
2065                            action,
2066                        },
2067                    },
2068                    timestamp_context: determination.timestamp_context,
2069                    created: Instant::now(),
2070                    num_requeues: 0,
2071                    otel_ctx: OpenTelemetryContext::obtain(),
2072                };
2073                self.strict_serializable_reads_tx
2074                    .send((conn_id, pending_read_txn))
2075                    .expect("sending to strict_serializable_reads_tx cannot fail");
2076                return;
2077            }
2078            Ok((
2079                Some(TransactionOps::Peeks {
2080                    determination,
2081                    requires_linearization: RequireLinearization::Required,
2082                    ..
2083                }),
2084                _,
2085            )) if ctx.session().vars().transaction_isolation()
2086                == &IsolationLevel::StrongSessionSerializable =>
2087            {
2088                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2089                    ctx.session_mut()
2090                        .ensure_timestamp_oracle(timeline.clone())
2091                        .apply_write(*ts);
2092                }
2093                (response, action)
2094            }
2095            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2096                self.internal_cmd_tx
2097                    .send(Message::ExecuteSingleStatementTransaction {
2098                        ctx,
2099                        otel_ctx: OpenTelemetryContext::obtain(),
2100                        stmt,
2101                        params,
2102                    })
2103                    .expect("must send");
2104                return;
2105            }
2106            Ok((_, _)) => (response, action),
2107            Err(err) => (Err(err), EndTransactionAction::Rollback),
2108        };
2109        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2110        // Append any parameters that changed to the response.
2111        let response = response.map(|mut r| {
2112            r.extend_params(changed);
2113            ExecuteResponse::from(r)
2114        });
2115
2116        ctx.retire(response);
2117    }
2118
2119    #[instrument]
2120    async fn sequence_end_transaction_inner(
2121        &mut self,
2122        ctx: &mut ExecuteContext,
2123        action: EndTransactionAction,
2124    ) -> Result<(Option<TransactionOps>, Option<WriteLocks>), AdapterError> {
2125        let txn = self.clear_transaction(ctx.session_mut()).await;
2126
2127        if let EndTransactionAction::Commit = action {
2128            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2129                match &mut ops {
2130                    TransactionOps::Writes(writes) => {
2131                        for WriteOp { id, .. } in &mut writes.iter() {
2132                            // Re-verify this id exists.
2133                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2134                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2135                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2136                                })
2137                            })?;
2138                        }
2139
2140                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2141                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2142                    }
2143                    TransactionOps::DDL {
2144                        ops,
2145                        state: _,
2146                        side_effects,
2147                        revision,
2148                        snapshot: _,
2149                    } => {
2150                        // Make sure our catalog hasn't changed.
2151                        if *revision != self.catalog().transient_revision() {
2152                            return Err(AdapterError::DDLTransactionRace);
2153                        }
2154                        // Commit all of our queued ops.
2155                        let ops = std::mem::take(ops);
2156                        let side_effects = std::mem::take(side_effects);
2157                        self.catalog_transact_with_side_effects(
2158                            Some(ctx),
2159                            ops,
2160                            move |a, mut ctx| {
2161                                Box::pin(async move {
2162                                    for side_effect in side_effects {
2163                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2164                                    }
2165                                })
2166                            },
2167                        )
2168                        .await?;
2169                    }
2170                    _ => (),
2171                }
2172                return Ok((Some(ops), write_lock_guards));
2173            }
2174        }
2175
2176        Ok((None, None))
2177    }
2178
2179    pub(super) async fn sequence_side_effecting_func(
2180        &mut self,
2181        ctx: ExecuteContext,
2182        plan: SideEffectingFunc,
2183    ) {
2184        match plan {
2185            SideEffectingFunc::PgCancelBackend { connection_id } => {
2186                let Some(connection_id) = connection_id else {
2187                    // The argument was `NULL`, so, like in PostgreSQL, the
2188                    // function returns `NULL`.
2189                    ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[
2190                        Datum::Null,
2191                    ]))));
2192                    return;
2193                };
2194
2195                if ctx.session().conn_id().unhandled() == connection_id {
2196                    // As a special case, if we're canceling ourselves, we send
2197                    // back a canceled resposne to the client issuing the query,
2198                    // and so we need to do no further processing of the cancel.
2199                    ctx.retire(Err(AdapterError::Canceled));
2200                    return;
2201                }
2202
2203                let res = if let Some((id_handle, _conn_meta)) =
2204                    self.active_conns.get_key_value(&connection_id)
2205                {
2206                    // check_plan already verified role membership.
2207                    self.handle_privileged_cancel(id_handle.clone()).await;
2208                    Datum::True
2209                } else {
2210                    Datum::False
2211                };
2212                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2213            }
2214        }
2215    }
2216
2217    /// Execute a side-effecting function from the frontend peek path.
2218    /// This is separate from `sequence_side_effecting_func` because
2219    /// - It doesn't have an ExecuteContext.
2220    /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
2221    ///   peek sequencing can't look at `active_conns`.
2222    ///
2223    /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
2224    /// sequencing.
2225    pub(crate) async fn execute_side_effecting_func(
2226        &mut self,
2227        plan: SideEffectingFunc,
2228        conn_id: ConnectionId,
2229        current_role: RoleId,
2230    ) -> Result<ExecuteResponse, AdapterError> {
2231        match plan {
2232            SideEffectingFunc::PgCancelBackend { connection_id } => {
2233                let Some(connection_id) = connection_id else {
2234                    // The argument was `NULL`, so, like in PostgreSQL, the
2235                    // function returns `NULL`.
2236                    return Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])));
2237                };
2238
2239                if conn_id.unhandled() == connection_id {
2240                    // As a special case, if we're canceling ourselves, we return
2241                    // a canceled response to the client issuing the query,
2242                    // and so we need to do no further processing of the cancel.
2243                    return Err(AdapterError::Canceled);
2244                }
2245
2246                // Perform RBAC check: the current user must be a member of the role
2247                // that owns the connection being cancelled.
2248                if let Some((_id_handle, conn_meta)) =
2249                    self.active_conns.get_key_value(&connection_id)
2250                {
2251                    let target_role = *conn_meta.authenticated_role_id();
2252                    let role_membership = self
2253                        .catalog()
2254                        .state()
2255                        .collect_role_membership(&current_role);
2256                    if !role_membership.contains(&target_role) {
2257                        let target_role_name = self
2258                            .catalog()
2259                            .try_get_role(&target_role)
2260                            .map(|role| role.name().to_string())
2261                            .unwrap_or_else(|| target_role.to_string());
2262                        return Err(AdapterError::Unauthorized(
2263                            rbac::UnauthorizedError::RoleMembership {
2264                                role_names: vec![target_role_name],
2265                            },
2266                        ));
2267                    }
2268
2269                    // RBAC check passed, proceed with cancellation.
2270                    let id_handle = self
2271                        .active_conns
2272                        .get_key_value(&connection_id)
2273                        .map(|(id, _)| id.clone())
2274                        .expect("checked above");
2275                    self.handle_privileged_cancel(id_handle).await;
2276                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2277                } else {
2278                    // Connection not found, return false.
2279                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2280                }
2281            }
2282        }
2283    }
2284
2285    /// Inner method that performs the actual real-time recency timestamp determination.
2286    /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`)
2287    /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
2288    pub(crate) async fn determine_real_time_recent_timestamp(
2289        &self,
2290        source_ids: impl Iterator<Item = GlobalId>,
2291        real_time_recency_timeout: Duration,
2292    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2293        let item_ids = source_ids
2294            .map(|gid| {
2295                self.catalog
2296                    .try_resolve_item_id(&gid)
2297                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2298            })
2299            .collect::<Result<Vec<_>, _>>()?;
2300
2301        // Find all dependencies transitively because we need to ensure that
2302        // RTR queries determine the timestamp from the sources' (i.e.
2303        // storage objects that ingest data from external systems) remap
2304        // data. We "cheat" a little bit and filter out any IDs that aren't
2305        // user objects because we know they are not a RTR source.
2306        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2307        // If none of the sources are user objects, we don't need to provide
2308        // a RTR timestamp.
2309        if to_visit.is_empty() {
2310            return Ok(None);
2311        }
2312
2313        let mut timestamp_objects = BTreeSet::new();
2314
2315        while let Some(id) = to_visit.pop_front() {
2316            timestamp_objects.insert(id);
2317            to_visit.extend(
2318                self.catalog()
2319                    .get_entry(&id)
2320                    .uses()
2321                    .into_iter()
2322                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2323            );
2324        }
2325        let timestamp_objects = timestamp_objects
2326            .into_iter()
2327            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2328            .collect();
2329
2330        let r = self
2331            .controller
2332            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2333            .await?;
2334
2335        Ok(Some(r))
2336    }
2337
2338    pub(crate) async fn await_real_time_recent_timestamp<F>(
2339        catalog: Arc<Catalog>,
2340        fut: F,
2341    ) -> Result<Timestamp, AdapterError>
2342    where
2343        F: Future<Output = Result<Timestamp, StorageError>>,
2344    {
2345        fut.await
2346            .map_err(|error| Self::real_time_recent_timestamp_error(&catalog, error))
2347    }
2348
2349    fn real_time_recent_timestamp_error(catalog: &Catalog, error: StorageError) -> AdapterError {
2350        let rtr_name = |id: &GlobalId| {
2351            catalog
2352                .try_get_entry_by_global_id(id)
2353                .map(|e| e.name().item.clone())
2354                .unwrap_or_else(|| id.to_string())
2355        };
2356
2357        match error {
2358            StorageError::RtrTimeout(id) => AdapterError::RtrTimeout(rtr_name(&id)),
2359            StorageError::RtrDropFailure(id) => AdapterError::RtrDropFailure(rtr_name(&id)),
2360            error => error.into(),
2361        }
2362    }
2363
2364    /// Checks to see if the session needs a real time recency timestamp and if so returns
2365    /// a future that will return the timestamp.
2366    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2367        &self,
2368        session: &Session,
2369        source_ids: impl Iterator<Item = GlobalId>,
2370    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2371        let vars = session.vars();
2372
2373        if vars.real_time_recency()
2374            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2375            && !session.contains_read_timestamp()
2376        {
2377            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2378                .await
2379        } else {
2380            Ok(None)
2381        }
2382    }
2383
2384    #[instrument]
2385    pub(super) async fn sequence_explain_plan(
2386        &mut self,
2387        ctx: ExecuteContext,
2388        plan: plan::ExplainPlanPlan,
2389        target_cluster: TargetCluster,
2390    ) {
2391        match &plan.explainee {
2392            plan::Explainee::Statement(stmt) => match stmt {
2393                plan::ExplaineeStatement::CreateView { .. } => {
2394                    self.explain_create_view(ctx, plan).await;
2395                }
2396                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2397                    self.explain_create_materialized_view(ctx, plan).await;
2398                }
2399                plan::ExplaineeStatement::CreateIndex { .. } => {
2400                    self.explain_create_index(ctx, plan).await;
2401                }
2402                plan::ExplaineeStatement::Select { .. } => {
2403                    self.explain_peek(ctx, plan, target_cluster).await;
2404                }
2405                plan::ExplaineeStatement::Subscribe { .. } => {
2406                    self.explain_subscribe(ctx, plan, target_cluster).await;
2407                }
2408            },
2409            plan::Explainee::View(_) => {
2410                let result = self.explain_view(&ctx, plan);
2411                ctx.retire(result);
2412            }
2413            plan::Explainee::MaterializedView(_) => {
2414                let result = self.explain_materialized_view(&ctx, plan);
2415                ctx.retire(result);
2416            }
2417            plan::Explainee::Index(_) => {
2418                let result = self.explain_index(&ctx, plan);
2419                ctx.retire(result);
2420            }
2421            plan::Explainee::ReplanView(_) => {
2422                self.explain_replan_view(ctx, plan).await;
2423            }
2424            plan::Explainee::ReplanMaterializedView(_) => {
2425                self.explain_replan_materialized_view(ctx, plan).await;
2426            }
2427            plan::Explainee::ReplanIndex(_) => {
2428                self.explain_replan_index(ctx, plan).await;
2429            }
2430        };
2431    }
2432
2433    pub(super) async fn sequence_explain_pushdown(
2434        &mut self,
2435        ctx: ExecuteContext,
2436        plan: plan::ExplainPushdownPlan,
2437        target_cluster: TargetCluster,
2438    ) {
2439        match plan.explainee {
2440            Explainee::Statement(ExplaineeStatement::Select {
2441                broken: false,
2442                plan,
2443                desc: _,
2444            }) => {
2445                let stage = return_if_err!(
2446                    self.peek_validate(
2447                        ctx.session(),
2448                        plan,
2449                        target_cluster,
2450                        None,
2451                        ExplainContext::Pushdown,
2452                        Some(ctx.session().vars().max_query_result_size()),
2453                    ),
2454                    ctx
2455                );
2456                self.sequence_staged(ctx, Span::current(), stage).await;
2457            }
2458            Explainee::MaterializedView(item_id) => {
2459                self.explain_pushdown_materialized_view(ctx, item_id).await;
2460            }
2461            _ => {
2462                ctx.retire(Err(AdapterError::Unsupported(
2463                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2464                )));
2465            }
2466        };
2467    }
2468
2469    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2470    async fn execute_explain_pushdown_with_read_holds(
2471        &self,
2472        ctx: ExecuteContext,
2473        as_of: Antichain<Timestamp>,
2474        mz_now: ResultSpec<'static>,
2475        read_holds: Option<ReadHolds>,
2476        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2477    ) {
2478        let fut = self
2479            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2480            .await;
2481        task::spawn(|| "render explain pushdown", async move {
2482            // Transfer the necessary read holds over to the background task
2483            let _read_holds = read_holds;
2484            let res = fut.await;
2485            ctx.retire(res);
2486        });
2487    }
2488
2489    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
2490    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2491        &self,
2492        session: &Session,
2493        as_of: Antichain<Timestamp>,
2494        mz_now: ResultSpec<'static>,
2495        imports: I,
2496    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2497        // Get the needed Coordinator stuff and call the freestanding, shared helper.
2498        super::explain_pushdown_future_inner(
2499            session,
2500            &self.catalog,
2501            &self.controller.storage_collections,
2502            as_of,
2503            mz_now,
2504            imports,
2505        )
2506        .await
2507    }
2508
2509    #[instrument]
2510    pub(super) async fn sequence_insert(
2511        &mut self,
2512        mut ctx: ExecuteContext,
2513        plan: plan::InsertPlan,
2514    ) {
2515        // Normally, this would get checked when trying to add "write ops" to
2516        // the transaction but we go down diverging paths below, based on
2517        // whether the INSERT is only constant values or not.
2518        //
2519        // For the non-constant case we sequence an implicit read-then-write,
2520        // which messes with the transaction ops and would allow an implicit
2521        // read-then-write to sneak into a read-only transaction.
2522        if !ctx.session_mut().transaction().allows_writes() {
2523            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2524            return;
2525        }
2526
2527        // The structure of this code originates from a time where
2528        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2529        // optimized `MirRelationExpr`.
2530        //
2531        // Ideally, we would like to make the `selection.as_const().is_some()`
2532        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2533        // are planned as a Wrap($n, $vals) call, so until we can reduce
2534        // HirRelationExpr this will always returns false.
2535        //
2536        // Unfortunately, hitting the default path of the match below also
2537        // causes a lot of tests to fail, so we opted to go with the extra
2538        // `plan.values.clone()` statements when producing the `optimized_mir`
2539        // and re-optimize the values in the `sequence_read_then_write` call.
2540        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2541            // We don't perform any optimizations on an expression that is already
2542            // a constant for writes, as we want to maximize bulk-insert throughput.
2543            let expr = return_if_err!(
2544                plan.values
2545                    .clone()
2546                    .lower(self.catalog().system_config(), None),
2547                ctx
2548            );
2549            OptimizedMirRelationExpr(expr)
2550        } else {
2551            // Collect optimizer parameters.
2552            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2553
2554            // (`optimize::view::Optimizer` has a special case for constant queries.)
2555            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2556
2557            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2558            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2559        };
2560
2561        match optimized_mir.into_inner() {
2562            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2563                let catalog = self.owned_catalog();
2564                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2565                    let result =
2566                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2567                    ctx.retire(result);
2568                });
2569            }
2570            // All non-constant values must be planned as read-then-writes.
2571            _ => {
2572                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2573                    Some(table) => {
2574                        // Inserts always occur at the latest version of the table.
2575                        let desc = table.relation_desc_latest().expect("table has a desc");
2576                        desc.arity()
2577                    }
2578                    None => {
2579                        ctx.retire(Err(AdapterError::Catalog(
2580                            mz_catalog::memory::error::Error {
2581                                kind: ErrorKind::Sql(CatalogError::UnknownItem(
2582                                    plan.id.to_string(),
2583                                )),
2584                            },
2585                        )));
2586                        return;
2587                    }
2588                };
2589
2590                let finishing = RowSetFinishing {
2591                    order_by: vec![],
2592                    limit: None,
2593                    offset: 0,
2594                    project: (0..desc_arity).collect(),
2595                };
2596
2597                let read_then_write_plan = plan::ReadThenWritePlan {
2598                    id: plan.id,
2599                    selection: plan.values,
2600                    finishing,
2601                    assignments: BTreeMap::new(),
2602                    kind: MutationKind::Insert,
2603                    returning: plan.returning,
2604                };
2605
2606                self.sequence_read_then_write(ctx, read_then_write_plan)
2607                    .await;
2608            }
2609        }
2610    }
2611
2612    /// ReadThenWrite is a plan whose writes depend on the results of a
2613    /// read. This works by doing a Peek then queuing a SendDiffs. No writes
2614    /// or read-then-writes can occur between the Peek and SendDiff otherwise a
2615    /// serializability violation could occur.
2616    #[instrument]
2617    pub(super) async fn sequence_read_then_write(
2618        &mut self,
2619        mut ctx: ExecuteContext,
2620        plan: plan::ReadThenWritePlan,
2621    ) {
2622        let mut source_ids: BTreeSet<_> = plan
2623            .selection
2624            .depends_on()
2625            .into_iter()
2626            .map(|gid| self.catalog().resolve_item_id(&gid))
2627            .collect();
2628        source_ids.insert(plan.id);
2629
2630        // If the transaction doesn't already have write locks, acquire them.
2631        if ctx.session().transaction().write_locks().is_none() {
2632            // Pre-define all of the locks we need.
2633            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2634
2635            // Try acquiring all of our locks.
2636            for id in &source_ids {
2637                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2638                    write_locks.insert_lock(*id, lock);
2639                }
2640            }
2641
2642            // See if we acquired all of the neccessary locks.
2643            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2644                Ok(locks) => locks,
2645                Err(missing) => {
2646                    // Defer our write if we couldn't acquire all of the locks.
2647                    let role_metadata = ctx.session().role_metadata().clone();
2648                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2649                    let plan = DeferredPlan {
2650                        ctx,
2651                        plan: Plan::ReadThenWrite(plan),
2652                        validity: PlanValidity::new(
2653                            self.catalog.transient_revision(),
2654                            source_ids.clone(),
2655                            None,
2656                            None,
2657                            role_metadata,
2658                        ),
2659                        requires_locks: source_ids,
2660                    };
2661                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2662                }
2663            };
2664
2665            ctx.session_mut()
2666                .try_grant_write_locks(write_locks)
2667                .expect("session has already been granted write locks");
2668        }
2669
2670        let plan::ReadThenWritePlan {
2671            id,
2672            kind,
2673            selection,
2674            mut assignments,
2675            finishing,
2676            mut returning,
2677        } = plan;
2678
2679        // Read then writes can be queued, so re-verify the id exists.
2680        let desc = match self.catalog().try_get_entry(&id) {
2681            Some(table) => {
2682                // Inserts always occur at the latest version of the table.
2683                table
2684                    .relation_desc_latest()
2685                    .expect("table has a desc")
2686                    .into_owned()
2687            }
2688            None => {
2689                ctx.retire(Err(AdapterError::Catalog(
2690                    mz_catalog::memory::error::Error {
2691                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2692                    },
2693                )));
2694                return;
2695            }
2696        };
2697
2698        // Disallow mz_now in any position because read time and write time differ.
2699        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2700            || assignments.values().any(|e| e.contains_temporal())
2701            || returning.iter().any(|e| e.contains_temporal());
2702        if contains_temporal {
2703            ctx.retire(Err(AdapterError::Unsupported(
2704                "calls to mz_now in write statements",
2705            )));
2706            return;
2707        }
2708
2709        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations.
2710        for gid in selection.depends_on() {
2711            let item_id = self.catalog().resolve_item_id(&gid);
2712            if let Err(err) = validate_read_then_write_dependencies(self.catalog(), &item_id) {
2713                ctx.retire(Err(err));
2714                return;
2715            }
2716        }
2717
2718        let (peek_tx, peek_rx) = oneshot::channel();
2719        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2720        let (tx, _, session, extra) = ctx.into_parts();
2721        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2722        // execution context, because this peek does not directly correspond to an execute,
2723        // and so we don't need to take any action on its retirement.
2724        // TODO[btv]: we might consider extending statement logging to log the inner
2725        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2726        // and mint a new "real" execution context here. We'd also have to add some logic to
2727        // make sure such "sub-statements" are always sampled when the top-level statement is
2728        //
2729        // It's debatable whether this makes sense conceptually,
2730        // because the inner fragment here is not actually a
2731        // "statement" in its own right.
2732        let peek_ctx = ExecuteContext::from_parts(
2733            peek_client_tx,
2734            self.internal_cmd_tx.clone(),
2735            session,
2736            Default::default(),
2737        );
2738
2739        self.sequence_peek(
2740            peek_ctx,
2741            plan::SelectPlan {
2742                select: None,
2743                source: selection,
2744                when: QueryWhen::FreshestTableWrite,
2745                finishing,
2746                copy_to: None,
2747            },
2748            TargetCluster::Active,
2749            None,
2750        )
2751        .await;
2752
2753        let internal_cmd_tx = self.internal_cmd_tx.clone();
2754        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2755        let catalog = self.owned_catalog();
2756        let max_result_size = self.catalog().system_config().max_result_size();
2757
2758        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2759            let (peek_response, session) = match peek_rx.await {
2760                Ok(Response {
2761                    result: Ok(resp),
2762                    session,
2763                    otel_ctx,
2764                }) => {
2765                    otel_ctx.attach_as_parent();
2766                    (resp, session)
2767                }
2768                Ok(Response {
2769                    result: Err(e),
2770                    session,
2771                    otel_ctx,
2772                }) => {
2773                    let ctx =
2774                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2775                    otel_ctx.attach_as_parent();
2776                    ctx.retire(Err(e));
2777                    return;
2778                }
2779                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2780                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2781            };
2782            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2783            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2784
2785            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2786            if timeout_dur == Duration::ZERO {
2787                timeout_dur = Duration::MAX;
2788            }
2789
2790            let style = ExprPrepOneShot {
2791                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2792                session: ctx.session(),
2793                catalog_state: catalog.state(),
2794            };
2795            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2796                return_if_err!(style.prep_scalar_expr(expr), ctx);
2797            }
2798
2799            let make_diffs = move |mut rows: Box<dyn RowIterator>|
2800                  -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2801                    let arena = RowArena::new();
2802                    let mut diffs = Vec::new();
2803                    let mut datum_vec = mz_repr::DatumVec::new();
2804
2805                    while let Some(row) = rows.next() {
2806                        if !assignments.is_empty() {
2807                            assert!(
2808                                matches!(kind, MutationKind::Update),
2809                                "only updates support assignments"
2810                            );
2811                            let mut datums = datum_vec.borrow_with(row);
2812                            let mut updates = vec![];
2813                            for (idx, expr) in &assignments {
2814                                let updated = match expr.eval(&datums, &arena) {
2815                                    Ok(updated) => updated,
2816                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2817                                };
2818                                updates.push((*idx, updated));
2819                            }
2820                            for (idx, new_value) in updates {
2821                                datums[idx] = new_value;
2822                            }
2823                            let updated = Row::pack_slice(&datums);
2824                            diffs.push((updated, Diff::ONE));
2825                        }
2826                        match kind {
2827                            // Updates and deletes always remove the
2828                            // current row. Updates will also add an
2829                            // updated value.
2830                            MutationKind::Update | MutationKind::Delete => {
2831                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2832                            }
2833                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2834                        }
2835                    }
2836
2837                    // Sum of all the rows' byte size, for checking if we go
2838                    // above the max_result_size threshold.
2839                    let mut byte_size: u64 = 0;
2840                    for (row, diff) in &diffs {
2841                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2842                        if diff.is_positive() {
2843                            for (idx, datum) in row.iter().enumerate() {
2844                                desc.constraints_met(idx, &datum)?;
2845                            }
2846                        }
2847                    }
2848                    Ok((diffs, byte_size))
2849                };
2850
2851            let diffs = match peek_response {
2852                ExecuteResponse::SendingRowsStreaming {
2853                    rows: mut rows_stream,
2854                    ..
2855                } => {
2856                    let mut byte_size: u64 = 0;
2857                    let mut diffs = Vec::new();
2858                    let result = loop {
2859                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2860                            Ok(Some(res)) => match res {
2861                                PeekResponseUnary::Rows(new_rows) => {
2862                                    match make_diffs(new_rows) {
2863                                        Ok((mut new_diffs, new_byte_size)) => {
2864                                            byte_size = byte_size.saturating_add(new_byte_size);
2865                                            if byte_size > max_result_size {
2866                                                break Err(AdapterError::ResultSize(format!(
2867                                                    "result exceeds max size of {max_result_size}"
2868                                                )));
2869                                            }
2870                                            diffs.append(&mut new_diffs)
2871                                        }
2872                                        Err(e) => break Err(e),
2873                                    };
2874                                }
2875                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2876                                PeekResponseUnary::Error(e) => {
2877                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2878                                }
2879                                PeekResponseUnary::DependencyDropped(dep) => {
2880                                    break Err(dep.to_concurrent_dependency_drop());
2881                                }
2882                            },
2883                            Ok(None) => break Ok(diffs),
2884                            Err(_) => {
2885                                // We timed out, so remove the pending peek. This is
2886                                // best-effort and doesn't guarantee we won't
2887                                // receive a response.
2888                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2889                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2890                                    conn_id: ctx.session().conn_id().clone(),
2891                                });
2892                                if let Err(e) = result {
2893                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2894                                }
2895                                break Err(AdapterError::StatementTimeout);
2896                            }
2897                        }
2898                    };
2899
2900                    result
2901                }
2902                ExecuteResponse::SendingRowsImmediate { rows } => {
2903                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2904                }
2905                resp => Err(AdapterError::Unstructured(anyhow!(
2906                    "unexpected peek response: {resp:?}"
2907                ))),
2908            };
2909
2910            let mut returning_rows = Vec::new();
2911            let mut diff_err: Option<AdapterError> = None;
2912            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2913                let arena = RowArena::new();
2914                for (row, diff) in diffs {
2915                    if !diff.is_positive() {
2916                        continue;
2917                    }
2918                    let mut returning_row = Row::with_capacity(returning.len());
2919                    let mut packer = returning_row.packer();
2920                    for expr in &returning {
2921                        let datums: Vec<_> = row.iter().collect();
2922                        match expr.eval(&datums, &arena) {
2923                            Ok(datum) => {
2924                                packer.push(datum);
2925                            }
2926                            Err(err) => {
2927                                diff_err = Some(err.into());
2928                                break;
2929                            }
2930                        }
2931                    }
2932                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2933                    let diff = match NonZeroUsize::try_from(diff) {
2934                        Ok(diff) => diff,
2935                        Err(err) => {
2936                            diff_err = Some(err.into());
2937                            break;
2938                        }
2939                    };
2940                    returning_rows.push((returning_row, diff));
2941                    if diff_err.is_some() {
2942                        break;
2943                    }
2944                }
2945            }
2946            let diffs = if let Some(err) = diff_err {
2947                Err(err)
2948            } else {
2949                diffs
2950            };
2951
2952            // We need to clear out the timestamp context so the write doesn't fail due to a
2953            // read only transaction.
2954            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2955            // No matter what isolation level the client is using, we must linearize this
2956            // read. The write will be performed right after this, as part of a single
2957            // transaction, so the write must have a timestamp greater than or equal to the
2958            // read.
2959            //
2960            // Note: It's only OK for the write to have a greater timestamp than the read
2961            // because the write lock prevents any other writes from happening in between
2962            // the read and write.
2963            if let Some(timestamp_context) = timestamp_context {
2964                let (tx, rx) = tokio::sync::oneshot::channel();
2965                let conn_id = ctx.session().conn_id().clone();
2966                let pending_read_txn = PendingReadTxn {
2967                    txn: PendingRead::ReadThenWrite { ctx, tx },
2968                    timestamp_context,
2969                    created: Instant::now(),
2970                    num_requeues: 0,
2971                    otel_ctx: OpenTelemetryContext::obtain(),
2972                };
2973                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2974                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
2975                if let Err(e) = result {
2976                    warn!(
2977                        "strict_serializable_reads_tx dropped before we could send: {:?}",
2978                        e
2979                    );
2980                    return;
2981                }
2982                let result = rx.await;
2983                // It is not an error for these results to be ready after `tx` has been dropped.
2984                ctx = match result {
2985                    Ok(Some(ctx)) => ctx,
2986                    Ok(None) => {
2987                        // Coordinator took our context and will handle responding to the client.
2988                        // This usually indicates that our transaction was aborted.
2989                        return;
2990                    }
2991                    Err(e) => {
2992                        warn!(
2993                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
2994                            e
2995                        );
2996                        return;
2997                    }
2998                };
2999            }
3000
3001            match diffs {
3002                Ok(diffs) => {
3003                    let result = Self::send_diffs(
3004                        ctx.session_mut(),
3005                        plan::SendDiffsPlan {
3006                            id,
3007                            updates: diffs,
3008                            kind,
3009                            returning: returning_rows,
3010                            max_result_size,
3011                        },
3012                    );
3013                    ctx.retire(result);
3014                }
3015                Err(e) => {
3016                    ctx.retire(Err(e));
3017                }
3018            }
3019        });
3020    }
3021
3022    #[instrument]
3023    pub(super) async fn sequence_alter_item_rename(
3024        &mut self,
3025        ctx: &mut ExecuteContext,
3026        plan: plan::AlterItemRenamePlan,
3027    ) -> Result<ExecuteResponse, AdapterError> {
3028        let op = catalog::Op::RenameItem {
3029            id: plan.id,
3030            current_full_name: plan.current_full_name,
3031            to_name: plan.to_name,
3032        };
3033        match self
3034            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3035            .await
3036        {
3037            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3038            Err(err) => Err(err),
3039        }
3040    }
3041
3042    #[instrument]
3043    pub(super) async fn sequence_alter_retain_history(
3044        &mut self,
3045        ctx: &mut ExecuteContext,
3046        plan: plan::AlterRetainHistoryPlan,
3047    ) -> Result<ExecuteResponse, AdapterError> {
3048        let ops = vec![catalog::Op::AlterRetainHistory {
3049            id: plan.id,
3050            value: plan.value,
3051            window: plan.window,
3052        }];
3053        self.catalog_transact_with_context(None, Some(ctx), ops)
3054            .await?;
3055        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3056    }
3057
3058    #[instrument]
3059    pub(super) async fn sequence_alter_source_timestamp_interval(
3060        &mut self,
3061        ctx: &mut ExecuteContext,
3062        plan: plan::AlterSourceTimestampIntervalPlan,
3063    ) -> Result<ExecuteResponse, AdapterError> {
3064        let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3065            id: plan.id,
3066            value: plan.value,
3067            interval: plan.interval,
3068        }];
3069        self.catalog_transact_with_context(None, Some(ctx), ops)
3070            .await?;
3071        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3072    }
3073
3074    #[instrument]
3075    pub(super) async fn sequence_alter_schema_rename(
3076        &mut self,
3077        ctx: &mut ExecuteContext,
3078        plan: plan::AlterSchemaRenamePlan,
3079    ) -> Result<ExecuteResponse, AdapterError> {
3080        let (database_spec, schema_spec) = plan.cur_schema_spec;
3081        let op = catalog::Op::RenameSchema {
3082            database_spec,
3083            schema_spec,
3084            new_name: plan.new_schema_name,
3085            check_reserved_names: true,
3086        };
3087        match self
3088            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3089            .await
3090        {
3091            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3092            Err(err) => Err(err),
3093        }
3094    }
3095
3096    #[instrument]
3097    pub(super) async fn sequence_alter_schema_swap(
3098        &mut self,
3099        ctx: &mut ExecuteContext,
3100        plan: plan::AlterSchemaSwapPlan,
3101    ) -> Result<ExecuteResponse, AdapterError> {
3102        let plan::AlterSchemaSwapPlan {
3103            schema_a_spec: (schema_a_db, schema_a),
3104            schema_a_name,
3105            schema_b_spec: (schema_b_db, schema_b),
3106            schema_b_name,
3107            name_temp,
3108        } = plan;
3109
3110        let op_a = catalog::Op::RenameSchema {
3111            database_spec: schema_a_db,
3112            schema_spec: schema_a,
3113            new_name: name_temp,
3114            check_reserved_names: false,
3115        };
3116        let op_b = catalog::Op::RenameSchema {
3117            database_spec: schema_b_db,
3118            schema_spec: schema_b,
3119            new_name: schema_a_name,
3120            check_reserved_names: false,
3121        };
3122        let op_c = catalog::Op::RenameSchema {
3123            database_spec: schema_a_db,
3124            schema_spec: schema_a,
3125            new_name: schema_b_name,
3126            check_reserved_names: false,
3127        };
3128
3129        match self
3130            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3131                Box::pin(async {})
3132            })
3133            .await
3134        {
3135            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3136            Err(err) => Err(err),
3137        }
3138    }
3139
3140    #[instrument]
3141    pub(super) async fn sequence_alter_role(
3142        &mut self,
3143        session: &Session,
3144        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3145    ) -> Result<ExecuteResponse, AdapterError> {
3146        let catalog = self.catalog().for_session(session);
3147        let role = catalog.get_role(&id);
3148
3149        // We'll send these notices to the user, if the operation is successful.
3150        let mut notices = vec![];
3151
3152        // Get the attributes and variables from the role, as they currently are.
3153        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3154        let mut vars = role.vars().clone();
3155
3156        // Whether to set the password to NULL. This is a special case since the existing
3157        // password is not stored in the role attributes.
3158        let mut nopassword = false;
3159
3160        // Apply our updates.
3161        match option {
3162            PlannedAlterRoleOption::Attributes(attrs) => {
3163                self.validate_role_attributes(&attrs.clone().into())?;
3164
3165                if let Some(inherit) = attrs.inherit {
3166                    attributes.inherit = inherit;
3167                }
3168
3169                if let Some(password) = attrs.password {
3170                    attributes.password = Some(password);
3171                    attributes.scram_iterations =
3172                        Some(self.catalog().system_config().scram_iterations())
3173                }
3174
3175                if let Some(superuser) = attrs.superuser {
3176                    attributes.superuser = Some(superuser);
3177                }
3178
3179                if let Some(login) = attrs.login {
3180                    attributes.login = Some(login);
3181                }
3182
3183                if attrs.nopassword.unwrap_or(false) {
3184                    nopassword = true;
3185                }
3186
3187                if let Some(notice) = self.should_emit_rbac_notice(session) {
3188                    notices.push(notice);
3189                }
3190            }
3191            PlannedAlterRoleOption::Variable(variable) => {
3192                // Get the variable to make sure it's valid and visible.
3193                let session_var = session.vars().inspect(variable.name())?;
3194                // Return early if it's not visible.
3195                session_var.visible(session.user(), catalog.system_vars())?;
3196
3197                // Emit a warning when deprecated variables are used.
3198                // TODO(database-issues#8069) remove this after sufficient time has passed
3199                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3200                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3201                } else if let PlannedRoleVariable::Set {
3202                    name,
3203                    value: VariableValue::Values(vals),
3204                } = &variable
3205                {
3206                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3207                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3208                    }
3209                }
3210
3211                let var_name = match variable {
3212                    PlannedRoleVariable::Set { name, value } => {
3213                        // Update our persisted set.
3214                        match &value {
3215                            VariableValue::Default => {
3216                                vars.remove(&name);
3217                            }
3218                            VariableValue::Values(vals) => {
3219                                let var = match &vals[..] {
3220                                    [val] => OwnedVarInput::Flat(val.clone()),
3221                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3222                                };
3223                                // Make sure the input is valid.
3224                                session_var.check(var.borrow())?;
3225
3226                                vars.insert(name.clone(), var);
3227                            }
3228                        };
3229                        name
3230                    }
3231                    PlannedRoleVariable::Reset { name } => {
3232                        // Remove it from our persisted values.
3233                        vars.remove(&name);
3234                        name
3235                    }
3236                };
3237
3238                // Emit a notice that they need to reconnect to see the change take effect.
3239                notices.push(AdapterNotice::VarDefaultUpdated {
3240                    role: Some(name.clone()),
3241                    var_name: Some(var_name),
3242                });
3243            }
3244        }
3245
3246        let op = catalog::Op::AlterRole {
3247            id,
3248            name,
3249            attributes,
3250            nopassword,
3251            vars: RoleVars { map: vars },
3252        };
3253        let response = self
3254            .catalog_transact(Some(session), vec![op])
3255            .await
3256            .map(|_| ExecuteResponse::AlteredRole)?;
3257
3258        // Send all of our queued notices.
3259        session.add_notices(notices);
3260
3261        Ok(response)
3262    }
3263
3264    #[instrument]
3265    pub(super) async fn sequence_alter_sink_prepare(
3266        &mut self,
3267        ctx: ExecuteContext,
3268        plan: plan::AlterSinkPlan,
3269    ) {
3270        // Put a read hold on the new relation
3271        let id_bundle = crate::CollectionIdBundle {
3272            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3273            compute_ids: BTreeMap::new(),
3274        };
3275        let read_hold = self.acquire_read_holds(&id_bundle);
3276
3277        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3278            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3279            return;
3280        };
3281
3282        let otel_ctx = OpenTelemetryContext::obtain();
3283        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3284
3285        let plan_validity = PlanValidity::new(
3286            self.catalog().transient_revision(),
3287            BTreeSet::from_iter([plan.item_id, from_item_id]),
3288            Some(plan.in_cluster),
3289            None,
3290            ctx.session().role_metadata().clone(),
3291        );
3292
3293        info!(
3294            "preparing alter sink for {}: frontiers={:?} export={:?}",
3295            plan.global_id,
3296            self.controller
3297                .storage_collections
3298                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3299            self.controller.storage.export(plan.global_id)
3300        );
3301
3302        // Now we must wait for the sink to make enough progress such that there is overlap between
3303        // the new `from` collection's read hold and the sink's write frontier.
3304        //
3305        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3306        // the watch set never completes and neither does the `ALTER SINK` command.
3307        self.install_storage_watch_set(
3308            ctx.session().conn_id().clone(),
3309            BTreeSet::from_iter([plan.global_id]),
3310            read_ts,
3311            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3312                ctx: Some(ctx),
3313                otel_ctx,
3314                plan,
3315                plan_validity,
3316                read_hold,
3317            }),
3318        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3319    }
3320
3321    #[instrument]
3322    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3323        ctx.otel_ctx.attach_as_parent();
3324
3325        let plan::AlterSinkPlan {
3326            item_id,
3327            global_id,
3328            sink: sink_plan,
3329            with_snapshot,
3330            in_cluster,
3331        } = ctx.plan.clone();
3332
3333        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3334        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3335        // arbitrarily changed since we performed planning, and we must re-assert that it still
3336        // matches our requirements.
3337        //
3338        // The `PlanValidity` check ensures that both the sink and the new source relation still
3339        // exist. Apart from that we have to ensure that nobody else altered the sink in the mean
3340        // time, which we do by comparing the catalog sink version to the one in the plan.
3341        match ctx.plan_validity.check(self.catalog()) {
3342            Ok(()) => {}
3343            Err(err) => {
3344                ctx.retire(Err(err));
3345                return;
3346            }
3347        }
3348
3349        let entry = self.catalog().get_entry(&item_id);
3350        let CatalogItem::Sink(old_sink) = entry.item() else {
3351            panic!("invalid item kind for `AlterSinkPlan`");
3352        };
3353
3354        if sink_plan.version != old_sink.version + 1 {
3355            ctx.retire(Err(AdapterError::ChangedPlan(
3356                "sink was altered concurrently".into(),
3357            )));
3358            return;
3359        }
3360
3361        info!(
3362            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3363            self.controller
3364                .storage_collections
3365                .collections_frontiers(vec![global_id, sink_plan.from]),
3366            self.controller.storage.export(global_id),
3367        );
3368
3369        // Assert that we can recover the updates that happened at the timestamps of the write
3370        // frontier. This must be true in this call.
3371        let write_frontier = &self
3372            .controller
3373            .storage
3374            .export(global_id)
3375            .expect("sink known to exist")
3376            .write_frontier;
3377        let as_of = ctx.read_hold.least_valid_read();
3378        assert!(
3379            write_frontier.iter().all(|t| as_of.less_than(t)),
3380            "{:?} should be strictly less than {:?}",
3381            &*as_of,
3382            &**write_frontier
3383        );
3384
3385        // Parse the `create_sql` so we can update it to the new sink definition.
3386        //
3387        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3388        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3389        // names in the `create_sql` may have changed, for example due to a schema swap.
3390        let create_sql = &old_sink.create_sql;
3391        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3392        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3393            unreachable!("invalid statement kind for sink");
3394        };
3395
3396        // Update the sink version.
3397        stmt.with_options
3398            .retain(|o| o.name != CreateSinkOptionName::Version);
3399        stmt.with_options.push(CreateSinkOption {
3400            name: CreateSinkOptionName::Version,
3401            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3402                sink_plan.version.to_string(),
3403            ))),
3404        });
3405
3406        let conn_catalog = self.catalog().for_system_session();
3407        let (mut stmt, resolved_ids) =
3408            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3409
3410        // Update the `from` relation.
3411        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3412        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3413        stmt.from = ResolvedItemName::Item {
3414            id: from_entry.id(),
3415            qualifiers: from_entry.name.qualifiers.clone(),
3416            full_name,
3417            print_id: true,
3418            version: from_entry.version,
3419        };
3420
3421        let new_sink = Sink {
3422            create_sql: stmt.to_ast_string_stable(),
3423            global_id,
3424            from: sink_plan.from,
3425            connection: sink_plan.connection.clone(),
3426            envelope: sink_plan.envelope,
3427            version: sink_plan.version,
3428            with_snapshot,
3429            resolved_ids: resolved_ids.clone(),
3430            cluster_id: in_cluster,
3431            commit_interval: sink_plan.commit_interval,
3432        };
3433
3434        let ops = vec![catalog::Op::UpdateItem {
3435            id: item_id,
3436            name: entry.name().clone(),
3437            to_item: CatalogItem::Sink(new_sink),
3438        }];
3439
3440        match self
3441            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3442            .await
3443        {
3444            Ok(()) => {}
3445            Err(err) => {
3446                ctx.retire(Err(err));
3447                return;
3448            }
3449        }
3450
3451        let storage_sink_desc = StorageSinkDesc {
3452            from: sink_plan.from,
3453            from_desc: from_entry
3454                .relation_desc()
3455                .expect("sinks can only be built on items with descs")
3456                .into_owned(),
3457            connection: sink_plan
3458                .connection
3459                .clone()
3460                .into_inline_connection(self.catalog().state()),
3461            envelope: sink_plan.envelope,
3462            as_of,
3463            with_snapshot,
3464            version: sink_plan.version,
3465            from_storage_metadata: (),
3466            to_storage_metadata: (),
3467            commit_interval: sink_plan.commit_interval,
3468        };
3469
3470        self.controller
3471            .storage
3472            .alter_export(
3473                global_id,
3474                ExportDescription {
3475                    sink: storage_sink_desc,
3476                    instance_id: in_cluster,
3477                },
3478            )
3479            .await
3480            .unwrap_or_terminate("cannot fail to alter source desc");
3481
3482        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3483    }
3484
3485    #[instrument]
3486    pub(super) async fn sequence_alter_connection(
3487        &mut self,
3488        ctx: ExecuteContext,
3489        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3490    ) {
3491        match action {
3492            AlterConnectionAction::RotateKeys => {
3493                self.sequence_rotate_keys(ctx, id).await;
3494            }
3495            AlterConnectionAction::AlterOptions {
3496                set_options,
3497                drop_options,
3498                validate,
3499            } => {
3500                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3501                    .await
3502            }
3503        }
3504    }
3505
3506    #[instrument]
3507    async fn sequence_alter_connection_options(
3508        &mut self,
3509        mut ctx: ExecuteContext,
3510        id: CatalogItemId,
3511        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3512        drop_options: BTreeSet<ConnectionOptionName>,
3513        validate: bool,
3514    ) {
3515        let cur_entry = self.catalog().get_entry(&id);
3516        let cur_conn = cur_entry.connection().expect("known to be connection");
3517        let connection_gid = cur_conn.global_id();
3518
3519        let inner = || -> Result<Connection, AdapterError> {
3520            // Parse statement.
3521            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3522                .expect("invalid create sql persisted to catalog")
3523                .into_element()
3524                .ast
3525            {
3526                Statement::CreateConnection(stmt) => stmt,
3527                _ => unreachable!("proved type is source"),
3528            };
3529
3530            let catalog = self.catalog().for_system_session();
3531
3532            // Resolve items in statement
3533            let (mut create_conn_stmt, resolved_ids) =
3534                mz_sql::names::resolve(&catalog, create_conn_stmt)
3535                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3536
3537            // Retain options that are neither set nor dropped.
3538            create_conn_stmt
3539                .values
3540                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3541
3542            // Set new values
3543            create_conn_stmt.values.extend(
3544                set_options
3545                    .into_iter()
3546                    .map(|(name, value)| ConnectionOption { name, value }),
3547            );
3548
3549            // Open a new catalog, which we will use to re-plan our
3550            // statement with the desired config.
3551            let mut catalog = self.catalog().for_system_session();
3552            catalog.mark_id_unresolvable_for_replanning(id);
3553
3554            // Re-define our source in terms of the amended statement
3555            let plan = match mz_sql::plan::plan(
3556                None,
3557                &catalog,
3558                Statement::CreateConnection(create_conn_stmt),
3559                &Params::empty(),
3560                &resolved_ids,
3561            )
3562            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3563            {
3564                (Plan::CreateConnection(plan), _sql_impl_ids) => plan,
3565                (p, _) => {
3566                    unreachable!("create connection plan is only valid response, got {:?}", p)
3567                }
3568            };
3569
3570            // Parse statement.
3571            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3572                .expect("invalid create sql persisted to catalog")
3573                .into_element()
3574                .ast
3575            {
3576                Statement::CreateConnection(stmt) => stmt,
3577                _ => unreachable!("proved type is source"),
3578            };
3579
3580            let catalog = self.catalog().for_system_session();
3581
3582            // Resolve items in statement
3583            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3584                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3585
3586            Ok(Connection {
3587                create_sql: plan.connection.create_sql,
3588                global_id: cur_conn.global_id,
3589                details: plan.connection.details,
3590                resolved_ids: new_deps,
3591            })
3592        };
3593
3594        let conn = match inner() {
3595            Ok(conn) => conn,
3596            Err(e) => {
3597                return ctx.retire(Err(e));
3598            }
3599        };
3600
3601        if validate {
3602            let connection = conn
3603                .details
3604                .to_connection()
3605                .into_inline_connection(self.catalog().state());
3606
3607            let internal_cmd_tx = self.internal_cmd_tx.clone();
3608            let transient_revision = self.catalog().transient_revision();
3609            let conn_id = ctx.session().conn_id().clone();
3610            let otel_ctx = OpenTelemetryContext::obtain();
3611            let role_metadata = ctx.session().role_metadata().clone();
3612            let current_storage_parameters = self.controller.storage.config().clone();
3613
3614            task::spawn(
3615                || format!("validate_alter_connection:{conn_id}"),
3616                async move {
3617                    let resolved_ids = conn.resolved_ids.clone();
3618                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3619                    let result = match std::panic::AssertUnwindSafe(
3620                        connection.validate(id, &current_storage_parameters),
3621                    )
3622                    .ore_catch_unwind()
3623                    .await
3624                    {
3625                        Ok(Ok(())) => Ok(conn),
3626                        Ok(Err(err)) => Err(err.into()),
3627                        Err(_panic) => {
3628                            tracing::error!("alter connection validation panicked");
3629                            Err(AdapterError::Internal(
3630                                "connection validation panicked".into(),
3631                            ))
3632                        }
3633                    };
3634
3635                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3636                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3637                        AlterConnectionValidationReady {
3638                            ctx,
3639                            result,
3640                            connection_id: id,
3641                            connection_gid,
3642                            plan_validity: PlanValidity::new(
3643                                transient_revision,
3644                                dependency_ids.clone(),
3645                                None,
3646                                None,
3647                                role_metadata,
3648                            ),
3649                            otel_ctx,
3650                            resolved_ids,
3651                        },
3652                    ));
3653                    if let Err(e) = result {
3654                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3655                    }
3656                },
3657            );
3658        } else {
3659            let result = self
3660                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3661                .await;
3662            ctx.retire(result);
3663        }
3664    }
3665
3666    #[instrument]
3667    pub(crate) async fn sequence_alter_connection_stage_finish(
3668        &mut self,
3669        session: &Session,
3670        id: CatalogItemId,
3671        connection: Connection,
3672    ) -> Result<ExecuteResponse, AdapterError> {
3673        match self.catalog.get_entry(&id).item() {
3674            CatalogItem::Connection(curr_conn) => {
3675                curr_conn
3676                    .details
3677                    .to_connection()
3678                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3679                    .map_err(StorageError::from)?;
3680            }
3681            _ => unreachable!("known to be a connection"),
3682        };
3683
3684        let ops = vec![catalog::Op::UpdateItem {
3685            id,
3686            name: self.catalog.get_entry(&id).name().clone(),
3687            to_item: CatalogItem::Connection(connection.clone()),
3688        }];
3689
3690        self.catalog_transact(Some(session), ops).await?;
3691
3692        // NOTE: The rest of the alter connection logic (updating VPC endpoints
3693        // and propagating connection changes to dependent sources, sinks, and
3694        // tables) is handled in `apply_catalog_implications` via
3695        // `handle_alter_connection`. The catalog transact above triggers that
3696        // code path.
3697
3698        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3699    }
3700
3701    #[instrument]
3702    pub(super) async fn sequence_alter_source(
3703        &mut self,
3704        session: &Session,
3705        plan::AlterSourcePlan {
3706            item_id,
3707            ingestion_id,
3708            action,
3709        }: plan::AlterSourcePlan,
3710    ) -> Result<ExecuteResponse, AdapterError> {
3711        let cur_entry = self.catalog().get_entry(&item_id);
3712        let cur_source = cur_entry.source().expect("known to be source");
3713
3714        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3715            // Parse statement.
3716            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3717                .expect("invalid create sql persisted to catalog")
3718                .into_element()
3719                .ast
3720            {
3721                Statement::CreateSource(stmt) => stmt,
3722                _ => unreachable!("proved type is source"),
3723            };
3724
3725            let catalog = coord.catalog().for_system_session();
3726
3727            // Resolve items in statement
3728            mz_sql::names::resolve(&catalog, create_source_stmt)
3729                .map_err(|e| AdapterError::internal(err_cx, e))
3730        };
3731
3732        match action {
3733            plan::AlterSourceAction::AddSubsourceExports {
3734                subsources,
3735                options,
3736            } => {
3737                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3738
3739                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3740                    text_columns: mut new_text_columns,
3741                    exclude_columns: mut new_exclude_columns,
3742                    ..
3743                } = options.try_into()?;
3744
3745                // Resolve items in statement
3746                let (mut create_source_stmt, resolved_ids) =
3747                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3748
3749                // Get all currently referred-to items
3750                let catalog = self.catalog();
3751                let curr_references: BTreeSet<_> = catalog
3752                    .get_entry(&item_id)
3753                    .used_by()
3754                    .into_iter()
3755                    .filter_map(|subsource| {
3756                        catalog
3757                            .get_entry(subsource)
3758                            .subsource_details()
3759                            .map(|(_id, reference, _details)| reference)
3760                    })
3761                    .collect();
3762
3763                // We are doing a lot of unwrapping, so just make an error to reference; all of
3764                // these invariants are guaranteed to be true because of how we plan subsources.
3765                let purification_err =
3766                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3767
3768                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
3769                // we remove support for implicitly created subsources from a `CREATE SOURCE`
3770                // statement.
3771                match &mut create_source_stmt.connection {
3772                    CreateSourceConnection::Postgres {
3773                        options: curr_options,
3774                        ..
3775                    } => {
3776                        let mz_sql::plan::PgConfigOptionExtracted {
3777                            mut text_columns, ..
3778                        } = curr_options.clone().try_into()?;
3779
3780                        // Drop text columns; we will add them back in
3781                        // as appropriate below.
3782                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3783
3784                        // Drop all text columns that are not currently referred to.
3785                        text_columns.retain(|column_qualified_reference| {
3786                            mz_ore::soft_assert_eq_or_log!(
3787                                column_qualified_reference.0.len(),
3788                                4,
3789                                "all TEXT COLUMNS values must be column-qualified references"
3790                            );
3791                            let mut table = column_qualified_reference.clone();
3792                            table.0.truncate(3);
3793                            curr_references.contains(&table)
3794                        });
3795
3796                        // Merge the current text columns into the new text columns.
3797                        new_text_columns.extend(text_columns);
3798
3799                        // If we have text columns, add them to the options.
3800                        if !new_text_columns.is_empty() {
3801                            new_text_columns.sort();
3802                            let new_text_columns = new_text_columns
3803                                .into_iter()
3804                                .map(WithOptionValue::UnresolvedItemName)
3805                                .collect();
3806
3807                            curr_options.push(PgConfigOption {
3808                                name: PgConfigOptionName::TextColumns,
3809                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3810                            });
3811                        }
3812                    }
3813                    CreateSourceConnection::MySql {
3814                        options: curr_options,
3815                        ..
3816                    } => {
3817                        let mz_sql::plan::MySqlConfigOptionExtracted {
3818                            mut text_columns,
3819                            mut exclude_columns,
3820                            ..
3821                        } = curr_options.clone().try_into()?;
3822
3823                        // Drop both ignore and text columns; we will add them back in
3824                        // as appropriate below.
3825                        curr_options.retain(|o| {
3826                            !matches!(
3827                                o.name,
3828                                MySqlConfigOptionName::TextColumns
3829                                    | MySqlConfigOptionName::ExcludeColumns
3830                            )
3831                        });
3832
3833                        // Drop all text / exclude columns that are not currently referred to.
3834                        let column_referenced =
3835                            |column_qualified_reference: &UnresolvedItemName| {
3836                                mz_ore::soft_assert_eq_or_log!(
3837                                    column_qualified_reference.0.len(),
3838                                    3,
3839                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3840                                );
3841                                let mut table = column_qualified_reference.clone();
3842                                table.0.truncate(2);
3843                                curr_references.contains(&table)
3844                            };
3845                        text_columns.retain(column_referenced);
3846                        exclude_columns.retain(column_referenced);
3847
3848                        // Merge the current text / exclude columns into the new text / exclude columns.
3849                        new_text_columns.extend(text_columns);
3850                        new_exclude_columns.extend(exclude_columns);
3851
3852                        // If we have text columns, add them to the options.
3853                        if !new_text_columns.is_empty() {
3854                            new_text_columns.sort();
3855                            let new_text_columns = new_text_columns
3856                                .into_iter()
3857                                .map(WithOptionValue::UnresolvedItemName)
3858                                .collect();
3859
3860                            curr_options.push(MySqlConfigOption {
3861                                name: MySqlConfigOptionName::TextColumns,
3862                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3863                            });
3864                        }
3865                        // If we have exclude columns, add them to the options.
3866                        if !new_exclude_columns.is_empty() {
3867                            new_exclude_columns.sort();
3868                            let new_exclude_columns = new_exclude_columns
3869                                .into_iter()
3870                                .map(WithOptionValue::UnresolvedItemName)
3871                                .collect();
3872
3873                            curr_options.push(MySqlConfigOption {
3874                                name: MySqlConfigOptionName::ExcludeColumns,
3875                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3876                            });
3877                        }
3878                    }
3879                    CreateSourceConnection::SqlServer {
3880                        options: curr_options,
3881                        ..
3882                    } => {
3883                        let mz_sql::plan::SqlServerConfigOptionExtracted {
3884                            mut text_columns,
3885                            mut exclude_columns,
3886                            ..
3887                        } = curr_options.clone().try_into()?;
3888
3889                        // Drop both ignore and text columns; we will add them back in
3890                        // as appropriate below.
3891                        curr_options.retain(|o| {
3892                            !matches!(
3893                                o.name,
3894                                SqlServerConfigOptionName::TextColumns
3895                                    | SqlServerConfigOptionName::ExcludeColumns
3896                            )
3897                        });
3898
3899                        // Drop all text / exclude columns that are not currently referred to.
3900                        // SQL Server text/exclude column refs are 3-part (schema.table.col),
3901                        // which truncate to 2-part (schema.table). But external references
3902                        // are 3-part (database.schema.table). Use suffix matching since
3903                        // a SQL Server source connects to a single database.
3904                        let column_referenced =
3905                            |column_qualified_reference: &UnresolvedItemName| {
3906                                mz_ore::soft_assert_eq_or_log!(
3907                                    column_qualified_reference.0.len(),
3908                                    3,
3909                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3910                                );
3911                                let mut table = column_qualified_reference.clone();
3912                                table.0.truncate(2);
3913                                curr_references.iter().any(|r| r.0.ends_with(&table.0))
3914                            };
3915                        text_columns.retain(column_referenced);
3916                        exclude_columns.retain(column_referenced);
3917
3918                        // Merge the current text / exclude columns into the new text / exclude columns.
3919                        new_text_columns.extend(text_columns);
3920                        new_exclude_columns.extend(exclude_columns);
3921
3922                        // If we have text columns, add them to the options.
3923                        if !new_text_columns.is_empty() {
3924                            new_text_columns.sort();
3925                            let new_text_columns = new_text_columns
3926                                .into_iter()
3927                                .map(WithOptionValue::UnresolvedItemName)
3928                                .collect();
3929
3930                            curr_options.push(SqlServerConfigOption {
3931                                name: SqlServerConfigOptionName::TextColumns,
3932                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3933                            });
3934                        }
3935                        // If we have exclude columns, add them to the options.
3936                        if !new_exclude_columns.is_empty() {
3937                            new_exclude_columns.sort();
3938                            let new_exclude_columns = new_exclude_columns
3939                                .into_iter()
3940                                .map(WithOptionValue::UnresolvedItemName)
3941                                .collect();
3942
3943                            curr_options.push(SqlServerConfigOption {
3944                                name: SqlServerConfigOptionName::ExcludeColumns,
3945                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3946                            });
3947                        }
3948                    }
3949                    _ => return Err(purification_err()),
3950                };
3951
3952                let mut catalog = self.catalog().for_system_session();
3953                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
3954
3955                // Re-define our source in terms of the amended statement
3956                let planned = mz_sql::plan::plan(
3957                    None,
3958                    &catalog,
3959                    Statement::CreateSource(create_source_stmt),
3960                    &Params::empty(),
3961                    &resolved_ids,
3962                )
3963                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
3964                let plan = match planned {
3965                    (Plan::CreateSource(plan), _sql_impl_ids) => plan,
3966                    (p, _) => {
3967                        unreachable!("create source plan is only valid response, got {:?}", p)
3968                    }
3969                };
3970
3971                // Asserting that we've done the right thing with dependencies
3972                // here requires mocking out objects in the catalog, which is a
3973                // large task for an operation we have to cover in tests anyway.
3974                let source = Source::new(
3975                    plan,
3976                    cur_source.global_id,
3977                    resolved_ids,
3978                    cur_source.custom_logical_compaction_window,
3979                    cur_source.is_retained_metrics_object,
3980                );
3981
3982                // Get new ingestion description for storage.
3983                let desc = match &source.data_source {
3984                    DataSourceDesc::Ingestion { desc, .. }
3985                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3986                        desc.clone().into_inline_connection(self.catalog().state())
3987                    }
3988                    _ => unreachable!("already verified of type ingestion"),
3989                };
3990
3991                self.controller
3992                    .storage
3993                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
3994                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
3995
3996                // Redefine source. This must be done before we create any new
3997                // subsources so that it has the right ingestion.
3998                let mut ops = vec![catalog::Op::UpdateItem {
3999                    id: item_id,
4000                    // Look this up again so we don't have to hold an immutable reference to the
4001                    // entry for so long.
4002                    name: self.catalog.get_entry(&item_id).name().clone(),
4003                    to_item: CatalogItem::Source(source),
4004                }];
4005
4006                let CreateSourceInner {
4007                    ops: new_ops,
4008                    sources: _,
4009                    if_not_exists_ids,
4010                } = self.create_source_inner(session, subsources).await?;
4011
4012                ops.extend(new_ops.into_iter());
4013
4014                assert!(
4015                    if_not_exists_ids.is_empty(),
4016                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4017                );
4018
4019                self.catalog_transact(Some(session), ops).await?;
4020            }
4021            plan::AlterSourceAction::RefreshReferences { references } => {
4022                self.catalog_transact(
4023                    Some(session),
4024                    vec![catalog::Op::UpdateSourceReferences {
4025                        source_id: item_id,
4026                        references: references.into(),
4027                    }],
4028                )
4029                .await?;
4030            }
4031        }
4032
4033        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4034    }
4035
4036    #[instrument]
4037    pub(super) async fn sequence_alter_system_set(
4038        &mut self,
4039        session: &Session,
4040        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4041    ) -> Result<ExecuteResponse, AdapterError> {
4042        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4043        // We want to ensure that the network policy we're switching too actually exists.
4044        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4045            self.validate_alter_system_network_policy(session, &value)?;
4046        }
4047
4048        let op = match value {
4049            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4050                name: name.clone(),
4051                value: OwnedVarInput::SqlSet(values),
4052            },
4053            plan::VariableValue::Default => {
4054                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4055            }
4056        };
4057        self.catalog_transact(Some(session), vec![op]).await?;
4058
4059        session.add_notice(AdapterNotice::VarDefaultUpdated {
4060            role: None,
4061            var_name: Some(name),
4062        });
4063        Ok(ExecuteResponse::AlteredSystemConfiguration)
4064    }
4065
4066    #[instrument]
4067    pub(super) async fn sequence_alter_system_reset(
4068        &mut self,
4069        session: &Session,
4070        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4071    ) -> Result<ExecuteResponse, AdapterError> {
4072        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4073        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4074        self.catalog_transact(Some(session), vec![op]).await?;
4075        session.add_notice(AdapterNotice::VarDefaultUpdated {
4076            role: None,
4077            var_name: Some(name),
4078        });
4079        Ok(ExecuteResponse::AlteredSystemConfiguration)
4080    }
4081
4082    #[instrument]
4083    pub(super) async fn sequence_alter_system_reset_all(
4084        &mut self,
4085        session: &Session,
4086        _: plan::AlterSystemResetAllPlan,
4087    ) -> Result<ExecuteResponse, AdapterError> {
4088        self.is_user_allowed_to_alter_system(session, None)?;
4089        let op = catalog::Op::ResetAllSystemConfiguration;
4090        self.catalog_transact(Some(session), vec![op]).await?;
4091        session.add_notice(AdapterNotice::VarDefaultUpdated {
4092            role: None,
4093            var_name: None,
4094        });
4095        Ok(ExecuteResponse::AlteredSystemConfiguration)
4096    }
4097
4098    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
4099    fn is_user_allowed_to_alter_system(
4100        &self,
4101        session: &Session,
4102        var_name: Option<&str>,
4103    ) -> Result<(), AdapterError> {
4104        match (session.user().kind(), var_name) {
4105            // Only internal superusers can reset all system variables.
4106            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4107            // Whether or not a variable can be modified depends if we're an internal superuser.
4108            (UserKind::Superuser, Some(name))
4109                if session.user().is_internal()
4110                    || self.catalog().system_config().user_modifiable(name) =>
4111            {
4112                // In lieu of plumbing the user to all system config functions, just check that
4113                // the var is visible.
4114                let var = self.catalog().system_config().get(name)?;
4115                var.visible(session.user(), self.catalog().system_config())?;
4116                Ok(())
4117            }
4118            // If we're not a superuser, but the variable is user modifiable, indicate they can use
4119            // session variables.
4120            (UserKind::Regular, Some(name))
4121                if self.catalog().system_config().user_modifiable(name) =>
4122            {
4123                Err(AdapterError::Unauthorized(
4124                    rbac::UnauthorizedError::Superuser {
4125                        action: format!("toggle the '{name}' system configuration parameter"),
4126                    },
4127                ))
4128            }
4129            _ => Err(AdapterError::Unauthorized(
4130                rbac::UnauthorizedError::MzSystem {
4131                    action: "alter system".into(),
4132                },
4133            )),
4134        }
4135    }
4136
4137    fn validate_alter_system_network_policy(
4138        &self,
4139        session: &Session,
4140        policy_value: &plan::VariableValue,
4141    ) -> Result<(), AdapterError> {
4142        let policy_name = match &policy_value {
4143            // Make sure the compiled in default still exists.
4144            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4145            plan::VariableValue::Values(values) if values.len() == 1 => {
4146                values.iter().next().cloned()
4147            }
4148            plan::VariableValue::Values(values) => {
4149                tracing::warn!(?values, "can't set multiple network policies at once");
4150                None
4151            }
4152        };
4153        let maybe_network_policy = policy_name
4154            .as_ref()
4155            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4156        let Some(network_policy) = maybe_network_policy else {
4157            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4158                VarError::InvalidParameterValue {
4159                    name: NETWORK_POLICY.name(),
4160                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4161                    reason: "no network policy with such name exists".to_string(),
4162                },
4163            )));
4164        };
4165        self.validate_alter_network_policy(session, &network_policy.rules)
4166    }
4167
4168    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4169    ///
4170    /// This helps prevent users from modifying network policies in a way that would lock out their
4171    /// current connection.
4172    fn validate_alter_network_policy(
4173        &self,
4174        session: &Session,
4175        policy_rules: &Vec<NetworkPolicyRule>,
4176    ) -> Result<(), AdapterError> {
4177        // If the user is not an internal user attempt to protect them from
4178        // blocking themselves.
4179        if session.user().is_internal() {
4180            return Ok(());
4181        }
4182        if let Some(ip) = session.meta().client_ip() {
4183            validate_ip_with_policy_rules(ip, policy_rules)
4184                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4185        } else {
4186            // Sessions without IPs are only temporarily constructed for default values
4187            // they should not be permitted here.
4188            return Err(AdapterError::NetworkPolicyDenied(
4189                NetworkPolicyError::MissingIp,
4190            ));
4191        }
4192        Ok(())
4193    }
4194
4195    // Returns the name of the portal to execute.
4196    #[instrument]
4197    pub(super) fn sequence_execute(
4198        &self,
4199        session: &mut Session,
4200        plan: plan::ExecutePlan,
4201    ) -> Result<String, AdapterError> {
4202        // Verify the stmt is still valid.
4203        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4204        let ps = session
4205            .get_prepared_statement_unverified(&plan.name)
4206            .expect("known to exist");
4207        let stmt = ps.stmt().cloned();
4208        let desc = ps.desc().clone();
4209        let state_revision = ps.state_revision;
4210        let logging = Arc::clone(ps.logging());
4211        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4212    }
4213
4214    #[instrument]
4215    pub(super) async fn sequence_grant_privileges(
4216        &mut self,
4217        session: &Session,
4218        plan::GrantPrivilegesPlan {
4219            update_privileges,
4220            grantees,
4221        }: plan::GrantPrivilegesPlan,
4222    ) -> Result<ExecuteResponse, AdapterError> {
4223        self.sequence_update_privileges(
4224            session,
4225            update_privileges,
4226            grantees,
4227            UpdatePrivilegeVariant::Grant,
4228        )
4229        .await
4230    }
4231
4232    #[instrument]
4233    pub(super) async fn sequence_revoke_privileges(
4234        &mut self,
4235        session: &Session,
4236        plan::RevokePrivilegesPlan {
4237            update_privileges,
4238            revokees,
4239        }: plan::RevokePrivilegesPlan,
4240    ) -> Result<ExecuteResponse, AdapterError> {
4241        self.sequence_update_privileges(
4242            session,
4243            update_privileges,
4244            revokees,
4245            UpdatePrivilegeVariant::Revoke,
4246        )
4247        .await
4248    }
4249
4250    #[instrument]
4251    async fn sequence_update_privileges(
4252        &mut self,
4253        session: &Session,
4254        update_privileges: Vec<UpdatePrivilege>,
4255        grantees: Vec<RoleId>,
4256        variant: UpdatePrivilegeVariant,
4257    ) -> Result<ExecuteResponse, AdapterError> {
4258        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4259        let mut warnings = Vec::new();
4260        let catalog = self.catalog().for_session(session);
4261
4262        for UpdatePrivilege {
4263            acl_mode,
4264            target_id,
4265            grantor,
4266        } in update_privileges
4267        {
4268            let actual_object_type = catalog.get_system_object_type(&target_id);
4269            // For all relations we allow all applicable table privileges, but send a warning if the
4270            // privilege isn't actually applicable to the object type.
4271            if actual_object_type.is_relation() {
4272                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4273                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4274                if !non_applicable_privileges.is_empty() {
4275                    let object_description =
4276                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4277                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4278                        non_applicable_privileges,
4279                        object_description,
4280                    })
4281                }
4282            }
4283
4284            if let SystemObjectId::Object(object_id) = &target_id {
4285                self.catalog()
4286                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4287            }
4288
4289            let privileges = self
4290                .catalog()
4291                .get_privileges(&target_id, session.conn_id())
4292                // Should be unreachable since the parser will refuse to parse grant/revoke
4293                // statements on objects without privileges.
4294                .ok_or(AdapterError::Unsupported(
4295                    "GRANTs/REVOKEs on an object type with no privileges",
4296                ))?;
4297
4298            for grantee in &grantees {
4299                self.catalog().ensure_not_system_role(grantee)?;
4300                self.catalog().ensure_not_predefined_role(grantee)?;
4301                let existing_privilege = privileges
4302                    .get_acl_item(grantee, &grantor)
4303                    .map(Cow::Borrowed)
4304                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4305
4306                match variant {
4307                    UpdatePrivilegeVariant::Grant
4308                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4309                    {
4310                        ops.push(catalog::Op::UpdatePrivilege {
4311                            target_id: target_id.clone(),
4312                            privilege: MzAclItem {
4313                                grantee: *grantee,
4314                                grantor,
4315                                acl_mode,
4316                            },
4317                            variant,
4318                        });
4319                    }
4320                    UpdatePrivilegeVariant::Revoke
4321                        if !existing_privilege
4322                            .acl_mode
4323                            .intersection(acl_mode)
4324                            .is_empty() =>
4325                    {
4326                        ops.push(catalog::Op::UpdatePrivilege {
4327                            target_id: target_id.clone(),
4328                            privilege: MzAclItem {
4329                                grantee: *grantee,
4330                                grantor,
4331                                acl_mode,
4332                            },
4333                            variant,
4334                        });
4335                    }
4336                    // no-op
4337                    _ => {}
4338                }
4339            }
4340        }
4341
4342        if ops.is_empty() {
4343            session.add_notices(warnings);
4344            return Ok(variant.into());
4345        }
4346
4347        let res = self
4348            .catalog_transact(Some(session), ops)
4349            .await
4350            .map(|_| match variant {
4351                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4352                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4353            });
4354        if res.is_ok() {
4355            session.add_notices(warnings);
4356        }
4357        res
4358    }
4359
4360    #[instrument]
4361    pub(super) async fn sequence_alter_default_privileges(
4362        &mut self,
4363        session: &Session,
4364        plan::AlterDefaultPrivilegesPlan {
4365            privilege_objects,
4366            privilege_acl_items,
4367            is_grant,
4368        }: plan::AlterDefaultPrivilegesPlan,
4369    ) -> Result<ExecuteResponse, AdapterError> {
4370        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4371        let variant = if is_grant {
4372            UpdatePrivilegeVariant::Grant
4373        } else {
4374            UpdatePrivilegeVariant::Revoke
4375        };
4376        for privilege_object in &privilege_objects {
4377            self.catalog()
4378                .ensure_not_system_role(&privilege_object.role_id)?;
4379            self.catalog()
4380                .ensure_not_predefined_role(&privilege_object.role_id)?;
4381            if let Some(database_id) = privilege_object.database_id {
4382                self.catalog()
4383                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4384            }
4385            if let Some(schema_id) = privilege_object.schema_id {
4386                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4387                let schema_spec: SchemaSpecifier = schema_id.into();
4388
4389                self.catalog().ensure_not_reserved_object(
4390                    &(database_spec, schema_spec).into(),
4391                    session.conn_id(),
4392                )?;
4393            }
4394            for privilege_acl_item in &privilege_acl_items {
4395                self.catalog()
4396                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4397                self.catalog()
4398                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4399                ops.push(catalog::Op::UpdateDefaultPrivilege {
4400                    privilege_object: privilege_object.clone(),
4401                    privilege_acl_item: privilege_acl_item.clone(),
4402                    variant,
4403                })
4404            }
4405        }
4406
4407        self.catalog_transact(Some(session), ops).await?;
4408        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4409    }
4410
4411    #[instrument]
4412    pub(super) async fn sequence_grant_role(
4413        &mut self,
4414        session: &Session,
4415        plan::GrantRolePlan {
4416            role_ids,
4417            member_ids,
4418            grantor_id,
4419        }: plan::GrantRolePlan,
4420    ) -> Result<ExecuteResponse, AdapterError> {
4421        let catalog = self.catalog();
4422        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4423        for role_id in role_ids {
4424            for member_id in &member_ids {
4425                let member_membership: BTreeSet<_> =
4426                    catalog.get_role(member_id).membership().keys().collect();
4427                if member_membership.contains(&role_id) {
4428                    let role_name = catalog.get_role(&role_id).name().to_string();
4429                    let member_name = catalog.get_role(member_id).name().to_string();
4430                    // We need this check so we don't accidentally return a success on a reserved role.
4431                    catalog.ensure_not_reserved_role(member_id)?;
4432                    catalog.ensure_grantable_role(&role_id)?;
4433                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4434                        role_name,
4435                        member_name,
4436                    });
4437                } else {
4438                    ops.push(catalog::Op::GrantRole {
4439                        role_id,
4440                        member_id: *member_id,
4441                        grantor_id,
4442                    });
4443                }
4444            }
4445        }
4446
4447        if ops.is_empty() {
4448            return Ok(ExecuteResponse::GrantedRole);
4449        }
4450
4451        self.catalog_transact(Some(session), ops)
4452            .await
4453            .map(|_| ExecuteResponse::GrantedRole)
4454    }
4455
4456    #[instrument]
4457    pub(super) async fn sequence_revoke_role(
4458        &mut self,
4459        session: &Session,
4460        plan::RevokeRolePlan {
4461            role_ids,
4462            member_ids,
4463            grantor_id,
4464        }: plan::RevokeRolePlan,
4465    ) -> Result<ExecuteResponse, AdapterError> {
4466        let catalog = self.catalog();
4467        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4468        for role_id in role_ids {
4469            for member_id in &member_ids {
4470                let member_membership: BTreeSet<_> =
4471                    catalog.get_role(member_id).membership().keys().collect();
4472                if !member_membership.contains(&role_id) {
4473                    let role_name = catalog.get_role(&role_id).name().to_string();
4474                    let member_name = catalog.get_role(member_id).name().to_string();
4475                    // We need this check so we don't accidentally return a success on a reserved role.
4476                    catalog.ensure_not_reserved_role(member_id)?;
4477                    catalog.ensure_grantable_role(&role_id)?;
4478                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4479                        role_name,
4480                        member_name,
4481                    });
4482                } else {
4483                    ops.push(catalog::Op::RevokeRole {
4484                        role_id,
4485                        member_id: *member_id,
4486                        grantor_id,
4487                    });
4488                }
4489            }
4490        }
4491
4492        if ops.is_empty() {
4493            return Ok(ExecuteResponse::RevokedRole);
4494        }
4495
4496        self.catalog_transact(Some(session), ops)
4497            .await
4498            .map(|_| ExecuteResponse::RevokedRole)
4499    }
4500
4501    #[instrument]
4502    pub(super) async fn sequence_alter_owner(
4503        &mut self,
4504        session: &Session,
4505        plan::AlterOwnerPlan {
4506            id,
4507            object_type,
4508            new_owner,
4509        }: plan::AlterOwnerPlan,
4510    ) -> Result<ExecuteResponse, AdapterError> {
4511        let mut ops = vec![catalog::Op::UpdateOwner {
4512            id: id.clone(),
4513            new_owner,
4514        }];
4515
4516        match &id {
4517            ObjectId::Item(global_id) => {
4518                let entry = self.catalog().get_entry(global_id);
4519
4520                // Cannot directly change the owner of an index.
4521                if entry.is_index() {
4522                    let name = self
4523                        .catalog()
4524                        .resolve_full_name(entry.name(), Some(session.conn_id()))
4525                        .to_string();
4526                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
4527                    return Ok(ExecuteResponse::AlteredObject(object_type));
4528                }
4529
4530                // Alter owner cascades down to dependent indexes.
4531                let dependent_index_ops = entry
4532                    .used_by()
4533                    .into_iter()
4534                    .filter(|id| self.catalog().get_entry(id).is_index())
4535                    .map(|id| catalog::Op::UpdateOwner {
4536                        id: ObjectId::Item(*id),
4537                        new_owner,
4538                    });
4539                ops.extend(dependent_index_ops);
4540
4541                // Alter owner cascades down to progress collections.
4542                let dependent_subsources =
4543                    entry
4544                        .progress_id()
4545                        .into_iter()
4546                        .map(|item_id| catalog::Op::UpdateOwner {
4547                            id: ObjectId::Item(item_id),
4548                            new_owner,
4549                        });
4550                ops.extend(dependent_subsources);
4551            }
4552            ObjectId::Cluster(cluster_id) => {
4553                let cluster = self.catalog().get_cluster(*cluster_id);
4554                // Alter owner cascades down to cluster replicas.
4555                let managed_cluster_replica_ops =
4556                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4557                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4558                        new_owner,
4559                    });
4560                ops.extend(managed_cluster_replica_ops);
4561            }
4562            _ => {}
4563        }
4564
4565        self.catalog_transact(Some(session), ops)
4566            .await
4567            .map(|_| ExecuteResponse::AlteredObject(object_type))
4568    }
4569
4570    #[instrument]
4571    pub(super) async fn sequence_reassign_owned(
4572        &mut self,
4573        session: &Session,
4574        plan::ReassignOwnedPlan {
4575            old_roles,
4576            new_role,
4577            reassign_ids,
4578        }: plan::ReassignOwnedPlan,
4579    ) -> Result<ExecuteResponse, AdapterError> {
4580        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4581            self.catalog().ensure_not_reserved_role(role_id)?;
4582        }
4583
4584        let ops = reassign_ids
4585            .into_iter()
4586            .map(|id| catalog::Op::UpdateOwner {
4587                id,
4588                new_owner: new_role,
4589            })
4590            .collect();
4591
4592        self.catalog_transact(Some(session), ops)
4593            .await
4594            .map(|_| ExecuteResponse::ReassignOwned)
4595    }
4596
4597    #[instrument]
4598    pub(crate) async fn handle_deferred_statement(&mut self) {
4599        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
4600        // was processed, removing the single element from deferred_statements, so it is expected
4601        // that this is sometimes empty.
4602        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4603            return;
4604        };
4605        match ps {
4606            crate::coord::PlanStatement::Statement { stmt, params } => {
4607                self.handle_execute_inner(stmt, params, ctx).await;
4608            }
4609            crate::coord::PlanStatement::Plan {
4610                plan,
4611                resolved_ids,
4612                sql_impl_resolved_ids,
4613            } => {
4614                self.sequence_plan(ctx, plan, resolved_ids, sql_impl_resolved_ids)
4615                    .await;
4616            }
4617        }
4618    }
4619
4620    #[instrument]
4621    // TODO(parkmycar): Remove this once we have an actual implementation.
4622    #[allow(clippy::unused_async)]
4623    pub(super) async fn sequence_alter_table(
4624        &mut self,
4625        ctx: &mut ExecuteContext,
4626        plan: plan::AlterTablePlan,
4627    ) -> Result<ExecuteResponse, AdapterError> {
4628        let plan::AlterTablePlan {
4629            relation_id,
4630            column_name,
4631            column_type,
4632            raw_sql_type,
4633        } = plan;
4634
4635        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4636        let (_, new_global_id) = self.allocate_user_id().await?;
4637        let ops = vec![catalog::Op::AlterAddColumn {
4638            id: relation_id,
4639            new_global_id,
4640            name: column_name,
4641            typ: column_type,
4642            sql: raw_sql_type,
4643        }];
4644
4645        self.catalog_transact_with_context(None, Some(ctx), ops)
4646            .await?;
4647
4648        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4649    }
4650
4651    /// Prepares to apply a replacement materialized view.
4652    #[instrument]
4653    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4654        &mut self,
4655        ctx: ExecuteContext,
4656        plan: AlterMaterializedViewApplyReplacementPlan,
4657    ) {
4658        // To ensure there is no time gap in the output, we can only apply a replacement if the
4659        // target's write frontier has caught up to the replacement dataflow's write frontier. This
4660        // might not be the case initially, so we have to wait. To this end, we install a watch set
4661        // waiting for the target MV's write frontier to advance sufficiently.
4662        //
4663        // Note that the replacement's dataflow is not performing any writes, so it can only be
4664        // ahead of the target initially due to as-of selection. Once the target has caught up, the
4665        // replacement's write frontier is always <= the target's.
4666
4667        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4668
4669        let plan_validity = PlanValidity::new(
4670            self.catalog().transient_revision(),
4671            BTreeSet::from_iter([id, replacement_id]),
4672            None,
4673            None,
4674            ctx.session().role_metadata().clone(),
4675        );
4676
4677        let target = self.catalog.get_entry(&id);
4678        let target_gid = target.latest_global_id();
4679
4680        let replacement = self.catalog.get_entry(&replacement_id);
4681        let replacement_gid = replacement.latest_global_id();
4682
4683        let target_upper = self
4684            .controller
4685            .storage_collections
4686            .collection_frontiers(target_gid)
4687            .expect("target MV exists")
4688            .write_frontier;
4689        let replacement_upper = self
4690            .controller
4691            .compute
4692            .collection_frontiers(replacement_gid, replacement.cluster_id())
4693            .expect("replacement MV exists")
4694            .write_frontier;
4695
4696        info!(
4697            %id, %replacement_id, ?target_upper, ?replacement_upper,
4698            "preparing materialized view replacement application",
4699        );
4700
4701        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4702            // A replacement's write frontier can only become empty if the target's write frontier
4703            // has advanced to the empty frontier. In this case the MV is sealed for all times and
4704            // applying the replacement wouldn't have any effect. We use this opportunity to alert
4705            // the user by returning an error, rather than applying the useless replacement.
4706            //
4707            // Note that we can't assert on `target_upper` being empty here, because the reporting
4708            // of the target's frontier might be delayed. We'd have to fetch the current frontier
4709            // from persist, which we cannot do without incurring I/O.
4710            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4711                name: target.name().item.clone(),
4712            }));
4713            return;
4714        };
4715
4716        // A watch set resolves when the watched objects' frontier becomes _greater_ than the
4717        // specified timestamp. Since we only need to wait until the target frontier is >= the
4718        // replacement's frontier, we can step back the timestamp.
4719        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4720
4721        // TODO(database-issues#9820): If the target MV is dropped while we are waiting for
4722        // progress, the watch set never completes and neither does the `ALTER MATERIALIZED VIEW`
4723        // command.
4724        self.install_storage_watch_set(
4725            ctx.session().conn_id().clone(),
4726            BTreeSet::from_iter([target_gid]),
4727            replacement_upper_ts,
4728            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4729                ctx: Some(ctx),
4730                otel_ctx: OpenTelemetryContext::obtain(),
4731                plan,
4732                plan_validity,
4733            }),
4734        )
4735        .expect("target collection exists");
4736    }
4737
4738    /// Finishes applying a replacement materialized view after the frontier wait completed.
4739    #[instrument]
4740    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4741        &mut self,
4742        mut ctx: AlterMaterializedViewReadyContext,
4743    ) {
4744        ctx.otel_ctx.attach_as_parent();
4745
4746        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4747
4748        // We avoid taking the DDL lock for `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`
4749        // commands, see `Coordinator::must_serialize_ddl`. We therefore must assume that the
4750        // world has arbitrarily changed since we performed planning, and we must re-assert
4751        // that it still matches our requirements.
4752        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4753            ctx.retire(Err(err));
4754            return;
4755        }
4756
4757        info!(
4758            %id, %replacement_id,
4759            "finishing materialized view replacement application",
4760        );
4761
4762        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4763        match self
4764            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4765            .await
4766        {
4767            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4768                ObjectType::MaterializedView,
4769            ))),
4770            Err(err) => ctx.retire(Err(err)),
4771        }
4772    }
4773
4774    pub(super) async fn statistics_oracle(
4775        &self,
4776        session: &Session,
4777        source_ids: &BTreeSet<GlobalId>,
4778        query_as_of: &Antichain<Timestamp>,
4779        is_oneshot: bool,
4780    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4781        super::statistics_oracle(
4782            session,
4783            source_ids,
4784            query_as_of,
4785            is_oneshot,
4786            self.catalog().system_config(),
4787            self.controller.storage_collections.as_ref(),
4788        )
4789        .await
4790    }
4791}
4792
4793impl Coordinator {
4794    /// Emit the raw optimizer notices in `notices` to the user's session, if
4795    /// any.
4796    ///
4797    /// This intentionally consumes `RawOptimizerNotice`s (not pre-rendered
4798    /// ones) because the user-facing rendering goes through the user's
4799    /// session-aware humanizer, which produces e.g. schema-qualified names
4800    /// relative to the user's current database/schema.
4801    pub(crate) fn emit_raw_optimizer_notices_to_user(
4802        &self,
4803        ctx: &ExecuteContext,
4804        notices: &[RawOptimizerNotice],
4805    ) {
4806        emit_optimizer_notices(&*self.catalog, ctx.session(), notices);
4807    }
4808
4809    /// Persist already-rendered optimizer notices for a newly created
4810    /// non-transient dataflow.
4811    ///
4812    /// This:
4813    /// - packs builtin-table updates for `mz_optimizer_notices` (if enabled),
4814    /// - stores the rendered metainfo on the catalog object via
4815    ///   `set_dataflow_metainfo`,
4816    /// - and returns a future that resolves once the builtin-table append
4817    ///   has been observed, or `None` if nothing was appended.
4818    async fn persist_dataflow_metainfo(
4819        &mut self,
4820        df_meta: DataflowMetainfo<Arc<OptimizerNotice>>,
4821        export_id: GlobalId,
4822    ) -> Option<BuiltinTableAppendNotify> {
4823        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
4824        // in-memory state.
4825        if self.catalog().state().system_config().enable_mz_notices()
4826            && !df_meta.optimizer_notices.is_empty()
4827        {
4828            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4829            self.catalog().state().pack_optimizer_notices(
4830                &mut builtin_table_updates,
4831                df_meta.optimizer_notices.iter(),
4832                Diff::ONE,
4833            );
4834
4835            // Save the metainfo.
4836            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4837
4838            Some(
4839                self.builtin_table_update()
4840                    .execute(builtin_table_updates)
4841                    .await
4842                    .0,
4843            )
4844        } else {
4845            // Save the metainfo.
4846            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4847
4848            None
4849        }
4850    }
4851}