mz_adapter/coord/sequencer/inner.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
use mz_catalog::memory::error::ErrorKind;
use mz_catalog::memory::objects::{
    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::future::OreFutureExt;
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
    RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
    SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
    StatementContext,
};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_storage_types::sinks::StorageSinkDesc;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::ExportDescription;
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::{OptimizerNotice, RawOptimizerNotice};
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::read_then_write::validate_read_then_write_dependencies;
use crate::coord::sequencer::emit_optimizer_notices;
use crate::coord::{
    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
    WatchSetResponse, validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

/// A future that resolves to a real-time recency timestamp.
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError>>;

mod cluster;
mod copy_from;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;

/// Attempts to evaluate an expression. If an error is returned, the error is sent
/// to the client and the enclosing function returns early.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}
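// Illustrative expansion (with a hypothetical fallible `compute()` and a `ctx`
// that has a `retire` method): `let v = return_if_err!(compute(), ctx);`
// becomes:
//
//     let v = match compute() {
//         Ok(v) => v,
//         Err(e) => return ctx.retire(Err(e.into())),
//     };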

pub(super) use return_if_err;

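/// Catalog ops assembled while sequencing a `DROP`, plus metadata used to emit
/// notices to the session afterwards.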
struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}

/// A bundle of values returned from [`Coordinator::create_source_inner`].
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the
                    // previous one was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If cancellation is not allowed, remove any previous channel so that
                    // `handle_spawn` doesn't observe a stale value from an earlier stage
                    // where `cancel_enabled` was true.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false
            };
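            // Run the current stage on the coordinator thread. Stages that
            // need to continue off-thread return a `StageResult::Handle`,
            // which is awaited on a spawned task below.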
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }

    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // Wait for true or dropped sender.
                let _ = rx.wait_for(|v| *v).await;
            })
        } else {
            Box::pin(future::pending())
        };
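        // Race the stage's result against cancellation; the `if cancel_enabled`
        // guard disables the cancellation branch entirely for stages that
        // cannot be canceled.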
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = return_if_err!(res, ctx);
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }

    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            // Attempt to reduce the `CHECK` expression; we time out if this takes too long.
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            // If this source contained a set of available source references, update the
            // source references catalog table.
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            // These operations must be executed after the source is added to the catalog.
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }

    /// Subsources are planned differently from other statements because they
    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
    /// Because of this, we have usually "missed" the opportunity to plan them
    /// through the normal statement execution life cycle (the exception being
    /// during bootstrapping).
    ///
    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the subsource.
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let (plan, _sql_impl_ids) = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }

    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        // Generate subsource statements.
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        let ids = self
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
            .await?;
        for (subsource_stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource, if any.
        if let Some(progress_stmt) = progress_stmt {
            // The primary source depends on this subsource because the primary
            // source needs to know its shard ID, and the easiest way of
            // guaranteeing that the shard ID is discoverable is to create this
            // collection first.
            assert_none!(progress_stmt.of_source);
            let (item_id, global_id) = self.allocate_user_id().await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            (Plan::CreateSource(plan), _sql_impl_ids) => plan,
            (p, _) => unreachable!("plan must be a CreateSourcePlan but got {:?}", p),
        };

        let (item_id, global_id) = self.allocate_user_id().await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements.
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources.
        let ids = self
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        // Check if any sources are webhook sources and report them as created.
        for (item_id, source) in &sources {
            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
                    ctx.session()
                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
                }
            }
        }

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let (connection_id, connection_gid) = match self.allocate_user_id().await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err)),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
                self.caching_secrets_reader.invalidate(connection_id);
            }
            _ => (),
        };

        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
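            // Validation does network I/O against the external system and may
            // panic in driver code, so it runs on its own task and catches
            // unwinds, surfacing a panic as an `AdapterError` instead.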
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match std::panic::AssertUnwindSafe(
                    connection.validate(connection_id, &current_storage_parameters),
                )
                .ore_catch_unwind()
                .await
                {
                    Ok(Ok(())) => Ok(plan),
                    Ok(Err(err)) => Err(err.into()),
                    Err(_panic) => {
                        tracing::error!("connection validation panicked");
                        Err(AdapterError::Internal(
                            "connection validation panicked".into(),
                        ))
                    }
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

        // VPC endpoint creation for AWS PrivateLink connections is now handled
        // in apply_catalog_implications.
        let conn_id = ctx.session().conn_id().clone();
        let transact_result = self
            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                // Clean up SSH key material if it was persisted, since the
                // catalog item was not created.
                if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
                    if let Err(e) = self.secrets_controller.delete(connection_id).await {
                        tracing::warn!(
                            "error dropping SSH secret for existing connection: {}",
                            e
                        );
                    } else {
                        self.caching_secrets_reader.invalidate(connection_id);
                    }
                }
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => {
                // Clean up SSH key material if it was persisted, since the
                // catalog item was not created.
                if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
                    if let Err(e) = self.secrets_controller.delete(connection_id).await {
                        tracing::warn!(
                            "error dropping SSH secret for failed connection: {}",
                            e
                        );
                    } else {
                        self.caching_secrets_reader.invalidate(connection_id);
                    }
                }
                Err(err)
            }
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

    /// Validates the role attributes for a `CREATE ROLE` statement.
    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some() || attributes.password.is_some() {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER and PASSWORD attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes)?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_with_context(conn_id, None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO(network_policy): Consider role-based network policies here.
        let current_network_policy_name =
            self.catalog().system_config().default_network_policy_name();
        // Check if the way we're altering the policy is still valid for the current connection.
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let (table_id, global_id) = self.allocate_user_id().await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                o => {
                    unreachable!("CREATE TABLE data source got {:?}", o)
                }
            },
        };

        let is_webhook = matches!(
            &data_source,
            TableDataSource::DataSource {
                desc: DataSourceDesc::Webhook { .. },
                ..
            }
        );

        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let catalog_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        if is_webhook {
            // try_get_webhook_url will make up a URL for things that are not
            // webhooks, so we guard against that here.
            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
                ctx.session()
                    .add_notice(AdapterNotice::WebhookSourceCreated { url });
            }
        }

        match catalog_result {
            Ok(()) => Ok(ExecuteResponse::CreatedTable),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session_mut()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "table",
                    });
                Ok(ExecuteResponse::CreatedTable)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        // First try to allocate an item ID and a global ID. If allocation fails, we're done.
        let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
            commit_interval: sink.commit_interval,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

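        // The sink is now committed to the catalog, so wiring up the storage
        // export must succeed; a failure here is treated as unrecoverable.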
        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }

    /// Validates that a view definition does not contain any expressions that may lead to
    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
    ///
    /// We prevent these expressions so that we can add columns to system tables without
    /// changing the definition of the view.
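    ///
    /// For example, a view defined as `SELECT * FROM mz_tables` would be
    /// rejected: adding a column to the system table `mz_tables` would
    /// otherwise silently change the view's definition.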
    ///
    /// Here is a bit of a hand-wavy proof as to why we only need to check the
    /// immediate view definition for system objects and ambiguous column
    /// references, and not the entire dependency tree:
    ///
    ///   - A view with no object references cannot have any ambiguous column
    ///   references to a system object, because it has no system objects.
    ///   - A view with a direct reference to a system object and a * or
    ///   NATURAL JOIN will be rejected due to ambiguous column references.
    ///   - A view with system objects but no * or NATURAL JOINs cannot have
    ///   any ambiguous column references to a system object, because all column
    ///   references are explicitly named.
    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
    ///   system object cannot have any ambiguous column references to a system
    ///   object, because there are no system objects in the top level view and
    ///   all sub-views are guaranteed to have no ambiguous column references to
    ///   system objects.
    pub(super) fn validate_system_column_references(
        &self,
        uses_ambiguous_columns: bool,
        depends_on: &BTreeSet<GlobalId>,
    ) -> Result<(), AdapterError> {
        if uses_ambiguous_columns
            && depends_on
                .iter()
                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
        {
            Err(AdapterError::AmbiguousSystemColumnReference)
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_type(
        &mut self,
        session: &Session,
        plan: plan::CreateTypePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (item_id, global_id) = self.allocate_user_id().await?;
        // Validate the type definition (e.g., composite columns) before storing.
        plan.typ
            .inner
            .desc(&self.catalog().for_session(session))
            .map_err(AdapterError::from)?;
        let typ = Type {
            create_sql: Some(plan.typ.create_sql),
            global_id,
            details: CatalogTypeDetails {
                array_id: None,
                typ: plan.typ.inner,
                pg_metadata: None,
            },
            resolved_ids,
        };
        let op = catalog::Op::CreateItem {
            id: item_id,
            name: plan.name,
            item: CatalogItem::Type(typ),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::CreatedType),
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_comment_on(
        &mut self,
        session: &Session,
        plan: plan::CommentPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::Comment {
            object_id: plan.object_id,
            sub_component: plan.sub_component,
            comment: plan.comment,
        };
        self.catalog_transact(Some(session), vec![op]).await?;
        Ok(ExecuteResponse::Comment)
    }

    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        // Collect GlobalIds for expression cache invalidation.
        let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
            .iter()
            .filter_map(|id| match id {
                ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
                _ => None,
            })
            .flatten()
            .collect();

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        // Invalidate expression cache entries for dropped objects.
        if !expr_cache_invalidate_ids.is_empty() {
            let _fut = self.catalog().update_expression_cache(
                Default::default(),
                Default::default(),
                expr_cache_invalidate_ids,
            );
        }

        fail::fail_point!("after_sequencer_drop_replica");

        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }

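    /// Returns an error if any catalog object still depends on one of the
    /// `dropped_roles`, whether as owner, grantee, or grantor, listing the
    /// dependent objects per role.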
1271    fn validate_dropped_role_ownership(
1272        &self,
1273        session: &Session,
1274        dropped_roles: &BTreeMap<RoleId, &str>,
1275    ) -> Result<(), AdapterError> {
1276        fn privilege_check(
1277            privileges: &PrivilegeMap,
1278            dropped_roles: &BTreeMap<RoleId, &str>,
1279            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1280            object_id: &SystemObjectId,
1281            catalog: &ConnCatalog,
1282        ) {
1283            for privilege in privileges.all_values() {
1284                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1285                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1286                    let object_description =
1287                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1288                    dependent_objects
1289                        .entry(role_name.to_string())
1290                        .or_default()
1291                        .push(format!(
1292                            "privileges on {object_description} granted by {grantor_name}",
1293                        ));
1294                }
1295                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1296                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1297                    let object_description =
1298                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1299                    dependent_objects
1300                        .entry(role_name.to_string())
1301                        .or_default()
1302                        .push(format!(
1303                            "privileges granted on {object_description} to {grantee_name}"
1304                        ));
1305                }
1306            }
1307        }
1308
1309        let catalog = self.catalog().for_session(session);
1310        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1311        for entry in self.catalog.entries() {
1312            let id = SystemObjectId::Object(entry.id().into());
1313            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1314                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1315                dependent_objects
1316                    .entry(role_name.to_string())
1317                    .or_default()
1318                    .push(format!("owner of {object_description}"));
1319            }
1320            privilege_check(
1321                entry.privileges(),
1322                dropped_roles,
1323                &mut dependent_objects,
1324                &id,
1325                &catalog,
1326            );
1327        }
1328        for database in self.catalog.databases() {
1329            let database_id = SystemObjectId::Object(database.id().into());
1330            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1331                let object_description =
1332                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1333                dependent_objects
1334                    .entry(role_name.to_string())
1335                    .or_default()
1336                    .push(format!("owner of {object_description}"));
1337            }
1338            privilege_check(
1339                &database.privileges,
1340                dropped_roles,
1341                &mut dependent_objects,
1342                &database_id,
1343                &catalog,
1344            );
1345            for schema in database.schemas_by_id.values() {
1346                let schema_id = SystemObjectId::Object(
1347                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1348                );
1349                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1350                    let object_description =
1351                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1352                    dependent_objects
1353                        .entry(role_name.to_string())
1354                        .or_default()
1355                        .push(format!("owner of {object_description}"));
1356                }
1357                privilege_check(
1358                    &schema.privileges,
1359                    dropped_roles,
1360                    &mut dependent_objects,
1361                    &schema_id,
1362                    &catalog,
1363                );
1364            }
1365        }
1366        for cluster in self.catalog.clusters() {
1367            let cluster_id = SystemObjectId::Object(cluster.id().into());
1368            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1369                let object_description =
1370                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1371                dependent_objects
1372                    .entry(role_name.to_string())
1373                    .or_default()
1374                    .push(format!("owner of {object_description}"));
1375            }
1376            privilege_check(
1377                &cluster.privileges,
1378                dropped_roles,
1379                &mut dependent_objects,
1380                &cluster_id,
1381                &catalog,
1382            );
1383            for replica in cluster.replicas() {
1384                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1385                    let replica_id =
1386                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1387                    let object_description =
1388                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1389                    dependent_objects
1390                        .entry(role_name.to_string())
1391                        .or_default()
1392                        .push(format!("owner of {object_description}"));
1393                }
1394            }
1395        }
1396        privilege_check(
1397            self.catalog().system_privileges(),
1398            dropped_roles,
1399            &mut dependent_objects,
1400            &SystemObjectId::System,
1401            &catalog,
1402        );
1403        for (default_privilege_object, default_privilege_acl_items) in
1404            self.catalog.default_privileges()
1405        {
1406            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1407                dependent_objects
1408                    .entry(role_name.to_string())
1409                    .or_default()
1410                    .push(format!(
1411                        "default privileges on {}S created by {}",
1412                        default_privilege_object.object_type, role_name
1413                    ));
1414            }
1415            for default_privilege_acl_item in default_privilege_acl_items {
1416                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1417                    dependent_objects
1418                        .entry(role_name.to_string())
1419                        .or_default()
1420                        .push(format!(
1421                            "default privileges on {}S granted to {}",
1422                            default_privilege_object.object_type, role_name
1423                        ));
1424                }
1425            }
1426        }
1427
1428        if !dependent_objects.is_empty() {
1429            Err(AdapterError::DependentObject(dependent_objects))
1430        } else {
1431            Ok(())
1432        }
1433    }
1434
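    /// Sequences a `DROP OWNED BY <role>` statement: revokes the privileges and
    /// default privileges involving the given roles, then drops every object they
    /// own, all within a single catalog transaction. Revokes that the current
    /// session is not permitted to perform are skipped with a notice instead.
    /// E.g. (illustrative):
    ///
    /// ```sql
    /// DROP OWNED BY staff;
    /// ```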
1435    #[instrument]
1436    pub(super) async fn sequence_drop_owned(
1437        &mut self,
1438        session: &Session,
1439        plan: plan::DropOwnedPlan,
1440    ) -> Result<ExecuteResponse, AdapterError> {
1441        for role_id in &plan.role_ids {
1442            self.catalog().ensure_not_reserved_role(role_id)?;
1443        }
1444
1445        let mut privilege_revokes = plan.privilege_revokes;
1446
1447        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1448        let session_catalog = self.catalog().for_session(session);
1449        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1450            && !session.is_superuser()
1451        {
1452            // Obtain all roles that the current session is a member of.
1453            let role_membership =
1454                session_catalog.collect_role_membership(session.current_role_id());
1455            let invalid_revokes: BTreeSet<_> = privilege_revokes
1456                .extract_if(.., |(_, privilege)| {
1457                    !role_membership.contains(&privilege.grantor)
1458                })
1459                .map(|(object_id, _)| object_id)
1460                .collect();
1461            for invalid_revoke in invalid_revokes {
1462                let object_description =
1463                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1464                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1465            }
1466        }
1467
1468        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1469            catalog::Op::UpdatePrivilege {
1470                target_id: object_id,
1471                privilege,
1472                variant: UpdatePrivilegeVariant::Revoke,
1473            }
1474        });
1475        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1476            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1477                privilege_object,
1478                privilege_acl_item,
1479                variant: UpdatePrivilegeVariant::Revoke,
1480            },
1481        );
1482        let DropOps {
1483            ops: drop_ops,
1484            dropped_active_db,
1485            dropped_active_cluster,
1486            dropped_in_use_indexes,
1487        } = self.sequence_drop_common(session, plan.drop_ids)?;
1488
1489        let ops = privilege_revoke_ops
1490            .chain(default_privilege_revoke_ops)
1491            .chain(drop_ops.into_iter())
1492            .collect();
1493
1494        self.catalog_transact(Some(session), ops).await?;
1495
1496        if dropped_active_db {
1497            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1498                name: session.vars().database().to_string(),
1499            });
1500        }
1501        if dropped_active_cluster {
1502            session.add_notice(AdapterNotice::DroppedActiveCluster {
1503                name: session.vars().cluster().to_string(),
1504            });
1505        }
1506        for dropped_in_use_index in dropped_in_use_indexes {
1507            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1508        }
1509        Ok(ExecuteResponse::DroppedOwned)
1510    }
1511
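    /// Shared helper for `DROP` and `DROP OWNED BY`: expands a list of object ids
    /// into the full set of catalog ops needed to drop them, including the role
    /// membership and default-privilege revokes implied by dropping roles,
    /// databases, and schemas. It also reports whether the session's active
    /// database or cluster is being dropped and which in-use indexes are
    /// affected, so the caller can emit the appropriate notices.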
1512    fn sequence_drop_common(
1513        &self,
1514        session: &Session,
1515        ids: Vec<ObjectId>,
1516    ) -> Result<DropOps, AdapterError> {
1517        let mut dropped_active_db = false;
1518        let mut dropped_active_cluster = false;
1519        let mut dropped_in_use_indexes = Vec::new();
1520        let mut dropped_roles = BTreeMap::new();
1521        let mut dropped_databases = BTreeSet::new();
1522        let mut dropped_schemas = BTreeSet::new();
1523        // Dropping either the group role or the member role of a role membership will trigger a
1524        // role revoke. We use a set for the revokes to avoid attempting to revoke the same
1525        // role membership twice.
1526        let mut role_revokes = BTreeSet::new();
1527        // Dropping a database or a schema will revoke all default privileges associated with that
1528        // database or schema.
1529        let mut default_privilege_revokes = BTreeSet::new();
1530
1531        // Clusters we're dropping
1532        let mut clusters_to_drop = BTreeSet::new();
1533
1534        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1535        for id in &ids {
1536            match id {
1537                ObjectId::Database(id) => {
1538                    let name = self.catalog().get_database(id).name();
1539                    if name == session.vars().database() {
1540                        dropped_active_db = true;
1541                    }
1542                    dropped_databases.insert(id);
1543                }
1544                ObjectId::Schema((_, spec)) => {
1545                    if let SchemaSpecifier::Id(id) = spec {
1546                        dropped_schemas.insert(id);
1547                    }
1548                }
1549                ObjectId::Cluster(id) => {
1550                    clusters_to_drop.insert(*id);
1551                    if let Some(active_id) = self
1552                        .catalog()
1553                        .active_cluster(session)
1554                        .ok()
1555                        .map(|cluster| cluster.id())
1556                    {
1557                        if id == &active_id {
1558                            dropped_active_cluster = true;
1559                        }
1560                    }
1561                }
1562                ObjectId::Role(id) => {
1563                    let role = self.catalog().get_role(id);
1564                    let name = role.name();
1565                    dropped_roles.insert(*id, name);
1566                    // We must revoke all role memberships that the dropped role belongs to.
1567                    for (group_id, grantor_id) in &role.membership.map {
1568                        role_revokes.insert((*group_id, *id, *grantor_id));
1569                    }
1570                }
1571                ObjectId::Item(id) => {
1572                    if let Some(index) = self.catalog().get_entry(id).index() {
1573                        let humanizer = self.catalog().for_session(session);
1574                        let dependants = self
1575                            .controller
1576                            .compute
1577                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1578                            .ok()
1579                            .into_iter()
1580                            .flatten()
1581                            .filter(|dependant_id| {
1582                                // Transient IDs belong to peeks. For now, we are not interested in
1583                                // peeks that depend on a dropped index.
1584                                // TODO: show a different notice in this case. Something like
1585                                // "There is an in-progress ad hoc SELECT that uses the dropped
1586                                // index. The resources used by the index will be freed when all
1587                                // such SELECTs complete."
1588                                if dependant_id.is_transient() {
1589                                    return false;
1590                                }
1591                                // The item should exist, but don't panic if it doesn't.
1592                                let Some(dependent_id) = humanizer
1593                                    .try_get_item_by_global_id(dependant_id)
1594                                    .map(|item| item.id())
1595                                else {
1596                                    return false;
1597                                };
1598                                // If the dependent object is also being dropped, then there is no
1599                                // problem, so we don't want a notice.
1600                                !ids_set.contains(&ObjectId::Item(dependent_id))
1601                            })
1602                            .flat_map(|dependant_id| {
1603                            // If we are not able to find a name for this ID, it probably means
1604                                // we have already dropped the compute collection, in which case we
1605                                // can ignore it.
1606                                humanizer.humanize_id(dependant_id)
1607                            })
1608                            .collect_vec();
1609                        if !dependants.is_empty() {
1610                            dropped_in_use_indexes.push(DroppedInUseIndex {
1611                                index_name: humanizer
1612                                    .humanize_id(index.global_id())
1613                                    .unwrap_or_else(|| id.to_string()),
1614                                dependant_objects: dependants,
1615                            });
1616                        }
1617                    }
1618                }
1619                _ => {}
1620            }
1621        }
1622
1623        for id in &ids {
1624            match id {
1625                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1626                // unless they are internal replicas, which exist outside the scope
1627                // of managed clusters.
1628                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1629                    if !clusters_to_drop.contains(cluster_id) {
1630                        let cluster = self.catalog.get_cluster(*cluster_id);
1631                        if cluster.is_managed() {
1632                            let replica =
1633                                cluster.replica(*replica_id).expect("Catalog out of sync");
1634                            if !replica.config.location.internal() {
1635                                coord_bail!("cannot drop replica of managed cluster");
1636                            }
1637                        }
1638                    }
1639                }
1640                _ => {}
1641            }
1642        }
1643
1644        for role_id in dropped_roles.keys() {
1645            self.catalog().ensure_not_reserved_role(role_id)?;
1646        }
1647        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1648        // If any role is a member of a dropped role, then we must revoke that membership.
1649        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1650        for role in self.catalog().user_roles() {
1651            for dropped_role_id in
1652                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1653            {
1654                role_revokes.insert((
1655                    **dropped_role_id,
1656                    role.id(),
1657                    *role
1658                        .membership
1659                        .map
1660                        .get(*dropped_role_id)
1661                        .expect("included in keys above"),
1662                ));
1663            }
1664        }
1665
1666        for (default_privilege_object, default_privilege_acls) in
1667            self.catalog().default_privileges()
1668        {
1669            if matches!(
1670                &default_privilege_object.database_id,
1671                Some(database_id) if dropped_databases.contains(database_id),
1672            ) || matches!(
1673                &default_privilege_object.schema_id,
1674                Some(schema_id) if dropped_schemas.contains(schema_id),
1675            ) {
1676                for default_privilege_acl in default_privilege_acls {
1677                    default_privilege_revokes.insert((
1678                        default_privilege_object.clone(),
1679                        default_privilege_acl.clone(),
1680                    ));
1681                }
1682            }
1683        }
1684
1685        let ops = role_revokes
1686            .into_iter()
1687            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1688                role_id,
1689                member_id,
1690                grantor_id,
1691            })
1692            .chain(default_privilege_revokes.into_iter().map(
1693                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1694                    privilege_object,
1695                    privilege_acl_item,
1696                    variant: UpdatePrivilegeVariant::Revoke,
1697                },
1698            ))
1699            .chain(iter::once(catalog::Op::DropObjects(
1700                ids.into_iter()
1701                    .map(DropObjectInfo::manual_drop_from_object_id)
1702                    .collect(),
1703            )))
1704            .collect();
1705
1706        Ok(DropOps {
1707            ops,
1708            dropped_active_db,
1709            dropped_active_cluster,
1710            dropped_in_use_indexes,
1711        })
1712    }
1713
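    /// Sequences an `EXPLAIN ... SCHEMA ... FOR CREATE SINK` statement. The JSON
    /// schema was already computed during planning, so this only re-serializes it
    /// and returns it as a single-row result.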
1714    pub(super) fn sequence_explain_schema(
1715        &self,
1716        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1717    ) -> Result<ExecuteResponse, AdapterError> {
1718        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1719            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1720        })?;
1721
1722        let json_string = json_string(&json_value);
1723        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1724        Ok(Self::send_immediate_rows(row))
1725    }
1726
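    /// Sequences `SHOW ALL`: returns the name, value, and description of every
    /// variable visible to the session, sorted case-insensitively by name.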
1727    pub(super) fn sequence_show_all_variables(
1728        &self,
1729        session: &Session,
1730    ) -> Result<ExecuteResponse, AdapterError> {
1731        let mut rows = viewable_variables(self.catalog().state(), session)
1732            .map(|v| (v.name(), v.value(), v.description()))
1733            .collect::<Vec<_>>();
1734        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1735
1736        // TODO(parkmycar): Pack all of these into a single RowCollection.
1737        let rows: Vec<_> = rows
1738            .into_iter()
1739            .map(|(name, val, desc)| {
1740                Row::pack_slice(&[
1741                    Datum::String(name),
1742                    Datum::String(&val),
1743                    Datum::String(desc),
1744                ])
1745            })
1746            .collect();
1747        Ok(Self::send_immediate_rows(rows))
1748    }
1749
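    /// Sequences `SHOW <name>`. `SHOW schema` is special-cased to report the first
    /// resolvable schema on the search path (or `NULL` if there is none); showing
    /// the `database` or `cluster` variable emits a notice if the current value no
    /// longer names an existing catalog object.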
1750    pub(super) fn sequence_show_variable(
1751        &self,
1752        session: &Session,
1753        plan: plan::ShowVariablePlan,
1754    ) -> Result<ExecuteResponse, AdapterError> {
1755        if &plan.name == SCHEMA_ALIAS {
1756            let schemas = self.catalog.resolve_search_path(session);
1757            let schema = schemas.first();
1758            return match schema {
1759                Some((database_spec, schema_spec)) => {
1760                    let schema_name = &self
1761                        .catalog
1762                        .get_schema(database_spec, schema_spec, session.conn_id())
1763                        .name()
1764                        .schema;
1765                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
1766                    Ok(Self::send_immediate_rows(row))
1767                }
1768                None => {
1769                    if session.vars().current_object_missing_warnings() {
1770                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1771                            search_path: session
1772                                .vars()
1773                                .search_path()
1774                                .into_iter()
1775                                .map(|schema| schema.to_string())
1776                                .collect(),
1777                        });
1778                    }
1779                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1780                }
1781            };
1782        }
1783
1784        let variable = session
1785            .vars()
1786            .get(self.catalog().system_config(), &plan.name)
1787            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1788
1789        // In lieu of plumbing the user to all system config functions, just check that the var is
1790        // visible.
1791        variable.visible(session.user(), self.catalog().system_config())?;
1792
1793        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1794        if variable.name() == vars::DATABASE.name()
1795            && matches!(
1796                self.catalog().resolve_database(&variable.value()),
1797                Err(CatalogError::UnknownDatabase(_))
1798            )
1799            && session.vars().current_object_missing_warnings()
1800        {
1801            let name = variable.value();
1802            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1803        } else if variable.name() == vars::CLUSTER.name()
1804            && matches!(
1805                self.catalog().resolve_cluster(&variable.value()),
1806                Err(CatalogError::UnknownCluster(_))
1807            )
1808            && session.vars().current_object_missing_warnings()
1809        {
1810            let name = variable.value();
1811            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1812        }
1813        Ok(Self::send_immediate_rows(row))
1814    }
1815
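    /// Sequences the internal `INSPECT SHARD` command, which returns the persist
    /// state of a shard as JSON. Only internal (system) users may run it.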
1816    #[instrument]
1817    pub(super) async fn sequence_inspect_shard(
1818        &self,
1819        session: &Session,
1820        plan: plan::InspectShardPlan,
1821    ) -> Result<ExecuteResponse, AdapterError> {
1822        // TODO: Not thrilled about this rbac special case here, but probably
1823        // sufficient for now.
1824        if !session.user().is_internal() {
1825            return Err(AdapterError::Unauthorized(
1826                rbac::UnauthorizedError::MzSystem {
1827                    action: "inspect".into(),
1828                },
1829            ));
1830        }
1831        let state = self
1832            .controller
1833            .storage
1834            .inspect_persist_state(plan.id)
1835            .await?;
1836        let jsonb = Jsonb::from_serde_json(state)?;
1837        Ok(Self::send_immediate_rows(jsonb.into_row()))
1838    }
1839
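    /// Sequences `SET <name> { TO | = } <value>` (or `... TO DEFAULT`): validates
    /// the change, updates the session variable, and emits notices for deprecated
    /// variables, unknown databases or clusters, and unimplemented isolation
    /// levels. E.g. (illustrative):
    ///
    /// ```sql
    /// SET cluster = quickstart;
    /// ```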
1840    #[instrument]
1841    pub(super) fn sequence_set_variable(
1842        &self,
1843        session: &mut Session,
1844        plan: plan::SetVariablePlan,
1845    ) -> Result<ExecuteResponse, AdapterError> {
1846        let (name, local) = (plan.name, plan.local);
1847        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1848            self.validate_set_isolation_level(session)?;
1849        }
1850        if &name == vars::CLUSTER.name() {
1851            self.validate_set_cluster(session)?;
1852        }
1853
1854        let vars = session.vars_mut();
1855        let values = match plan.value {
1856            plan::VariableValue::Default => None,
1857            plan::VariableValue::Values(values) => Some(values),
1858        };
1859
1860        match values {
1861            Some(values) => {
1862                vars.set(
1863                    self.catalog().system_config(),
1864                    &name,
1865                    VarInput::SqlSet(&values),
1866                    local,
1867                )?;
1868
1869                let vars = session.vars();
1870
1871                // Emit a warning when deprecated variables are used.
1872                // TODO(database-issues#8069) remove this after sufficient time has passed
1873                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1874                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1875                } else if name == vars::CLUSTER.name()
1876                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1877                {
1878                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1879                }
1880
1881                // Emit a notice if the database or cluster value does not correspond to a catalog item.
1882                if name.as_str() == vars::DATABASE.name()
1883                    && matches!(
1884                        self.catalog().resolve_database(vars.database()),
1885                        Err(CatalogError::UnknownDatabase(_))
1886                    )
1887                    && session.vars().current_object_missing_warnings()
1888                {
1889                    let name = vars.database().to_string();
1890                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1891                } else if name.as_str() == vars::CLUSTER.name()
1892                    && matches!(
1893                        self.catalog().resolve_cluster(vars.cluster()),
1894                        Err(CatalogError::UnknownCluster(_))
1895                    )
1896                    && session.vars().current_object_missing_warnings()
1897                {
1898                    let name = vars.cluster().to_string();
1899                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1900                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1901                    let v = values.into_first().to_lowercase();
1902                    if v == IsolationLevel::ReadUncommitted.as_str()
1903                        || v == IsolationLevel::ReadCommitted.as_str()
1904                        || v == IsolationLevel::RepeatableRead.as_str()
1905                    {
1906                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1907                            isolation_level: v,
1908                        });
1909                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1910                        session.add_notice(AdapterNotice::StrongSessionSerializable);
1911                    }
1912                }
1913            }
1914            None => vars.reset(self.catalog().system_config(), &name, local)?,
1915        }
1916
1917        Ok(ExecuteResponse::SetVariable { name, reset: false })
1918    }
1919
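    /// Sequences `RESET <name>`, restoring a session variable to its default value.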
1920    pub(super) fn sequence_reset_variable(
1921        &self,
1922        session: &mut Session,
1923        plan: plan::ResetVariablePlan,
1924    ) -> Result<ExecuteResponse, AdapterError> {
1925        let name = plan.name;
1926        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1927            self.validate_set_isolation_level(session)?;
1928        }
1929        if &name == vars::CLUSTER.name() {
1930            self.validate_set_cluster(session)?;
1931        }
1932        session
1933            .vars_mut()
1934            .reset(self.catalog().system_config(), &name, false)?;
1935        Ok(ExecuteResponse::SetVariable { name, reset: true })
1936    }
1937
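    /// Sequences `SET TRANSACTION ...`. Only isolation-level modes are supported;
    /// access modes are rejected. E.g. (illustrative):
    ///
    /// ```sql
    /// SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
    /// ```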
1938    pub(super) fn sequence_set_transaction(
1939        &self,
1940        session: &mut Session,
1941        plan: plan::SetTransactionPlan,
1942    ) -> Result<ExecuteResponse, AdapterError> {
1943        // TODO(jkosh44) Only supports isolation levels for now.
1944        for mode in plan.modes {
1945            match mode {
1946                TransactionMode::AccessMode(_) => {
1947                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1948                }
1949                TransactionMode::IsolationLevel(isolation_level) => {
1950                    self.validate_set_isolation_level(session)?;
1951
1952                    session.vars_mut().set(
1953                        self.catalog().system_config(),
1954                        TRANSACTION_ISOLATION_VAR_NAME,
1955                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
1956                        plan.local,
1957                    )?
1958                }
1959            }
1960        }
1961        Ok(ExecuteResponse::SetVariable {
1962            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1963            reset: false,
1964        })
1965    }
1966
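    /// Rejects changing the isolation level once the current transaction already
    /// contains operations.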
1967    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1968        if session.transaction().contains_ops() {
1969            Err(AdapterError::InvalidSetIsolationLevel)
1970        } else {
1971            Ok(())
1972        }
1973    }
1974
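    /// Rejects changing the cluster once the current transaction already contains
    /// operations.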
1975    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1976        if session.transaction().contains_ops() {
1977            Err(AdapterError::InvalidSetCluster)
1978        } else {
1979            Ok(())
1980        }
1981    }
1982
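    /// Sequences `COMMIT` or `ROLLBACK`. A failed transaction can only roll back.
    /// Write transactions are handed off to group commit, strict serializable
    /// reads are deferred until they can be linearized, and single-statement
    /// transactions are re-submitted for execution; everything else retires
    /// immediately with the commit/rollback response.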
1983    #[instrument]
1984    pub(super) async fn sequence_end_transaction(
1985        &mut self,
1986        mut ctx: ExecuteContext,
1987        mut action: EndTransactionAction,
1988    ) {
1989        // If the transaction has failed, we can only roll back.
1990        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
1991            (&action, ctx.session().transaction())
1992        {
1993            action = EndTransactionAction::Rollback;
1994        }
1995        let response = match action {
1996            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
1997                params: BTreeMap::new(),
1998            }),
1999            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2000                params: BTreeMap::new(),
2001            }),
2002        };
2003
2004        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2005
2006        let (response, action) = match result {
2007            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2008                (response, action)
2009            }
2010            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2011                // Make sure we have the correct set of write locks for this transaction.
2012                // We aggressively drop partial sets of locks to prevent separate transactions
2013                // from deadlocking.
2014                let validated_locks = match write_lock_guards {
2015                    None => None,
2016                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2017                        Ok(locks) => Some(locks),
2018                        Err(missing) => {
2019                            tracing::error!(?missing, "programming error, missing write locks");
2020                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2021                        }
2022                    },
2023                };
2024
2025                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2026                for WriteOp { id, rows } in writes {
2027                    let total_rows = collected_writes.entry(id).or_default();
2028                    total_rows.push(rows);
2029                }
2030
2031                self.submit_write(PendingWriteTxn::User {
2032                    span: Span::current(),
2033                    writes: collected_writes,
2034                    write_locks: validated_locks,
2035                    pending_txn: PendingTxn {
2036                        ctx,
2037                        response,
2038                        action,
2039                    },
2040                });
2041                return;
2042            }
2043            Ok((
2044                Some(TransactionOps::Peeks {
2045                    determination,
2046                    requires_linearization: RequireLinearization::Required,
2047                    ..
2048                }),
2049                _,
2050            )) if ctx.session().vars().transaction_isolation()
2051                == &IsolationLevel::StrictSerializable =>
2052            {
2053                let conn_id = ctx.session().conn_id().clone();
2054                let pending_read_txn = PendingReadTxn {
2055                    txn: PendingRead::Read {
2056                        txn: PendingTxn {
2057                            ctx,
2058                            response,
2059                            action,
2060                        },
2061                    },
2062                    timestamp_context: determination.timestamp_context,
2063                    created: Instant::now(),
2064                    num_requeues: 0,
2065                    otel_ctx: OpenTelemetryContext::obtain(),
2066                };
2067                self.strict_serializable_reads_tx
2068                    .send((conn_id, pending_read_txn))
2069                    .expect("sending to strict_serializable_reads_tx cannot fail");
2070                return;
2071            }
2072            Ok((
2073                Some(TransactionOps::Peeks {
2074                    determination,
2075                    requires_linearization: RequireLinearization::Required,
2076                    ..
2077                }),
2078                _,
2079            )) if ctx.session().vars().transaction_isolation()
2080                == &IsolationLevel::StrongSessionSerializable =>
2081            {
2082                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2083                    ctx.session_mut()
2084                        .ensure_timestamp_oracle(timeline.clone())
2085                        .apply_write(*ts);
2086                }
2087                (response, action)
2088            }
2089            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2090                self.internal_cmd_tx
2091                    .send(Message::ExecuteSingleStatementTransaction {
2092                        ctx,
2093                        otel_ctx: OpenTelemetryContext::obtain(),
2094                        stmt,
2095                        params,
2096                    })
2097                    .expect("must send");
2098                return;
2099            }
2100            Ok((_, _)) => (response, action),
2101            Err(err) => (Err(err), EndTransactionAction::Rollback),
2102        };
2103        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2104        // Append any parameters that changed to the response.
2105        let response = response.map(|mut r| {
2106            r.extend_params(changed);
2107            ExecuteResponse::from(r)
2108        });
2109
2110        ctx.retire(response);
2111    }
2112
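    /// Clears the session's transaction state and, on commit, validates and
    /// returns the transaction's ops together with any held write locks. DDL ops
    /// are committed against the catalog here; a catalog revision change since
    /// the ops were queued aborts the commit with a DDL race error.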
2113    #[instrument]
2114    async fn sequence_end_transaction_inner(
2115        &mut self,
2116        ctx: &mut ExecuteContext,
2117        action: EndTransactionAction,
2118    ) -> Result<(Option<TransactionOps>, Option<WriteLocks>), AdapterError> {
2119        let txn = self.clear_transaction(ctx.session_mut()).await;
2120
2121        if let EndTransactionAction::Commit = action {
2122            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2123                match &mut ops {
2124                    TransactionOps::Writes(writes) => {
2125                        for WriteOp { id, .. } in &mut writes.iter() {
2126                            // Re-verify this id exists.
2127                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2128                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2129                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2130                                })
2131                            })?;
2132                        }
2133
2134                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2135                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2136                    }
2137                    TransactionOps::DDL {
2138                        ops,
2139                        state: _,
2140                        side_effects,
2141                        revision,
2142                        snapshot: _,
2143                    } => {
2144                        // Make sure our catalog hasn't changed.
2145                        if *revision != self.catalog().transient_revision() {
2146                            return Err(AdapterError::DDLTransactionRace);
2147                        }
2148                        // Commit all of our queued ops.
2149                        let ops = std::mem::take(ops);
2150                        let side_effects = std::mem::take(side_effects);
2151                        self.catalog_transact_with_side_effects(
2152                            Some(ctx),
2153                            ops,
2154                            move |a, mut ctx| {
2155                                Box::pin(async move {
2156                                    for side_effect in side_effects {
2157                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2158                                    }
2159                                })
2160                            },
2161                        )
2162                        .await?;
2163                    }
2164                    _ => (),
2165                }
2166                return Ok((Some(ops), write_lock_guards));
2167            }
2168        }
2169
2170        Ok((None, None))
2171    }
2172
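    /// Sequences a side-effecting function call; currently the only such function
    /// is `pg_cancel_backend`. Canceling our own connection retires the query as
    /// canceled, and canceling an unknown connection returns `false`.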
2173    pub(super) async fn sequence_side_effecting_func(
2174        &mut self,
2175        ctx: ExecuteContext,
2176        plan: SideEffectingFunc,
2177    ) {
2178        match plan {
2179            SideEffectingFunc::PgCancelBackend { connection_id } => {
2180                if ctx.session().conn_id().unhandled() == connection_id {
2181                    // As a special case, if we're canceling ourselves, we send
2182                    // back a canceled response to the client issuing the query,
2183                    // and so we need to do no further processing of the cancel.
2184                    ctx.retire(Err(AdapterError::Canceled));
2185                    return;
2186                }
2187
2188                let res = if let Some((id_handle, _conn_meta)) =
2189                    self.active_conns.get_key_value(&connection_id)
2190                {
2191                    // check_plan already verified role membership.
2192                    self.handle_privileged_cancel(id_handle.clone()).await;
2193                    Datum::True
2194                } else {
2195                    Datum::False
2196                };
2197                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2198            }
2199        }
2200    }
2201
2202    /// Execute a side-effecting function from the frontend peek path.
2203    /// This is separate from `sequence_side_effecting_func` because
2204    /// - It doesn't have an ExecuteContext.
2205    /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
2206    ///   peek sequencing can't look at `active_conns`.
2207    ///
2208    /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
2209    /// sequencing.
2210    pub(crate) async fn execute_side_effecting_func(
2211        &mut self,
2212        plan: SideEffectingFunc,
2213        conn_id: ConnectionId,
2214        current_role: RoleId,
2215    ) -> Result<ExecuteResponse, AdapterError> {
2216        match plan {
2217            SideEffectingFunc::PgCancelBackend { connection_id } => {
2218                if conn_id.unhandled() == connection_id {
2219                    // As a special case, if we're canceling ourselves, we return
2220                    // a canceled response to the client issuing the query,
2221                    // and so we need to do no further processing of the cancel.
2222                    return Err(AdapterError::Canceled);
2223                }
2224
2225                // Perform RBAC check: the current user must be a member of the role
2226                // that owns the connection being cancelled.
2227                if let Some((_id_handle, conn_meta)) =
2228                    self.active_conns.get_key_value(&connection_id)
2229                {
2230                    let target_role = *conn_meta.authenticated_role_id();
2231                    let role_membership = self
2232                        .catalog()
2233                        .state()
2234                        .collect_role_membership(&current_role);
2235                    if !role_membership.contains(&target_role) {
2236                        let target_role_name = self
2237                            .catalog()
2238                            .try_get_role(&target_role)
2239                            .map(|role| role.name().to_string())
2240                            .unwrap_or_else(|| target_role.to_string());
2241                        return Err(AdapterError::Unauthorized(
2242                            rbac::UnauthorizedError::RoleMembership {
2243                                role_names: vec![target_role_name],
2244                            },
2245                        ));
2246                    }
2247
2248                    // RBAC check passed, proceed with cancellation.
2249                    let id_handle = self
2250                        .active_conns
2251                        .get_key_value(&connection_id)
2252                        .map(|(id, _)| id.clone())
2253                        .expect("checked above");
2254                    self.handle_privileged_cancel(id_handle).await;
2255                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2256                } else {
2257                    // Connection not found, return false.
2258                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2259                }
2260            }
2261        }
2262    }
2263
2264    /// Inner method that performs the actual real-time recency timestamp determination.
2265    /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp_if_needed`)
2266    /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
2267    pub(crate) async fn determine_real_time_recent_timestamp(
2268        &self,
2269        source_ids: impl Iterator<Item = GlobalId>,
2270        real_time_recency_timeout: Duration,
2271    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2272        let item_ids = source_ids
2273            .map(|gid| {
2274                self.catalog
2275                    .try_resolve_item_id(&gid)
2276                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2277            })
2278            .collect::<Result<Vec<_>, _>>()?;
2279
2280        // Find all dependencies transitively, because we need to ensure that
2281        // RTR queries determine the timestamp from the remap data of the
2282        // sources (i.e., storage objects that ingest data from external
2283        // systems). We "cheat" a little and filter out any IDs that aren't
2284        // user objects, because we know they cannot be RTR sources.
2285        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2286        // If none of the sources are user objects, we don't need to provide
2287        // a RTR timestamp.
2288        if to_visit.is_empty() {
2289            return Ok(None);
2290        }
2291
2292        let mut timestamp_objects = BTreeSet::new();
2293
2294        while let Some(id) = to_visit.pop_front() {
2295            timestamp_objects.insert(id);
2296            to_visit.extend(
2297                self.catalog()
2298                    .get_entry(&id)
2299                    .uses()
2300                    .into_iter()
2301                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2302            );
2303        }
2304        let timestamp_objects = timestamp_objects
2305            .into_iter()
2306            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2307            .collect();
2308
2309        let r = self
2310            .controller
2311            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2312            .await?;
2313
2314        Ok(Some(r))
2315    }
2316
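    /// Awaits a real-time recency timestamp future, converting any storage error
    /// into a user-facing adapter error.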
2317    pub(crate) async fn await_real_time_recent_timestamp<F>(
2318        catalog: Arc<Catalog>,
2319        fut: F,
2320    ) -> Result<Timestamp, AdapterError>
2321    where
2322        F: Future<Output = Result<Timestamp, StorageError>>,
2323    {
2324        fut.await
2325            .map_err(|error| Self::real_time_recent_timestamp_error(&catalog, error))
2326    }
2327
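    /// Converts a storage-level RTR error into an adapter error, substituting a
    /// human-readable object name for the raw global id when one can be found.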
2328    fn real_time_recent_timestamp_error(catalog: &Catalog, error: StorageError) -> AdapterError {
2329        let rtr_name = |id: &GlobalId| {
2330            catalog
2331                .try_get_entry_by_global_id(id)
2332                .map(|e| e.name().item.clone())
2333                .unwrap_or_else(|| id.to_string())
2334        };
2335
2336        match error {
2337            StorageError::RtrTimeout(id) => AdapterError::RtrTimeout(rtr_name(&id)),
2338            StorageError::RtrDropFailure(id) => AdapterError::RtrDropFailure(rtr_name(&id)),
2339            error => error.into(),
2340        }
2341    }
2342
2343    /// Checks to see if the session needs a real time recency timestamp and if so returns
2344    /// a future that will return the timestamp.
2345    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2346        &self,
2347        session: &Session,
2348        source_ids: impl Iterator<Item = GlobalId>,
2349    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2350        let vars = session.vars();
2351
2352        if vars.real_time_recency()
2353            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2354            && !session.contains_read_timestamp()
2355        {
2356            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2357                .await
2358        } else {
2359            Ok(None)
2360        }
2361    }
2362
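    /// Sequences `EXPLAIN ... PLAN`. Statement explainees (`CREATE VIEW`,
    /// `CREATE MATERIALIZED VIEW`, `CREATE INDEX`, `SELECT`, `SUBSCRIBE`) are
    /// explained by running the corresponding optimization pipeline; existing
    /// catalog objects are explained from their stored plans, or replanned on
    /// request.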
2363    #[instrument]
2364    pub(super) async fn sequence_explain_plan(
2365        &mut self,
2366        ctx: ExecuteContext,
2367        plan: plan::ExplainPlanPlan,
2368        target_cluster: TargetCluster,
2369    ) {
2370        match &plan.explainee {
2371            plan::Explainee::Statement(stmt) => match stmt {
2372                plan::ExplaineeStatement::CreateView { .. } => {
2373                    self.explain_create_view(ctx, plan).await;
2374                }
2375                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2376                    self.explain_create_materialized_view(ctx, plan).await;
2377                }
2378                plan::ExplaineeStatement::CreateIndex { .. } => {
2379                    self.explain_create_index(ctx, plan).await;
2380                }
2381                plan::ExplaineeStatement::Select { .. } => {
2382                    self.explain_peek(ctx, plan, target_cluster).await;
2383                }
2384                plan::ExplaineeStatement::Subscribe { .. } => {
2385                    self.explain_subscribe(ctx, plan, target_cluster).await;
2386                }
2387            },
2388            plan::Explainee::View(_) => {
2389                let result = self.explain_view(&ctx, plan);
2390                ctx.retire(result);
2391            }
2392            plan::Explainee::MaterializedView(_) => {
2393                let result = self.explain_materialized_view(&ctx, plan);
2394                ctx.retire(result);
2395            }
2396            plan::Explainee::Index(_) => {
2397                let result = self.explain_index(&ctx, plan);
2398                ctx.retire(result);
2399            }
2400            plan::Explainee::ReplanView(_) => {
2401                self.explain_replan_view(ctx, plan).await;
2402            }
2403            plan::Explainee::ReplanMaterializedView(_) => {
2404                self.explain_replan_materialized_view(ctx, plan).await;
2405            }
2406            plan::Explainee::ReplanIndex(_) => {
2407                self.explain_replan_index(ctx, plan).await;
2408            }
2409        };
2410    }
2411
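    /// Sequences `EXPLAIN FILTER PUSHDOWN`, which is only supported for `SELECT`
    /// statements and materialized views.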
2412    pub(super) async fn sequence_explain_pushdown(
2413        &mut self,
2414        ctx: ExecuteContext,
2415        plan: plan::ExplainPushdownPlan,
2416        target_cluster: TargetCluster,
2417    ) {
2418        match plan.explainee {
2419            Explainee::Statement(ExplaineeStatement::Select {
2420                broken: false,
2421                plan,
2422                desc: _,
2423            }) => {
2424                let stage = return_if_err!(
2425                    self.peek_validate(
2426                        ctx.session(),
2427                        plan,
2428                        target_cluster,
2429                        None,
2430                        ExplainContext::Pushdown,
2431                        Some(ctx.session().vars().max_query_result_size()),
2432                    ),
2433                    ctx
2434                );
2435                self.sequence_staged(ctx, Span::current(), stage).await;
2436            }
2437            Explainee::MaterializedView(item_id) => {
2438                self.explain_pushdown_materialized_view(ctx, item_id).await;
2439            }
2440            _ => {
2441                ctx.retire(Err(AdapterError::Unsupported(
2442                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2443                )));
2444            }
2445        };
2446    }
2447
2448    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2449    async fn execute_explain_pushdown_with_read_holds(
2450        &self,
2451        ctx: ExecuteContext,
2452        as_of: Antichain<Timestamp>,
2453        mz_now: ResultSpec<'static>,
2454        read_holds: Option<ReadHolds>,
2455        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2456    ) {
2457        let fut = self
2458            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2459            .await;
2460        task::spawn(|| "render explain pushdown", async move {
2461            // Transfer the necessary read holds over to the background task
2462            let _read_holds = read_holds;
2463            let res = fut.await;
2464            ctx.retire(res);
2465        });
2466    }
2467
2468    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
2469    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2470        &self,
2471        session: &Session,
2472        as_of: Antichain<Timestamp>,
2473        mz_now: ResultSpec<'static>,
2474        imports: I,
2475    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2476        // Get the needed Coordinator stuff and call the freestanding, shared helper.
2477        super::explain_pushdown_future_inner(
2478            session,
2479            &self.catalog,
2480            &self.controller.storage_collections,
2481            as_of,
2482            mz_now,
2483            imports,
2484        )
2485        .await
2486    }
2487
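    /// Sequences an `INSERT` statement. A constant `VALUES` list without a
    /// `RETURNING` clause takes a fast path that appends the rows directly;
    /// anything else is rewritten into an insert-flavored read-then-write plan.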
2488    #[instrument]
2489    pub(super) async fn sequence_insert(
2490        &mut self,
2491        mut ctx: ExecuteContext,
2492        plan: plan::InsertPlan,
2493    ) {
2494        // Normally, this would get checked when trying to add "write ops" to
2495        // the transaction, but we go down diverging paths below based on
2496        // whether the INSERT is only constant values or not.
2497        //
2498        // For the non-constant case we sequence an implicit read-then-write,
2499        // which messes with the transaction ops and would allow an implicit
2500        // read-then-write to sneak into a read-only transaction.
2501        if !ctx.session_mut().transaction().allows_writes() {
2502            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2503            return;
2504        }
2505
2506        // The structure of this code originates from a time when
2507        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2508        // optimized `MirRelationExpr`.
2509        //
2510        // Ideally, we would like to make the `selection.as_const().is_some()`
2511        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2512        // are planned as a Wrap($n, $vals) call, so until we can reduce
2513        // HirRelationExpr this will always return false.
2514        //
2515        // Unfortunately, hitting the default path of the match below also
2516        // causes a lot of tests to fail, so we opted to go with the extra
2517        // `plan.values.clone()` statements when producing the `optimized_mir`
2518        // and to re-optimize the values in the `sequence_read_then_write` call.
2519        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2520            // For writes, we don't perform any optimizations on an expression that
2521            // is already a constant, as we want to maximize bulk-insert throughput.
2522            let expr = return_if_err!(
2523                plan.values
2524                    .clone()
2525                    .lower(self.catalog().system_config(), None),
2526                ctx
2527            );
2528            OptimizedMirRelationExpr(expr)
2529        } else {
2530            // Collect optimizer parameters.
2531            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2532
2533            // (`optimize::view::Optimizer` has a special case for constant queries.)
2534            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2535
2536            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2537            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2538        };
2539
2540        match optimized_mir.into_inner() {
2541            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2542                let catalog = self.owned_catalog();
2543                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2544                    let result =
2545                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2546                    ctx.retire(result);
2547                });
2548            }
2549            // All non-constant values must be planned as read-then-writes.
2550            _ => {
2551                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2552                    Some(table) => {
2553                        // Inserts always occur at the latest version of the table.
2554                        let desc = table.relation_desc_latest().expect("table has a desc");
2555                        desc.arity()
2556                    }
2557                    None => {
2558                        ctx.retire(Err(AdapterError::Catalog(
2559                            mz_catalog::memory::error::Error {
2560                                kind: ErrorKind::Sql(CatalogError::UnknownItem(
2561                                    plan.id.to_string(),
2562                                )),
2563                            },
2564                        )));
2565                        return;
2566                    }
2567                };
2568
2569                let finishing = RowSetFinishing {
2570                    order_by: vec![],
2571                    limit: None,
2572                    offset: 0,
2573                    project: (0..desc_arity).collect(),
2574                };
2575
2576                let read_then_write_plan = plan::ReadThenWritePlan {
2577                    id: plan.id,
2578                    selection: plan.values,
2579                    finishing,
2580                    assignments: BTreeMap::new(),
2581                    kind: MutationKind::Insert,
2582                    returning: plan.returning,
2583                };
2584
2585                self.sequence_read_then_write(ctx, read_then_write_plan)
2586                    .await;
2587            }
2588        }
2589    }
2590
2591    /// ReadThenWrite is a plan whose writes depend on the results of a
2592    /// read. This works by doing a Peek and then queuing a SendDiffs. No writes
2593    /// or read-then-writes can occur between the Peek and the SendDiffs; otherwise
2594    /// a serializability violation could occur.
2595    #[instrument]
2596    pub(super) async fn sequence_read_then_write(
2597        &mut self,
2598        mut ctx: ExecuteContext,
2599        plan: plan::ReadThenWritePlan,
2600    ) {
2601        let mut source_ids: BTreeSet<_> = plan
2602            .selection
2603            .depends_on()
2604            .into_iter()
2605            .map(|gid| self.catalog().resolve_item_id(&gid))
2606            .collect();
2607        source_ids.insert(plan.id);
2608
2609        // If the transaction doesn't already have write locks, acquire them.
2610        if ctx.session().transaction().write_locks().is_none() {
2611            // Pre-define all of the locks we need.
2612            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2613
2614            // Try acquiring all of our locks.
2615            for id in &source_ids {
2616                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2617                    write_locks.insert_lock(*id, lock);
2618                }
2619            }
2620
2621            // See if we acquired all of the necessary locks; we take them all-or-nothing so we never wait while holding a partial set.
2622            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2623                Ok(locks) => locks,
2624                Err(missing) => {
2625                    // Defer our write if we couldn't acquire all of the locks.
2626                    let role_metadata = ctx.session().role_metadata().clone();
2627                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2628                    let plan = DeferredPlan {
2629                        ctx,
2630                        plan: Plan::ReadThenWrite(plan),
2631                        validity: PlanValidity::new(
2632                            self.catalog.transient_revision(),
2633                            source_ids.clone(),
2634                            None,
2635                            None,
2636                            role_metadata,
2637                        ),
2638                        requires_locks: source_ids,
2639                    };
2640                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2641                }
2642            };
2643
2644            ctx.session_mut()
2645                .try_grant_write_locks(write_locks)
2646                .expect("checked above that the session does not yet hold write locks");
2647        }
2648
2649        let plan::ReadThenWritePlan {
2650            id,
2651            kind,
2652            selection,
2653            mut assignments,
2654            finishing,
2655            mut returning,
2656        } = plan;
2657
2658        // Read-then-writes can be queued, so re-verify that the id still exists.
2659        let desc = match self.catalog().try_get_entry(&id) {
2660            Some(table) => {
2661                // Inserts always occur at the latest version of the table.
2662                table
2663                    .relation_desc_latest()
2664                    .expect("table has a desc")
2665                    .into_owned()
2666            }
2667            None => {
2668                ctx.retire(Err(AdapterError::Catalog(
2669                    mz_catalog::memory::error::Error {
2670                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2671                    },
2672                )));
2673                return;
2674            }
2675        };
2676
2677        // Disallow mz_now in any position because read time and write time differ.
2678        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2679            || assignments.values().any(|e| e.contains_temporal())
2680            || returning.iter().any(|e| e.contains_temporal());
2681        if contains_temporal {
2682            ctx.retire(Err(AdapterError::Unsupported(
2683                "calls to mz_now in write statements",
2684            )));
2685            return;
2686        }
2687
2688        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations.
2689        for gid in selection.depends_on() {
2690            let item_id = self.catalog().resolve_item_id(&gid);
2691            if let Err(err) = validate_read_then_write_dependencies(self.catalog(), &item_id) {
2692                ctx.retire(Err(err));
2693                return;
2694            }
2695        }
2696
2697        let (peek_tx, peek_rx) = oneshot::channel();
2698        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2699        let (tx, _, session, extra) = ctx.into_parts();
2700        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2701        // execution context, because this peek does not directly correspond to an execute,
2702        // and so we don't need to take any action on its retirement.
2703        // TODO[btv]: we might consider extending statement logging to log the inner
2704        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2705        // and mint a new "real" execution context here. We'd also have to add some logic to
2706        // make sure such "sub-statements" are always sampled when the top-level statement is.
2707        //
2708        // It's debatable whether this makes sense conceptually,
2709        // because the inner fragment here is not actually a
2710        // "statement" in its own right.
2711        let peek_ctx = ExecuteContext::from_parts(
2712            peek_client_tx,
2713            self.internal_cmd_tx.clone(),
2714            session,
2715            Default::default(),
2716        );
2717
2718        self.sequence_peek(
2719            peek_ctx,
2720            plan::SelectPlan {
2721                select: None,
2722                source: selection,
2723                when: QueryWhen::FreshestTableWrite,
2724                finishing,
2725                copy_to: None,
2726            },
2727            TargetCluster::Active,
2728            None,
2729        )
2730        .await;
2731
2732        let internal_cmd_tx = self.internal_cmd_tx.clone();
2733        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2734        let catalog = self.owned_catalog();
2735        let max_result_size = self.catalog().system_config().max_result_size();
2736
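        // The remainder of the read-then-write runs off the coordinator main task:
        // wait for the peek result on `peek_rx`, turn the rows into diffs, and then
        // stage the write.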
2737        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2738            let (peek_response, session) = match peek_rx.await {
2739                Ok(Response {
2740                    result: Ok(resp),
2741                    session,
2742                    otel_ctx,
2743                }) => {
2744                    otel_ctx.attach_as_parent();
2745                    (resp, session)
2746                }
2747                Ok(Response {
2748                    result: Err(e),
2749                    session,
2750                    otel_ctx,
2751                }) => {
2752                    let ctx =
2753                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2754                    otel_ctx.attach_as_parent();
2755                    ctx.retire(Err(e));
2756                    return;
2757                }
2758                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2759                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2760            };
2761            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2762            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2763
2764            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2765            if timeout_dur == Duration::ZERO {
2766                timeout_dur = Duration::MAX;
2767            }
2768
2769            let style = ExprPrepOneShot {
2770                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2771                session: ctx.session(),
2772                catalog_state: catalog.state(),
2773            };
2774            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2775                return_if_err!(style.prep_scalar_expr(expr), ctx);
2776            }
2777
2778            let make_diffs = move |mut rows: Box<dyn RowIterator>|
2779                  -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2780                    let arena = RowArena::new();
2781                    let mut diffs = Vec::new();
2782                    let mut datum_vec = mz_repr::DatumVec::new();
2783
2784                    while let Some(row) = rows.next() {
2785                        if !assignments.is_empty() {
2786                            assert!(
2787                                matches!(kind, MutationKind::Update),
2788                                "only updates support assignments"
2789                            );
2790                            let mut datums = datum_vec.borrow_with(row);
2791                            let mut updates = vec![];
2792                            for (idx, expr) in &assignments {
2793                                let updated = match expr.eval(&datums, &arena) {
2794                                    Ok(updated) => updated,
2795                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2796                                };
2797                                updates.push((*idx, updated));
2798                            }
2799                            for (idx, new_value) in updates {
2800                                datums[idx] = new_value;
2801                            }
2802                            let updated = Row::pack_slice(&datums);
2803                            diffs.push((updated, Diff::ONE));
2804                        }
2805                        match kind {
2806                            // Updates and deletes always remove the
2807                            // current row. Updates will also add an
2808                            // updated value.
2809                            MutationKind::Update | MutationKind::Delete => {
2810                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2811                            }
2812                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2813                        }
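                        // Net effect for an UPDATE of row `r` to `r'`: `diffs` now
                        // holds `(r', +1)` and `(r, -1)`.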
2814                    }
2815
2816                    // Sum of all the rows' byte size, for checking if we go
2817                    // above the max_result_size threshold.
2818                    let mut byte_size: u64 = 0;
2819                    for (row, diff) in &diffs {
2820                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
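                        // Constraint checks only apply to insertions; retractions
                        // remove rows that already satisfied the constraints.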
2821                        if diff.is_positive() {
2822                            for (idx, datum) in row.iter().enumerate() {
2823                                desc.constraints_met(idx, &datum)?;
2824                            }
2825                        }
2826                    }
2827                    Ok((diffs, byte_size))
2828                };
2829
2830            let diffs = match peek_response {
2831                ExecuteResponse::SendingRowsStreaming {
2832                    rows: mut rows_stream,
2833                    ..
2834                } => {
2835                    let mut byte_size: u64 = 0;
2836                    let mut diffs = Vec::new();
2837                    let result = loop {
2838                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2839                            Ok(Some(res)) => match res {
2840                                PeekResponseUnary::Rows(new_rows) => {
2841                                    match make_diffs(new_rows) {
2842                                        Ok((mut new_diffs, new_byte_size)) => {
2843                                            byte_size = byte_size.saturating_add(new_byte_size);
2844                                            if byte_size > max_result_size {
2845                                                break Err(AdapterError::ResultSize(format!(
2846                                                    "result exceeds max size of {max_result_size}"
2847                                                )));
2848                                            }
2849                                            diffs.append(&mut new_diffs)
2850                                        }
2851                                        Err(e) => break Err(e),
2852                                    };
2853                                }
2854                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2855                                PeekResponseUnary::Error(e) => {
2856                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2857                                }
2858                                PeekResponseUnary::DependencyDropped(dep) => {
2859                                    break Err(dep.to_concurrent_dependency_drop());
2860                                }
2861                            },
2862                            Ok(None) => break Ok(diffs),
2863                            Err(_) => {
2864                                // We timed out, so remove the pending peek. This is
2865                                // best-effort and doesn't guarantee we won't
2866                                // receive a response.
2867                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2868                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2869                                    conn_id: ctx.session().conn_id().clone(),
2870                                });
2871                                if let Err(e) = result {
2872                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2873                                }
2874                                break Err(AdapterError::StatementTimeout);
2875                            }
2876                        }
2877                    };
2878
2879                    result
2880                }
2881                ExecuteResponse::SendingRowsImmediate { rows } => {
2882                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2883                }
2884                resp => Err(AdapterError::Unstructured(anyhow!(
2885                    "unexpected peek response: {resp:?}"
2886                ))),
2887            };
2888
2889            let mut returning_rows = Vec::new();
2890            let mut diff_err: Option<AdapterError> = None;
2891            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2892                let arena = RowArena::new();
2893                for (row, diff) in diffs {
2894                    if !diff.is_positive() {
2895                        continue;
2896                    }
2897                    let mut returning_row = Row::with_capacity(returning.len());
2898                    let mut packer = returning_row.packer();
2899                    for expr in &returning {
2900                        let datums: Vec<_> = row.iter().collect();
2901                        match expr.eval(&datums, &arena) {
2902                            Ok(datum) => {
2903                                packer.push(datum);
2904                            }
2905                            Err(err) => {
2906                                diff_err = Some(err.into());
2907                                break;
2908                            }
2909                        }
2910                    }
2911                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2912                    let diff = match NonZeroUsize::try_from(diff) {
2913                        Ok(diff) => diff,
2914                        Err(err) => {
2915                            diff_err = Some(err.into());
2916                            break;
2917                        }
2918                    };
2919                    returning_rows.push((returning_row, diff));
2920                    if diff_err.is_some() {
2921                        break;
2922                    }
2923                }
2924            }
2925            let diffs = if let Some(err) = diff_err {
2926                Err(err)
2927            } else {
2928                diffs
2929            };
2930
2931            // We need to clear out the timestamp context so the write doesn't fail due to a
2932            // read-only transaction.
2933            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2934            // No matter what isolation level the client is using, we must linearize this
2935            // read. The write will be performed right after this, as part of a single
2936            // transaction, so the write must have a timestamp greater than or equal to the
2937            // read.
2938            //
2939            // Note: It's only OK for the write to have a greater timestamp than the read
2940            // because the write lock prevents any other writes from happening in between
2941            // the read and write.
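            // Concretely: if the peek read at timestamp `t`, the write below is
            // assigned some `t' >= t`, and the write locks we hold ensure no other
            // write to these tables lands in between.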
2942            if let Some(timestamp_context) = timestamp_context {
2943                let (tx, rx) = tokio::sync::oneshot::channel();
2944                let conn_id = ctx.session().conn_id().clone();
2945                let pending_read_txn = PendingReadTxn {
2946                    txn: PendingRead::ReadThenWrite { ctx, tx },
2947                    timestamp_context,
2948                    created: Instant::now(),
2949                    num_requeues: 0,
2950                    otel_ctx: OpenTelemetryContext::obtain(),
2951                };
2952                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2953                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
2954                if let Err(e) = result {
2955                    warn!(
2956                        "strict_serializable_reads_tx dropped before we could send: {:?}",
2957                        e
2958                    );
2959                    return;
2960                }
2961                let result = rx.await;
2962                // It is not an error for these results to be ready after `tx` has been dropped.
2963                ctx = match result {
2964                    Ok(Some(ctx)) => ctx,
2965                    Ok(None) => {
2966                        // Coordinator took our context and will handle responding to the client.
2967                        // This usually indicates that our transaction was aborted.
2968                        return;
2969                    }
2970                    Err(e) => {
2971                        warn!(
2972                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
2973                            e
2974                        );
2975                        return;
2976                    }
2977                };
2978            }
2979
2980            match diffs {
2981                Ok(diffs) => {
2982                    let result = Self::send_diffs(
2983                        ctx.session_mut(),
2984                        plan::SendDiffsPlan {
2985                            id,
2986                            updates: diffs,
2987                            kind,
2988                            returning: returning_rows,
2989                            max_result_size,
2990                        },
2991                    );
2992                    ctx.retire(result);
2993                }
2994                Err(e) => {
2995                    ctx.retire(Err(e));
2996                }
2997            }
2998        });
2999    }
3000
3001    #[instrument]
3002    pub(super) async fn sequence_alter_item_rename(
3003        &mut self,
3004        ctx: &mut ExecuteContext,
3005        plan: plan::AlterItemRenamePlan,
3006    ) -> Result<ExecuteResponse, AdapterError> {
3007        let op = catalog::Op::RenameItem {
3008            id: plan.id,
3009            current_full_name: plan.current_full_name,
3010            to_name: plan.to_name,
3011        };
3012        match self
3013            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3014            .await
3015        {
3016            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3017            Err(err) => Err(err),
3018        }
3019    }
3020
3021    #[instrument]
3022    pub(super) async fn sequence_alter_retain_history(
3023        &mut self,
3024        ctx: &mut ExecuteContext,
3025        plan: plan::AlterRetainHistoryPlan,
3026    ) -> Result<ExecuteResponse, AdapterError> {
3027        let ops = vec![catalog::Op::AlterRetainHistory {
3028            id: plan.id,
3029            value: plan.value,
3030            window: plan.window,
3031        }];
3032        self.catalog_transact_with_context(None, Some(ctx), ops)
3033            .await?;
3034        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3035    }
3036
3037    #[instrument]
3038    pub(super) async fn sequence_alter_source_timestamp_interval(
3039        &mut self,
3040        ctx: &mut ExecuteContext,
3041        plan: plan::AlterSourceTimestampIntervalPlan,
3042    ) -> Result<ExecuteResponse, AdapterError> {
3043        let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3044            id: plan.id,
3045            value: plan.value,
3046            interval: plan.interval,
3047        }];
3048        self.catalog_transact_with_context(None, Some(ctx), ops)
3049            .await?;
3050        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3051    }
3052
3053    #[instrument]
3054    pub(super) async fn sequence_alter_schema_rename(
3055        &mut self,
3056        ctx: &mut ExecuteContext,
3057        plan: plan::AlterSchemaRenamePlan,
3058    ) -> Result<ExecuteResponse, AdapterError> {
3059        let (database_spec, schema_spec) = plan.cur_schema_spec;
3060        let op = catalog::Op::RenameSchema {
3061            database_spec,
3062            schema_spec,
3063            new_name: plan.new_schema_name,
3064            check_reserved_names: true,
3065        };
3066        match self
3067            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3068            .await
3069        {
3070            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3071            Err(err) => Err(err),
3072        }
3073    }
3074
3075    #[instrument]
3076    pub(super) async fn sequence_alter_schema_swap(
3077        &mut self,
3078        ctx: &mut ExecuteContext,
3079        plan: plan::AlterSchemaSwapPlan,
3080    ) -> Result<ExecuteResponse, AdapterError> {
3081        let plan::AlterSchemaSwapPlan {
3082            schema_a_spec: (schema_a_db, schema_a),
3083            schema_a_name,
3084            schema_b_spec: (schema_b_db, schema_b),
3085            schema_b_name,
3086            name_temp,
3087        } = plan;
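        // Swap the two schemas by way of a temporary name: A -> temp, B -> A's old
        // name, then temp -> B's old name, all within a single catalog transaction.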
3088
3089        let op_a = catalog::Op::RenameSchema {
3090            database_spec: schema_a_db,
3091            schema_spec: schema_a,
3092            new_name: name_temp,
3093            check_reserved_names: false,
3094        };
3095        let op_b = catalog::Op::RenameSchema {
3096            database_spec: schema_b_db,
3097            schema_spec: schema_b,
3098            new_name: schema_a_name,
3099            check_reserved_names: false,
3100        };
3101        let op_c = catalog::Op::RenameSchema {
3102            database_spec: schema_a_db,
3103            schema_spec: schema_a,
3104            new_name: schema_b_name,
3105            check_reserved_names: false,
3106        };
3107
3108        match self
3109            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3110                Box::pin(async {})
3111            })
3112            .await
3113        {
3114            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3115            Err(err) => Err(err),
3116        }
3117    }
3118
3119    #[instrument]
3120    pub(super) async fn sequence_alter_role(
3121        &mut self,
3122        session: &Session,
3123        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3124    ) -> Result<ExecuteResponse, AdapterError> {
3125        let catalog = self.catalog().for_session(session);
3126        let role = catalog.get_role(&id);
3127
3128        // We'll send these notices to the user if the operation is successful.
3129        let mut notices = vec![];
3130
3131        // Get the attributes and variables from the role, as they currently are.
3132        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3133        let mut vars = role.vars().clone();
3134
3135        // Whether to set the password to NULL. This is a special case since the existing
3136        // password is not stored in the role attributes.
3137        let mut nopassword = false;
3138
3139        // Apply our updates.
3140        match option {
3141            PlannedAlterRoleOption::Attributes(attrs) => {
3142                self.validate_role_attributes(&attrs.clone().into())?;
3143
3144                if let Some(inherit) = attrs.inherit {
3145                    attributes.inherit = inherit;
3146                }
3147
3148                if let Some(password) = attrs.password {
3149                    attributes.password = Some(password);
3150                    attributes.scram_iterations =
3151                        Some(self.catalog().system_config().scram_iterations())
3152                }
3153
3154                if let Some(superuser) = attrs.superuser {
3155                    attributes.superuser = Some(superuser);
3156                }
3157
3158                if let Some(login) = attrs.login {
3159                    attributes.login = Some(login);
3160                }
3161
3162                if attrs.nopassword.unwrap_or(false) {
3163                    nopassword = true;
3164                }
3165
3166                if let Some(notice) = self.should_emit_rbac_notice(session) {
3167                    notices.push(notice);
3168                }
3169            }
3170            PlannedAlterRoleOption::Variable(variable) => {
3171                // Get the variable to make sure it's valid and visible.
3172                let session_var = session.vars().inspect(variable.name())?;
3173                // Return early if it's not visible.
3174                session_var.visible(session.user(), catalog.system_vars())?;
3175
3176                // Emit a warning when deprecated variables are used.
3177                // TODO(database-issues#8069) remove this after sufficient time has passed
3178                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3179                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3180                } else if let PlannedRoleVariable::Set {
3181                    name,
3182                    value: VariableValue::Values(vals),
3183                } = &variable
3184                {
3185                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3186                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3187                    }
3188                }
3189
3190                let var_name = match variable {
3191                    PlannedRoleVariable::Set { name, value } => {
3192                        // Update our persisted set.
3193                        match &value {
3194                            VariableValue::Default => {
3195                                vars.remove(&name);
3196                            }
3197                            VariableValue::Values(vals) => {
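                                // A single value is stored flat; a value list becomes a
                                // SQL set, e.g. `SET search_path = public, app` yields
                                // `SqlSet(["public", "app"])` (schema names hypothetical).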
3198                                let var = match &vals[..] {
3199                                    [val] => OwnedVarInput::Flat(val.clone()),
3200                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3201                                };
3202                                // Make sure the input is valid.
3203                                session_var.check(var.borrow())?;
3204
3205                                vars.insert(name.clone(), var);
3206                            }
3207                        };
3208                        name
3209                    }
3210                    PlannedRoleVariable::Reset { name } => {
3211                        // Remove it from our persisted values.
3212                        vars.remove(&name);
3213                        name
3214                    }
3215                };
3216
3217                // Emit a notice that they need to reconnect to see the change take effect.
3218                notices.push(AdapterNotice::VarDefaultUpdated {
3219                    role: Some(name.clone()),
3220                    var_name: Some(var_name),
3221                });
3222            }
3223        }
3224
3225        let op = catalog::Op::AlterRole {
3226            id,
3227            name,
3228            attributes,
3229            nopassword,
3230            vars: RoleVars { map: vars },
3231        };
3232        let response = self
3233            .catalog_transact(Some(session), vec![op])
3234            .await
3235            .map(|_| ExecuteResponse::AlteredRole)?;
3236
3237        // Send all of our queued notices.
3238        session.add_notices(notices);
3239
3240        Ok(response)
3241    }
3242
3243    #[instrument]
3244    pub(super) async fn sequence_alter_sink_prepare(
3245        &mut self,
3246        ctx: ExecuteContext,
3247        plan: plan::AlterSinkPlan,
3248    ) {
3249        // Put a read hold on the new `from` relation.
3250        let id_bundle = crate::CollectionIdBundle {
3251            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3252            compute_ids: BTreeMap::new(),
3253        };
3254        let read_hold = self.acquire_read_holds(&id_bundle);
3255
3256        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3257            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3258            return;
3259        };
3260
3261        let otel_ctx = OpenTelemetryContext::obtain();
3262        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3263
3264        let plan_validity = PlanValidity::new(
3265            self.catalog().transient_revision(),
3266            BTreeSet::from_iter([plan.item_id, from_item_id]),
3267            Some(plan.in_cluster),
3268            None,
3269            ctx.session().role_metadata().clone(),
3270        );
3271
3272        info!(
3273            "preparing alter sink for {}: frontiers={:?} export={:?}",
3274            plan.global_id,
3275            self.controller
3276                .storage_collections
3277                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3278            self.controller.storage.export(plan.global_id)
3279        );
3280
3281        // Now we must wait for the sink to make enough progress such that there is overlap between
3282        // the new `from` collection's read hold and the sink's write frontier.
3283        //
3284        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3285        // the watch set never completes and neither does the `ALTER SINK` command.
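        // The watch set completes once the sink's write frontier passes `read_ts`,
        // at which point it is safe to hand the sink over to the new `from` collection.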
3286        self.install_storage_watch_set(
3287            ctx.session().conn_id().clone(),
3288            BTreeSet::from_iter([plan.global_id]),
3289            read_ts,
3290            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3291                ctx: Some(ctx),
3292                otel_ctx,
3293                plan,
3294                plan_validity,
3295                read_hold,
3296            }),
3297        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3298    }
3299
3300    #[instrument]
3301    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3302        ctx.otel_ctx.attach_as_parent();
3303
3304        let plan::AlterSinkPlan {
3305            item_id,
3306            global_id,
3307            sink: sink_plan,
3308            with_snapshot,
3309            in_cluster,
3310        } = ctx.plan.clone();
3311
3312        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3313        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3314        // arbitrarily changed since we performed planning, and we must re-assert that it still
3315        // matches our requirements.
3316        //
3317        // The `PlanValidity` check ensures that both the sink and the new source relation still
3318        // exist. Apart from that, we have to ensure that nobody else altered the sink in the
3319        // meantime, which we do by comparing the catalog sink version to the one in the plan.
3320        match ctx.plan_validity.check(self.catalog()) {
3321            Ok(()) => {}
3322            Err(err) => {
3323                ctx.retire(Err(err));
3324                return;
3325            }
3326        }
3327
3328        let entry = self.catalog().get_entry(&item_id);
3329        let CatalogItem::Sink(old_sink) = entry.item() else {
3330            panic!("invalid item kind for `AlterSinkPlan`");
3331        };
3332
3333        if sink_plan.version != old_sink.version + 1 {
3334            ctx.retire(Err(AdapterError::ChangedPlan(
3335                "sink was altered concurrently".into(),
3336            )));
3337            return;
3338        }
3339
3340        info!(
3341            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3342            self.controller
3343                .storage_collections
3344                .collections_frontiers(vec![global_id, sink_plan.from]),
3345            self.controller.storage.export(global_id),
3346        );
3347
3348        // Assert that we can recover the updates that happened at the timestamps of the write
3349        // frontier. The wait in `sequence_alter_sink_prepare` guarantees this holds by now.
3350        let write_frontier = &self
3351            .controller
3352            .storage
3353            .export(global_id)
3354            .expect("sink known to exist")
3355            .write_frontier;
3356        let as_of = ctx.read_hold.least_valid_read();
3357        assert!(
3358            write_frontier.iter().all(|t| as_of.less_than(t)),
3359            "{:?} should be strictly less than {:?}",
3360            &*as_of,
3361            &**write_frontier
3362        );
3363
3364        // Parse the `create_sql` so we can update it to the new sink definition.
3365        //
3366        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3367        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3368        // names in the `create_sql` may have changed, for example due to a schema swap.
3369        let create_sql = &old_sink.create_sql;
3370        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3371        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3372            unreachable!("invalid statement kind for sink");
3373        };
3374
3375        // Update the sink version.
3376        stmt.with_options
3377            .retain(|o| o.name != CreateSinkOptionName::Version);
3378        stmt.with_options.push(CreateSinkOption {
3379            name: CreateSinkOptionName::Version,
3380            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3381                sink_plan.version.to_string(),
3382            ))),
3383        });
3384
3385        let conn_catalog = self.catalog().for_system_session();
3386        let (mut stmt, resolved_ids) =
3387            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3388
3389        // Update the `from` relation.
3390        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3391        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3392        stmt.from = ResolvedItemName::Item {
3393            id: from_entry.id(),
3394            qualifiers: from_entry.name().qualifiers.clone(),
3395            full_name,
3396            print_id: true,
3397            version: from_entry.version,
3398        };
3399
3400        let new_sink = Sink {
3401            create_sql: stmt.to_ast_string_stable(),
3402            global_id,
3403            from: sink_plan.from,
3404            connection: sink_plan.connection.clone(),
3405            envelope: sink_plan.envelope,
3406            version: sink_plan.version,
3407            with_snapshot,
3408            resolved_ids: resolved_ids.clone(),
3409            cluster_id: in_cluster,
3410            commit_interval: sink_plan.commit_interval,
3411        };
3412
3413        let ops = vec![catalog::Op::UpdateItem {
3414            id: item_id,
3415            name: entry.name().clone(),
3416            to_item: CatalogItem::Sink(new_sink),
3417        }];
3418
3419        match self
3420            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3421            .await
3422        {
3423            Ok(()) => {}
3424            Err(err) => {
3425                ctx.retire(Err(err));
3426                return;
3427            }
3428        }
3429
3430        let storage_sink_desc = StorageSinkDesc {
3431            from: sink_plan.from,
3432            from_desc: from_entry
3433                .relation_desc()
3434                .expect("sinks can only be built on items with descs")
3435                .into_owned(),
3436            connection: sink_plan
3437                .connection
3438                .clone()
3439                .into_inline_connection(self.catalog().state()),
3440            envelope: sink_plan.envelope,
3441            as_of,
3442            with_snapshot,
3443            version: sink_plan.version,
3444            from_storage_metadata: (),
3445            to_storage_metadata: (),
3446            commit_interval: sink_plan.commit_interval,
3447        };
3448
3449        self.controller
3450            .storage
3451            .alter_export(
3452                global_id,
3453                ExportDescription {
3454                    sink: storage_sink_desc,
3455                    instance_id: in_cluster,
3456                },
3457            )
3458            .await
3459            .unwrap_or_terminate("cannot fail to alter sink export");
3460
3461        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3462    }
3463
3464    #[instrument]
3465    pub(super) async fn sequence_alter_connection(
3466        &mut self,
3467        ctx: ExecuteContext,
3468        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3469    ) {
3470        match action {
3471            AlterConnectionAction::RotateKeys => {
3472                self.sequence_rotate_keys(ctx, id).await;
3473            }
3474            AlterConnectionAction::AlterOptions {
3475                set_options,
3476                drop_options,
3477                validate,
3478            } => {
3479                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3480                    .await
3481            }
3482        }
3483    }
3484
3485    #[instrument]
3486    async fn sequence_alter_connection_options(
3487        &mut self,
3488        mut ctx: ExecuteContext,
3489        id: CatalogItemId,
3490        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3491        drop_options: BTreeSet<ConnectionOptionName>,
3492        validate: bool,
3493    ) {
3494        let cur_entry = self.catalog().get_entry(&id);
3495        let cur_conn = cur_entry.connection().expect("known to be connection");
3496        let connection_gid = cur_conn.global_id();
3497
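        // Altering options round-trips through SQL: parse the connection's current
        // `create_sql`, splice in the set/dropped options, re-plan the amended
        // statement, and build the new connection definition from the result.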
3498        let inner = || -> Result<Connection, AdapterError> {
3499            // Parse statement.
3500            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3501                .expect("invalid create sql persisted to catalog")
3502                .into_element()
3503                .ast
3504            {
3505                Statement::CreateConnection(stmt) => stmt,
3506                _ => unreachable!("proved type is connection"),
3507            };
3508
3509            let catalog = self.catalog().for_system_session();
3510
3511            // Resolve items in statement
3512            let (mut create_conn_stmt, resolved_ids) =
3513                mz_sql::names::resolve(&catalog, create_conn_stmt)
3514                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3515
3516            // Retain options that are neither set nor dropped.
3517            create_conn_stmt
3518                .values
3519                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3520
3521            // Set new values
3522            create_conn_stmt.values.extend(
3523                set_options
3524                    .into_iter()
3525                    .map(|(name, value)| ConnectionOption { name, value }),
3526            );
3527
3528            // Open a new catalog, which we will use to re-plan our
3529            // statement with the desired config.
3530            let mut catalog = self.catalog().for_system_session();
3531            catalog.mark_id_unresolvable_for_replanning(id);
3532
3533            // Re-define our connection in terms of the amended statement
3534            let plan = match mz_sql::plan::plan(
3535                None,
3536                &catalog,
3537                Statement::CreateConnection(create_conn_stmt),
3538                &Params::empty(),
3539                &resolved_ids,
3540            )
3541            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3542            {
3543                (Plan::CreateConnection(plan), _sql_impl_ids) => plan,
3544                (p, _) => {
3545                    unreachable!("create connection plan is the only valid response, got {:?}", p)
3546                }
3547            };
3548
3549            // Parse statement.
3550            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3551                .expect("invalid create sql persisted to catalog")
3552                .into_element()
3553                .ast
3554            {
3555                Statement::CreateConnection(stmt) => stmt,
3556                _ => unreachable!("proved type is connection"),
3557            };
3558
3559            let catalog = self.catalog().for_system_session();
3560
3561            // Resolve items in statement
3562            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3563                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3564
3565            Ok(Connection {
3566                create_sql: plan.connection.create_sql,
3567                global_id: cur_conn.global_id,
3568                details: plan.connection.details,
3569                resolved_ids: new_deps,
3570            })
3571        };
3572
3573        let conn = match inner() {
3574            Ok(conn) => conn,
3575            Err(e) => {
3576                return ctx.retire(Err(e));
3577            }
3578        };
3579
3580        if validate {
3581            let connection = conn
3582                .details
3583                .to_connection()
3584                .into_inline_connection(self.catalog().state());
3585
3586            let internal_cmd_tx = self.internal_cmd_tx.clone();
3587            let transient_revision = self.catalog().transient_revision();
3588            let conn_id = ctx.session().conn_id().clone();
3589            let otel_ctx = OpenTelemetryContext::obtain();
3590            let role_metadata = ctx.session().role_metadata().clone();
3591            let current_storage_parameters = self.controller.storage.config().clone();
3592
3593            task::spawn(
3594                || format!("validate_alter_connection:{conn_id}"),
3595                async move {
3596                    let resolved_ids = conn.resolved_ids.clone();
3597                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3598                    let result = match std::panic::AssertUnwindSafe(
3599                        connection.validate(id, &current_storage_parameters),
3600                    )
3601                    .ore_catch_unwind()
3602                    .await
3603                    {
3604                        Ok(Ok(())) => Ok(conn),
3605                        Ok(Err(err)) => Err(err.into()),
3606                        Err(_panic) => {
3607                            tracing::error!("alter connection validation panicked");
3608                            Err(AdapterError::Internal(
3609                                "connection validation panicked".into(),
3610                            ))
3611                        }
3612                    };
3613
3614                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3615                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3616                        AlterConnectionValidationReady {
3617                            ctx,
3618                            result,
3619                            connection_id: id,
3620                            connection_gid,
3621                            plan_validity: PlanValidity::new(
3622                                transient_revision,
3623                                dependency_ids.clone(),
3624                                None,
3625                                None,
3626                                role_metadata,
3627                            ),
3628                            otel_ctx,
3629                            resolved_ids,
3630                        },
3631                    ));
3632                    if let Err(e) = result {
3633                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3634                    }
3635                },
3636            );
3637        } else {
3638            let result = self
3639                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3640                .await;
3641            ctx.retire(result);
3642        }
3643    }
3644
3645    #[instrument]
3646    pub(crate) async fn sequence_alter_connection_stage_finish(
3647        &mut self,
3648        session: &Session,
3649        id: CatalogItemId,
3650        connection: Connection,
3651    ) -> Result<ExecuteResponse, AdapterError> {
3652        match self.catalog.get_entry(&id).item() {
3653            CatalogItem::Connection(curr_conn) => {
3654                curr_conn
3655                    .details
3656                    .to_connection()
3657                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3658                    .map_err(StorageError::from)?;
3659            }
3660            _ => unreachable!("known to be a connection"),
3661        };
3662
3663        let ops = vec![catalog::Op::UpdateItem {
3664            id,
3665            name: self.catalog.get_entry(&id).name().clone(),
3666            to_item: CatalogItem::Connection(connection.clone()),
3667        }];
3668
3669        self.catalog_transact(Some(session), ops).await?;
3670
3671        // NOTE: The rest of the alter connection logic (updating VPC endpoints
3672        // and propagating connection changes to dependent sources, sinks, and
3673        // tables) is handled in `apply_catalog_implications` via
3674        // `handle_alter_connection`. The catalog transact above triggers that
3675        // code path.
3676
3677        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3678    }
3679
3680    #[instrument]
3681    pub(super) async fn sequence_alter_source(
3682        &mut self,
3683        session: &Session,
3684        plan::AlterSourcePlan {
3685            item_id,
3686            ingestion_id,
3687            action,
3688        }: plan::AlterSourcePlan,
3689    ) -> Result<ExecuteResponse, AdapterError> {
3690        let cur_entry = self.catalog().get_entry(&item_id);
3691        let cur_source = cur_entry.source().expect("known to be source");
3692
3693        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3694            // Parse statement.
3695            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3696                .expect("invalid create sql persisted to catalog")
3697                .into_element()
3698                .ast
3699            {
3700                Statement::CreateSource(stmt) => stmt,
3701                _ => unreachable!("proved type is source"),
3702            };
3703
3704            let catalog = coord.catalog().for_system_session();
3705
3706            // Resolve items in statement
3707            mz_sql::names::resolve(&catalog, create_source_stmt)
3708                .map_err(|e| AdapterError::internal(err_cx, e))
3709        };
3710
3711        match action {
3712            plan::AlterSourceAction::AddSubsourceExports {
3713                subsources,
3714                options,
3715            } => {
3716                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3717
3718                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3719                    text_columns: mut new_text_columns,
3720                    exclude_columns: mut new_exclude_columns,
3721                    ..
3722                } = options.try_into()?;
3723
3724                // Resolve items in statement
3725                let (mut create_source_stmt, resolved_ids) =
3726                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3727
3728                // Get all currently referred-to items
3729                let catalog = self.catalog();
3730                let curr_references: BTreeSet<_> = catalog
3731                    .get_entry(&item_id)
3732                    .used_by()
3733                    .into_iter()
3734                    .filter_map(|subsource| {
3735                        catalog
3736                            .get_entry(subsource)
3737                            .subsource_details()
3738                            .map(|(_id, reference, _details)| reference)
3739                    })
3740                    .collect();
3741
3742                // We are doing a lot of unwrapping, so just make an error to reference; all of
3743                // these invariants are guaranteed to be true because of how we plan subsources.
3744                let purification_err =
3745                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3746
3747                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
3748                // we remove support for implicitly created subsources from a `CREATE SOURCE`
3749                // statement.
3750                match &mut create_source_stmt.connection {
3751                    CreateSourceConnection::Postgres {
3752                        options: curr_options,
3753                        ..
3754                    } => {
3755                        let mz_sql::plan::PgConfigOptionExtracted {
3756                            mut text_columns, ..
3757                        } = curr_options.clone().try_into()?;
3758
3759                        // Drop text columns; we will add them back in
3760                        // as appropriate below.
3761                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3762
3763                        // Drop all text columns that are not currently referred to.
3764                        text_columns.retain(|column_qualified_reference| {
3765                            mz_ore::soft_assert_eq_or_log!(
3766                                column_qualified_reference.0.len(),
3767                                4,
3768                                "all TEXT COLUMNS values must be column-qualified references"
3769                            );
3770                            let mut table = column_qualified_reference.clone();
3771                            table.0.truncate(3);
3772                            curr_references.contains(&table)
3773                        });
3774
3775                        // Merge the current text columns into the new text columns.
3776                        new_text_columns.extend(text_columns);
3777
3778                        // If we have text columns, add them to the options.
3779                        if !new_text_columns.is_empty() {
3780                            new_text_columns.sort();
3781                            let new_text_columns = new_text_columns
3782                                .into_iter()
3783                                .map(WithOptionValue::UnresolvedItemName)
3784                                .collect();
3785
3786                            curr_options.push(PgConfigOption {
3787                                name: PgConfigOptionName::TextColumns,
3788                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3789                            });
3790                        }
3791                    }
3792                    CreateSourceConnection::MySql {
3793                        options: curr_options,
3794                        ..
3795                    } => {
3796                        let mz_sql::plan::MySqlConfigOptionExtracted {
3797                            mut text_columns,
3798                            mut exclude_columns,
3799                            ..
3800                        } = curr_options.clone().try_into()?;
3801
3802                        // Drop both text and exclude columns; we will add them back in
3803                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                MySqlConfigOptionName::TextColumns
                                    | MySqlConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::SqlServer {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::SqlServerConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both exclude and text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                SqlServerConfigOptionName::TextColumns
                                    | SqlServerConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        // SQL Server text/exclude column refs are 3-part (schema.table.col),
                        // which truncate to 2-part (schema.table). But external references
                        // are 3-part (database.schema.table). Use suffix matching since
                        // a SQL Server source connects to a single database.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.iter().any(|r| r.0.ends_with(&table.0))
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    _ => return Err(purification_err()),
                };

                let mut catalog = self.catalog().for_system_session();
                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

                // Re-define our source in terms of the amended statement.
                let planned = mz_sql::plan::plan(
                    None,
                    &catalog,
                    Statement::CreateSource(create_source_stmt),
                    &Params::empty(),
                    &resolved_ids,
                )
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
                let plan = match planned {
                    (Plan::CreateSource(plan), _sql_impl_ids) => plan,
                    (p, _) => {
                        unreachable!("create source plan is the only valid response, got {:?}", p)
                    }
                };

                // Asserting that we've done the right thing with dependencies
                // here requires mocking out objects in the catalog, which is a
                // large task for an operation we have to cover in tests anyway.
                let source = Source::new(
                    plan,
                    cur_source.global_id,
                    resolved_ids,
                    cur_source.custom_logical_compaction_window,
                    cur_source.is_retained_metrics_object,
                );

                // Get new ingestion description for storage.
                let desc = match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                        desc.clone().into_inline_connection(self.catalog().state())
                    }
                    _ => unreachable!("already verified to be of type ingestion"),
                };

                self.controller
                    .storage
                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

                // Redefine source. This must be done before we create any new
                // subsources so that it has the right ingestion.
                let mut ops = vec![catalog::Op::UpdateItem {
                    id: item_id,
                    // Look this up again so we don't have to hold an immutable reference to the
                    // entry for so long.
                    name: self.catalog.get_entry(&item_id).name().clone(),
                    to_item: CatalogItem::Source(source),
                }];

                let CreateSourceInner {
                    ops: new_ops,
                    sources: _,
                    if_not_exists_ids,
                } = self.create_source_inner(session, subsources).await?;

                ops.extend(new_ops.into_iter());

                assert!(
                    if_not_exists_ids.is_empty(),
                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
                );

                self.catalog_transact(Some(session), ops).await?;
            }
            plan::AlterSourceAction::RefreshReferences { references } => {
                self.catalog_transact(
                    Some(session),
                    vec![catalog::Op::UpdateSourceReferences {
                        source_id: item_id,
                        references: references.into(),
                    }],
                )
                .await?;
            }
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
    }

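    /// Sequences an `ALTER SYSTEM SET` statement, either updating a system
    /// configuration parameter or, for `SET ... = DEFAULT`, resetting it.
    ///
    /// For example (the parameter name here is illustrative):
    ///
    /// ```sql
    /// ALTER SYSTEM SET enable_rbac_checks = true;
    /// ```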
    #[instrument]
    pub(super) async fn sequence_alter_system_set(
        &mut self,
        session: &Session,
        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.is_user_allowed_to_alter_system(session, Some(&name))?;
        // We want to ensure that the network policy we're switching to actually exists.
        if NETWORK_POLICY.name.to_string().to_lowercase() == name.to_lowercase() {
            self.validate_alter_system_network_policy(session, &value)?;
        }

        let op = match value {
            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
                name: name.clone(),
                value: OwnedVarInput::SqlSet(values),
            },
            plan::VariableValue::Default => {
                catalog::Op::ResetSystemConfiguration { name: name.clone() }
            }
        };
        self.catalog_transact(Some(session), vec![op]).await?;

        session.add_notice(AdapterNotice::VarDefaultUpdated {
            role: None,
            var_name: Some(name),
        });
        Ok(ExecuteResponse::AlteredSystemConfiguration)
    }

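    /// Sequences an `ALTER SYSTEM RESET` statement, restoring a single system
    /// configuration parameter to its default value.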
    #[instrument]
    pub(super) async fn sequence_alter_system_reset(
        &mut self,
        session: &Session,
        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.is_user_allowed_to_alter_system(session, Some(&name))?;
        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
        self.catalog_transact(Some(session), vec![op]).await?;
        session.add_notice(AdapterNotice::VarDefaultUpdated {
            role: None,
            var_name: Some(name),
        });
        Ok(ExecuteResponse::AlteredSystemConfiguration)
    }

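    /// Sequences an `ALTER SYSTEM RESET ALL` statement, restoring every system
    /// configuration parameter to its default value.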
    #[instrument]
    pub(super) async fn sequence_alter_system_reset_all(
        &mut self,
        session: &Session,
        _: plan::AlterSystemResetAllPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.is_user_allowed_to_alter_system(session, None)?;
        let op = catalog::Op::ResetAllSystemConfiguration;
        self.catalog_transact(Some(session), vec![op]).await?;
        session.add_notice(AdapterNotice::VarDefaultUpdated {
            role: None,
            var_name: None,
        });
        Ok(ExecuteResponse::AlteredSystemConfiguration)
    }

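    /// Returns `Ok(())` if the session's user may alter the named system variable
    /// (or all variables, when `var_name` is `None`). Only internal superusers may
    /// reset all variables at once; otherwise, internal superusers may alter any
    /// variable, while other superusers may alter only user-modifiable variables
    /// that are visible to them.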
    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
    fn is_user_allowed_to_alter_system(
        &self,
        session: &Session,
        var_name: Option<&str>,
    ) -> Result<(), AdapterError> {
        match (session.user().kind(), var_name) {
            // Only internal superusers can reset all system variables.
            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
            // Whether a variable can be modified depends on whether we're an internal superuser.
            (UserKind::Superuser, Some(name))
                if session.user().is_internal()
                    || self.catalog().system_config().user_modifiable(name) =>
            {
                // In lieu of plumbing the user to all system config functions, just check that
                // the var is visible.
                let var = self.catalog().system_config().get(name)?;
                var.visible(session.user(), self.catalog().system_config())?;
                Ok(())
            }
            // If we're not a superuser but the variable is user modifiable, return an
            // error indicating that superuser privileges are required to toggle it.
            (UserKind::Regular, Some(name))
                if self.catalog().system_config().user_modifiable(name) =>
            {
                Err(AdapterError::Unauthorized(
                    rbac::UnauthorizedError::Superuser {
                        action: format!("toggle the '{name}' system configuration parameter"),
                    },
                ))
            }
            _ => Err(AdapterError::Unauthorized(
                rbac::UnauthorizedError::MzSystem {
                    action: "alter system".into(),
                },
            )),
        }
    }

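    /// Validates that the network policy named by an `ALTER SYSTEM SET network_policy`
    /// statement exists and would not lock the current session out.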
    fn validate_alter_system_network_policy(
        &self,
        session: &Session,
        policy_value: &plan::VariableValue,
    ) -> Result<(), AdapterError> {
        let policy_name = match &policy_value {
            // Make sure the compiled-in default still exists.
            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
            plan::VariableValue::Values(values) if values.len() == 1 => {
                values.iter().next().cloned()
            }
            plan::VariableValue::Values(values) => {
                tracing::warn!(?values, "can't set multiple network policies at once");
                None
            }
        };
        let maybe_network_policy = policy_name
            .as_ref()
            .and_then(|name| self.catalog.get_network_policy_by_name(name));
        let Some(network_policy) = maybe_network_policy else {
            return Err(AdapterError::PlanError(plan::PlanError::VarError(
                VarError::InvalidParameterValue {
                    name: NETWORK_POLICY.name(),
                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
                    reason: "no network policy with that name exists".to_string(),
                },
            )));
        };
        self.validate_alter_network_policy(session, &network_policy.rules)
    }

    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
    ///
    /// This helps prevent users from modifying network policies in a way that would lock out their
    /// current connection.
    fn validate_alter_network_policy(
        &self,
        session: &Session,
        policy_rules: &Vec<NetworkPolicyRule>,
    ) -> Result<(), AdapterError> {
        // If the user is not an internal user, attempt to protect them from
        // blocking themselves.
        if session.user().is_internal() {
            return Ok(());
        }
        if let Some(ip) = session.meta().client_ip() {
            validate_ip_with_policy_rules(ip, policy_rules)
                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
        } else {
            // Sessions without IPs are only temporarily constructed for default values;
            // they should not be permitted here.
            return Err(AdapterError::NetworkPolicyDenied(
                NetworkPolicyError::MissingIp,
            ));
        }
        Ok(())
    }

    /// Returns the name of the portal to execute.
    #[instrument]
    pub(super) fn sequence_execute(
        &self,
        session: &mut Session,
        plan: plan::ExecutePlan,
    ) -> Result<String, AdapterError> {
        // Verify the stmt is still valid.
        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
        let ps = session
            .get_prepared_statement_unverified(&plan.name)
            .expect("known to exist");
        let stmt = ps.stmt().cloned();
        let desc = ps.desc().clone();
        let state_revision = ps.state_revision;
        let logging = Arc::clone(ps.logging());
        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
    }

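    /// Sequences a `GRANT` of privileges, delegating to
    /// [`Self::sequence_update_privileges`] with [`UpdatePrivilegeVariant::Grant`].
    ///
    /// For example (object and role names are illustrative):
    ///
    /// ```sql
    /// GRANT SELECT ON TABLE t TO r;
    /// ```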
    #[instrument]
    pub(super) async fn sequence_grant_privileges(
        &mut self,
        session: &Session,
        plan::GrantPrivilegesPlan {
            update_privileges,
            grantees,
        }: plan::GrantPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            grantees,
            UpdatePrivilegeVariant::Grant,
        )
        .await
    }

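    /// Sequences a `REVOKE` of privileges, delegating to
    /// [`Self::sequence_update_privileges`] with [`UpdatePrivilegeVariant::Revoke`].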
    #[instrument]
    pub(super) async fn sequence_revoke_privileges(
        &mut self,
        session: &Session,
        plan::RevokePrivilegesPlan {
            update_privileges,
            revokees,
        }: plan::RevokePrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            revokees,
            UpdatePrivilegeVariant::Revoke,
        )
        .await
    }

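    /// Shared implementation of `GRANT` and `REVOKE` of privileges: builds one
    /// catalog op per (object, grantee) pair whose ACL actually changes, skipping
    /// no-op grants and revokes, and warning about privilege types that don't
    /// apply to the object's actual type.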
    #[instrument]
    async fn sequence_update_privileges(
        &mut self,
        session: &Session,
        update_privileges: Vec<UpdatePrivilege>,
        grantees: Vec<RoleId>,
        variant: UpdatePrivilegeVariant,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
        let mut warnings = Vec::new();
        let catalog = self.catalog().for_session(session);

        for UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        } in update_privileges
        {
            let actual_object_type = catalog.get_system_object_type(&target_id);
            // For all relations we allow all applicable table privileges, but send a warning if the
            // privilege isn't actually applicable to the object type.
            if actual_object_type.is_relation() {
                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
                if !non_applicable_privileges.is_empty() {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
                        non_applicable_privileges,
                        object_description,
                    })
                }
            }

            if let SystemObjectId::Object(object_id) = &target_id {
                self.catalog()
                    .ensure_not_reserved_object(object_id, session.conn_id())?;
            }

            let privileges = self
                .catalog()
                .get_privileges(&target_id, session.conn_id())
                // Should be unreachable since the parser will refuse to parse grant/revoke
                // statements on objects without privileges.
                .ok_or(AdapterError::Unsupported(
                    "GRANTs/REVOKEs on an object type with no privileges",
                ))?;

            for grantee in &grantees {
                self.catalog().ensure_not_system_role(grantee)?;
                self.catalog().ensure_not_predefined_role(grantee)?;
                let existing_privilege = privileges
                    .get_acl_item(grantee, &grantor)
                    .map(Cow::Borrowed)
                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));

                match variant {
                    UpdatePrivilegeVariant::Grant
                        if !existing_privilege.acl_mode.contains(acl_mode) =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    UpdatePrivilegeVariant::Revoke
                        if !existing_privilege
                            .acl_mode
                            .intersection(acl_mode)
                            .is_empty() =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    // no-op
                    _ => {}
                }
            }
        }

        if ops.is_empty() {
            session.add_notices(warnings);
            return Ok(variant.into());
        }

        let res = self
            .catalog_transact(Some(session), ops)
            .await
            .map(|_| match variant {
                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
            });
        if res.is_ok() {
            session.add_notices(warnings);
        }
        res
    }

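    /// Sequences an `ALTER DEFAULT PRIVILEGES` statement, for example (role and
    /// schema names are illustrative):
    ///
    /// ```sql
    /// ALTER DEFAULT PRIVILEGES FOR ROLE r IN SCHEMA s GRANT SELECT ON TABLES TO t;
    /// ```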
    #[instrument]
    pub(super) async fn sequence_alter_default_privileges(
        &mut self,
        session: &Session,
        plan::AlterDefaultPrivilegesPlan {
            privilege_objects,
            privilege_acl_items,
            is_grant,
        }: plan::AlterDefaultPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
        let variant = if is_grant {
            UpdatePrivilegeVariant::Grant
        } else {
            UpdatePrivilegeVariant::Revoke
        };
        for privilege_object in &privilege_objects {
            self.catalog()
                .ensure_not_system_role(&privilege_object.role_id)?;
            self.catalog()
                .ensure_not_predefined_role(&privilege_object.role_id)?;
            if let Some(database_id) = privilege_object.database_id {
                self.catalog()
                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
            }
            if let Some(schema_id) = privilege_object.schema_id {
                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
                let schema_spec: SchemaSpecifier = schema_id.into();

                self.catalog().ensure_not_reserved_object(
                    &(database_spec, schema_spec).into(),
                    session.conn_id(),
                )?;
            }
            for privilege_acl_item in &privilege_acl_items {
                self.catalog()
                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
                self.catalog()
                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
                ops.push(catalog::Op::UpdateDefaultPrivilege {
                    privilege_object: privilege_object.clone(),
                    privilege_acl_item: privilege_acl_item.clone(),
                    variant,
                })
            }
        }

        self.catalog_transact(Some(session), ops).await?;
        Ok(ExecuteResponse::AlteredDefaultPrivileges)
    }

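    /// Sequences a `GRANT` of one or more roles to one or more members, emitting a
    /// notice rather than an op for memberships that already exist.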
    #[instrument]
    pub(super) async fn sequence_grant_role(
        &mut self,
        session: &Session,
        plan::GrantRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::GrantRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // We need this check so we don't accidentally return a success on a reserved role.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::GrantRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        if ops.is_empty() {
            return Ok(ExecuteResponse::GrantedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::GrantedRole)
    }

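    /// Sequences a `REVOKE` of one or more roles from one or more members, emitting
    /// a notice rather than an op for memberships that do not exist.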
    #[instrument]
    pub(super) async fn sequence_revoke_role(
        &mut self,
        session: &Session,
        plan::RevokeRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::RevokeRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if !member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // We need this check so we don't accidentally return a success on a reserved role.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::RevokeRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        if ops.is_empty() {
            return Ok(ExecuteResponse::RevokedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::RevokedRole)
    }

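    /// Sequences an `ALTER ... OWNER TO` statement. For items, the ownership change
    /// cascades to dependent indexes and progress collections; for clusters, it
    /// cascades to the cluster's replicas. Indexes themselves cannot have their
    /// owner changed directly.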
    #[instrument]
    pub(super) async fn sequence_alter_owner(
        &mut self,
        session: &Session,
        plan::AlterOwnerPlan {
            id,
            object_type,
            new_owner,
        }: plan::AlterOwnerPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = vec![catalog::Op::UpdateOwner {
            id: id.clone(),
            new_owner,
        }];

        match &id {
            ObjectId::Item(global_id) => {
                let entry = self.catalog().get_entry(global_id);

                // Cannot directly change the owner of an index.
                if entry.is_index() {
                    let name = self
                        .catalog()
                        .resolve_full_name(entry.name(), Some(session.conn_id()))
                        .to_string();
                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
                    return Ok(ExecuteResponse::AlteredObject(object_type));
                }

                // Alter owner cascades down to dependent indexes.
                let dependent_index_ops = entry
                    .used_by()
                    .into_iter()
                    .filter(|id| self.catalog().get_entry(id).is_index())
                    .map(|id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(*id),
                        new_owner,
                    });
                ops.extend(dependent_index_ops);

                // Alter owner cascades down to progress collections.
                let dependent_subsources =
                    entry
                        .progress_id()
                        .into_iter()
                        .map(|item_id| catalog::Op::UpdateOwner {
                            id: ObjectId::Item(item_id),
                            new_owner,
                        });
                ops.extend(dependent_subsources);
            }
            ObjectId::Cluster(cluster_id) => {
                let cluster = self.catalog().get_cluster(*cluster_id);
                // Alter owner cascades down to cluster replicas.
                let managed_cluster_replica_ops =
                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
                        new_owner,
                    });
                ops.extend(managed_cluster_replica_ops);
            }
            _ => {}
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::AlteredObject(object_type))
    }

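    /// Sequences a `REASSIGN OWNED` statement, transferring ownership of the
    /// reassigned objects from the old roles to the new role.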
    #[instrument]
    pub(super) async fn sequence_reassign_owned(
        &mut self,
        session: &Session,
        plan::ReassignOwnedPlan {
            old_roles,
            new_role,
            reassign_ids,
        }: plan::ReassignOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let ops = reassign_ids
            .into_iter()
            .map(|id| catalog::Op::UpdateOwner {
                id,
                new_owner: new_role,
            })
            .collect();

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::ReassignOwned)
    }

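    /// Pops the next deferred DDL statement, if any, and executes it.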
    #[instrument]
    pub(crate) async fn handle_deferred_statement(&mut self) {
        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
        // was processed, removing the single element from deferred_statements, so it is expected
        // that this is sometimes empty.
        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
            return;
        };
        match ps {
            crate::coord::PlanStatement::Statement { stmt, params } => {
                self.handle_execute_inner(stmt, params, ctx).await;
            }
            crate::coord::PlanStatement::Plan {
                plan,
                resolved_ids,
                sql_impl_resolved_ids,
            } => {
                self.sequence_plan(ctx, plan, resolved_ids, sql_impl_resolved_ids)
                    .await;
            }
        }
    }

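    /// Sequences an `ALTER TABLE ... ADD COLUMN` statement, allocating a fresh
    /// `GlobalId` for the new version of the relation and applying the change in a
    /// catalog transaction.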
    #[instrument]
    // TODO(parkmycar): Remove this once we have an actual implementation.
    #[allow(clippy::unused_async)]
    pub(super) async fn sequence_alter_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterTablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::AlterTablePlan {
            relation_id,
            column_name,
            column_type,
            raw_sql_type,
        } = plan;

        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
        let (_, new_global_id) = self.allocate_user_id().await?;
        let ops = vec![catalog::Op::AlterAddColumn {
            id: relation_id,
            new_global_id,
            name: column_name,
            typ: column_type,
            sql: raw_sql_type,
        }];

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
    }

    /// Prepares to apply a replacement materialized view.
    #[instrument]
    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: AlterMaterializedViewApplyReplacementPlan,
    ) {
        // To ensure there is no time gap in the output, we can only apply a replacement if the
        // target's write frontier has caught up to the replacement dataflow's write frontier. This
        // might not be the case initially, so we have to wait. To this end, we install a watch set
        // waiting for the target MV's write frontier to advance sufficiently.
        //
        // Note that the replacement's dataflow is not performing any writes, so it can only be
        // ahead of the target initially due to as-of selection. Once the target has caught up, the
        // replacement's write frontier is always <= the target's.

        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();

        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([id, replacement_id]),
            None,
            None,
            ctx.session().role_metadata().clone(),
        );

        let target = self.catalog.get_entry(&id);
        let target_gid = target.latest_global_id();

        let replacement = self.catalog.get_entry(&replacement_id);
        let replacement_gid = replacement.latest_global_id();

        let target_upper = self
            .controller
            .storage_collections
            .collection_frontiers(target_gid)
            .expect("target MV exists")
            .write_frontier;
        let replacement_upper = self
            .controller
            .compute
            .collection_frontiers(replacement_gid, replacement.cluster_id())
            .expect("replacement MV exists")
            .write_frontier;

        info!(
            %id, %replacement_id, ?target_upper, ?replacement_upper,
            "preparing materialized view replacement application",
        );

        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
            // A replacement's write frontier can only become empty if the target's write frontier
            // has advanced to the empty frontier. In this case the MV is sealed for all times and
            // applying the replacement wouldn't have any effect. We use this opportunity to alert
            // the user by returning an error, rather than applying the useless replacement.
            //
            // Note that we can't assert on `target_upper` being empty here, because the reporting
            // of the target's frontier might be delayed. We'd have to fetch the current frontier
            // from persist, which we cannot do without incurring I/O.
            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
                name: target.name().item.clone(),
            }));
            return;
        };

        // A watch set resolves when the watched objects' frontier becomes _greater_ than the
        // specified timestamp. Since we only need to wait until the target frontier is >= the
        // replacement's frontier, we can step back the timestamp.
        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);

        // TODO(database-issues#9820): If the target MV is dropped while we are waiting for
        // progress, the watch set never completes and neither does the `ALTER MATERIALIZED VIEW`
        // command.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([target_gid]),
            replacement_upper_ts,
            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
                ctx: Some(ctx),
                otel_ctx: OpenTelemetryContext::obtain(),
                plan,
                plan_validity,
            }),
        )
        .expect("target collection exists");
    }

    /// Finishes applying a replacement materialized view after the frontier wait completed.
    #[instrument]
    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
        &mut self,
        mut ctx: AlterMaterializedViewReadyContext,
    ) {
        ctx.otel_ctx.attach_as_parent();

        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;

        // We avoid taking the DDL lock for `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`
        // commands, see `Coordinator::must_serialize_ddl`. We therefore must assume that the
        // world has arbitrarily changed since we performed planning, and we must re-assert
        // that it still matches our requirements.
        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
            ctx.retire(Err(err));
            return;
        }

        info!(
            %id, %replacement_id,
            "finishing materialized view replacement application",
        );

        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
                ObjectType::MaterializedView,
            ))),
            Err(err) => ctx.retire(Err(err)),
        }
    }

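    /// Builds a [`mz_transform::StatisticsOracle`] for the given source IDs as of
    /// `query_as_of`, forwarding to the module-level `statistics_oracle` helper.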
    pub(super) async fn statistics_oracle(
        &self,
        session: &Session,
        source_ids: &BTreeSet<GlobalId>,
        query_as_of: &Antichain<Timestamp>,
        is_oneshot: bool,
    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
        super::statistics_oracle(
            session,
            source_ids,
            query_as_of,
            is_oneshot,
            self.catalog().system_config(),
            self.controller.storage_collections.as_ref(),
        )
        .await
    }
}

impl Coordinator {
    /// Emit the raw optimizer notices in `notices` to the user's session, if
    /// any.
    ///
    /// This intentionally consumes `RawOptimizerNotice`s (not pre-rendered
    /// ones) because the user-facing rendering goes through the user's
    /// session-aware humanizer, which produces e.g. schema-qualified names
    /// relative to the user's current database/schema.
    pub(crate) fn emit_raw_optimizer_notices_to_user(
        &self,
        ctx: &ExecuteContext,
        notices: &[RawOptimizerNotice],
    ) {
        emit_optimizer_notices(&*self.catalog, ctx.session(), notices);
    }

    /// Persist already-rendered optimizer notices for a newly created
    /// non-transient dataflow.
    ///
    /// This:
    /// - packs builtin-table updates for `mz_optimizer_notices` (if enabled),
    /// - stores the rendered metainfo on the catalog object via
    ///   `set_dataflow_metainfo`,
    /// - and returns a future that resolves once the builtin-table append
    ///   has been observed, or `None` if nothing was appended.
    async fn persist_dataflow_metainfo(
        &mut self,
        df_meta: DataflowMetainfo<Arc<OptimizerNotice>>,
        export_id: GlobalId,
    ) -> Option<BuiltinTableAppendNotify> {
        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
        // in-memory state.
        if self.catalog().state().system_config().enable_mz_notices()
            && !df_meta.optimizer_notices.is_empty()
        {
            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
            self.catalog().state().pack_optimizer_notices(
                &mut builtin_table_updates,
                df_meta.optimizer_notices.iter(),
                Diff::ONE,
            );

            // Save the metainfo.
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            Some(
                self.builtin_table_update()
                    .execute(builtin_table_updates)
                    .await
                    .0,
            )
        } else {
            // Save the metainfo.
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            None
        }
    }
}