mz_adapter/coord/sequencer/inner.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::objects::{
    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
    RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
    SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
    StatementContext,
};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::GenericSourceConnection;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::ExportDescription;
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_transform::dataflow::DataflowMetainfo;
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::sequencer::emit_optimizer_notices;
use crate::coord::{
    AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
    CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
    Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
    PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
    validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;
/// Attempts to evaluate an expression. If evaluation returns an error, the
/// error is sent to the client and the enclosing function returns.
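///
/// A minimal usage sketch (illustrative; `ctx` can be any value with a
/// compatible `retire` method, such as an `ExecuteContext`):
///
/// ```ignore
/// let plan = return_if_err!(self.plan_statement(session, stmt, &params, &resolved_ids), ctx);
/// ```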
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;

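/// The catalog ops produced by `sequence_drop_common`, plus the bookkeeping
/// needed to emit notices about what was dropped.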
struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}

/// A bundle of values returned from `create_source_inner`.
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. A stage can either produce another
    /// stage to execute or a final response. An explicit [Span] is passed to allow for
    /// convenient tracing.
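    ///
    /// A sketch of the intended call pattern (illustrative only; the concrete
    /// stage types and contexts are defined in the submodules below):
    ///
    /// ```ignore
    /// // E.g. from a message handler, with `stage` implementing `Staged` and
    /// // `ctx` recovered from the message:
    /// coord.sequence_staged(ctx, Span::current(), stage).await;
    /// ```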
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
                    // when cancel_enabled may have been true on an earlier stage.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false;
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }

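    /// Spawns a task that awaits `handle` and passes its successful result to
    /// `f`. If `cancel_enabled` is set and the connection's staged cancellation
    /// channel fires (or its sender is dropped) first, the context is instead
    /// retired with [`AdapterError::Canceled`].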
    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // Wait until the value becomes `true` or the sender is dropped.
                let _ = rx.wait_for(|v| *v).await;
            })
        } else {
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = return_if_err!(res, ctx);
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }

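    /// Builds the catalog ops for a batch of `CREATE SOURCE` plans, returning
    /// the ops, the in-memory `Source`s they create, and the IDs of any
    /// `IF NOT EXISTS` plans, which are needed later for notice generation.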
    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref desc)
                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                            .get(self.catalog().system_config().dyncfgs());

                        if !enable_multi_replica_sources && cluster.replica_ids().len() > 1 {
                            return Err(AdapterError::Unsupported(
                                "webhook sources in clusters with >1 replicas",
                            ));
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

            // Attempt to reduce the `CHECK` expression; the reduction times
            // out if it takes too long.
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            // If this source contained a set of available source references, update the
            // source references catalog table.
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            // These operations must be executed after the source is added to the catalog.
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }

    /// Subsources are planned differently from other statements because they
    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
    /// Because of this, we have usually "missed" the opportunity to plan them
    /// through the normal statement execution life cycle (the exception being
    /// during bootstrapping).
    ///
    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the subsource.
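    ///
    /// A usage sketch (illustrative), mirroring how the purified `ALTER
    /// SOURCE` and `CREATE SOURCE` paths below call this method:
    ///
    /// ```ignore
    /// let id_ts = self.get_catalog_write_ts().await;
    /// let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
    /// let bundle = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
    /// ```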
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let plan = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }

    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        // Generate subsource statements.
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip_eq(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource, if any.
        if let Some(progress_stmt) = progress_stmt {
            // The primary source depends on this subsource because the primary
            // source needs to know its shard ID, and the easiest way of
            // guaranteeing that the shard ID is discoverable is to create this
            // collection first.
            assert_none!(progress_stmt.of_source);
            let id_ts = self.get_catalog_write_ts().await;
            let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("plan must be CreateSourcePlan but got {:?}", p),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements.
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources.
        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

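    /// Sequences a batch of `CREATE SOURCE` plan bundles: applies the catalog
    /// ops built by `create_source_inner`, then surfaces webhook URLs and
    /// `IF NOT EXISTS` notices to the session.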
    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        // Check if any sources are webhook sources and report them as created.
        for (item_id, source) in &sources {
            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
                    ctx.session()
                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
                }
            }
        }

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

        // VPC endpoint creation for AWS PrivateLink connections is now handled
        // in apply_catalog_implications.
        let conn_id = ctx.session().conn_id().clone();
        let transact_result = self
            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

    /// Validates the role attributes for a `CREATE ROLE` statement.
    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some()
                || attributes.password.is_some()
                || attributes.login.is_some()
            {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes)?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_with_context(conn_id, None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO(network_policy): Consider role-based network policies here.
        let current_network_policy_name =
            self.catalog().system_config().default_network_policy_name();
        // Check if the way we're altering the policy is still valid for the current connection.
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

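    /// Sequences a `CREATE TABLE` plan, covering both ordinary tables
    /// (`TableWrites`) and tables backed by a data source (ingestion exports
    /// and webhooks).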
    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let id_ts = self.get_catalog_write_ts().await;
        let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                o => {
                    unreachable!("CREATE TABLE data source got {:?}", o)
                }
            },
        };

        let is_webhook = matches!(
            &data_source,
            TableDataSource::DataSource {
                desc: DataSourceDesc::Webhook { .. },
                ..
            }
        );

        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let catalog_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        if is_webhook {
            // try_get_webhook_url will make up a URL for things that are not
            // webhooks, so we guard against that here.
            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
                ctx.session()
                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
            }
        }

        match catalog_result {
            Ok(()) => Ok(ExecuteResponse::CreatedTable),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session_mut()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "table",
                    });
                Ok(ExecuteResponse::CreatedTable)
            }
            Err(err) => Err(err),
        }
    }

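    /// Sequences a `CREATE SINK` plan: records the sink in the catalog, then
    /// creates the corresponding storage export and read policies.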
    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        // First try to allocate an item ID and a global ID. If this fails, we're done.
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) =
            return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
            commit_interval: sink.commit_interval,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }

    /// Validates that a view definition does not contain any expressions that may lead to
    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
    ///
    /// We prevent these expressions so that we can add columns to system tables without
    /// changing the definition of the view.
    ///
    /// Here is a bit of a hand-wavy proof as to why we only need to check the
    /// immediate view definition for system objects and ambiguous column
    /// references, and not the entire dependency tree:
    ///
    ///   - A view with no object references cannot have any ambiguous column
    ///   references to a system object, because it has no system objects.
    ///   - A view with a direct reference to a system object and a * or
    ///   NATURAL JOIN will be rejected due to ambiguous column references.
    ///   - A view with system objects but no * or NATURAL JOINs cannot have
    ///   any ambiguous column references to a system object, because all column
    ///   references are explicitly named.
    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
    ///   system object cannot have any ambiguous column references to a system
    ///   object, because there are no system objects in the top level view and
    ///   all sub-views are guaranteed to have no ambiguous column references to
    ///   system objects.
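    ///
    /// For example, `CREATE VIEW v AS SELECT * FROM mz_catalog.mz_objects`
    /// would be rejected, because a column later added to `mz_objects` would
    /// silently change the output columns of `v`.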
    pub(super) fn validate_system_column_references(
        &self,
        uses_ambiguous_columns: bool,
        depends_on: &BTreeSet<GlobalId>,
    ) -> Result<(), AdapterError> {
        if uses_ambiguous_columns
            && depends_on
                .iter()
                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
        {
            Err(AdapterError::AmbiguousSystemColumnReference)
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_type(
        &mut self,
        session: &Session,
        plan: plan::CreateTypePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        // Validate the type definition (e.g., composite columns) before storing.
        plan.typ
            .inner
            .desc(&self.catalog().for_session(session))
            .map_err(AdapterError::from)?;
        let typ = Type {
            create_sql: Some(plan.typ.create_sql),
            global_id,
            details: CatalogTypeDetails {
                array_id: None,
                typ: plan.typ.inner,
                pg_metadata: None,
            },
            resolved_ids,
        };
        let op = catalog::Op::CreateItem {
            id: item_id,
            name: plan.name,
            item: CatalogItem::Type(typ),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::CreatedType),
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_comment_on(
        &mut self,
        session: &Session,
        plan: plan::CommentPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::Comment {
            object_id: plan.object_id,
            sub_component: plan.sub_component,
            comment: plan.comment,
        };
        self.catalog_transact(Some(session), vec![op]).await?;
        Ok(ExecuteResponse::Comment)
    }

    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        fail::fail_point!("after_sequencer_drop_replica");

        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }

1278    fn validate_dropped_role_ownership(
1279        &self,
1280        session: &Session,
1281        dropped_roles: &BTreeMap<RoleId, &str>,
1282    ) -> Result<(), AdapterError> {
1283        fn privilege_check(
1284            privileges: &PrivilegeMap,
1285            dropped_roles: &BTreeMap<RoleId, &str>,
1286            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1287            object_id: &SystemObjectId,
1288            catalog: &ConnCatalog,
1289        ) {
1290            for privilege in privileges.all_values() {
1291                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1292                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1293                    let object_description =
1294                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1295                    dependent_objects
1296                        .entry(role_name.to_string())
1297                        .or_default()
1298                        .push(format!(
1299                            "privileges on {object_description} granted by {grantor_name}",
1300                        ));
1301                }
1302                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1303                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1304                    let object_description =
1305                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1306                    dependent_objects
1307                        .entry(role_name.to_string())
1308                        .or_default()
1309                        .push(format!(
1310                            "privileges granted on {object_description} to {grantee_name}"
1311                        ));
1312                }
1313            }
1314        }
1315
        let catalog = self.catalog().for_session(session);
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
            }
        }
        privilege_check(
            self.catalog().system_privileges(),
            dropped_roles,
            &mut dependent_objects,
            &SystemObjectId::System,
            &catalog,
        );
        for (default_privilege_object, default_privilege_acl_items) in
            self.catalog.default_privileges()
        {
            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!(
                        "default privileges on {}S created by {}",
                        default_privilege_object.object_type, role_name
                    ));
            }
            for default_privilege_acl_item in default_privilege_acl_items {
                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "default privileges on {}S granted to {}",
                            default_privilege_object.object_type, role_name
                        ));
                }
            }
        }

        if !dependent_objects.is_empty() {
            Err(AdapterError::DependentObject(dependent_objects))
        } else {
            Ok(())
        }
    }

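    /// Sequences a `DROP OWNED BY` plan: revokes the targeted roles' privileges
    /// and default privileges, then drops every object they own, all in a
    /// single catalog transaction.
    ///
    /// Illustrative trigger (role name hypothetical):
    /// `DROP OWNED BY joe CASCADE;`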
    #[instrument]
    pub(super) async fn sequence_drop_owned(
        &mut self,
        session: &Session,
        plan: plan::DropOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for role_id in &plan.role_ids {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let mut privilege_revokes = plan.privilege_revokes;

        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
        let session_catalog = self.catalog().for_session(session);
        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
            && !session.is_superuser()
        {
            // Obtain all roles that the current session is a member of.
            let role_membership =
                session_catalog.collect_role_membership(session.current_role_id());
            let invalid_revokes: BTreeSet<_> = privilege_revokes
                .extract_if(.., |(_, privilege)| {
                    !role_membership.contains(&privilege.grantor)
                })
                .map(|(object_id, _)| object_id)
                .collect();
            for invalid_revoke in invalid_revokes {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
                session.add_notice(AdapterNotice::CannotRevoke { object_description });
            }
        }

        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
            catalog::Op::UpdatePrivilege {
                target_id: object_id,
                privilege,
                variant: UpdatePrivilegeVariant::Revoke,
            }
        });
        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant: UpdatePrivilegeVariant::Revoke,
            },
        );
        let DropOps {
            ops: drop_ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(session, plan.drop_ids)?;

        let ops = privilege_revoke_ops
            .chain(default_privilege_revoke_ops)
            .chain(drop_ops.into_iter())
            .collect();

        self.catalog_transact(Some(session), ops).await?;

        if dropped_active_db {
            session.add_notice(AdapterNotice::DroppedActiveDatabase {
                name: session.vars().database().to_string(),
            });
        }
        if dropped_active_cluster {
            session.add_notice(AdapterNotice::DroppedActiveCluster {
                name: session.vars().cluster().to_string(),
            });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
        }
        Ok(ExecuteResponse::DroppedOwned)
    }

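    /// Computes the catalog ops shared by all drop-style statements: role
    /// membership revokes, default privilege revokes, and the object drops
    /// themselves. Also reports whether the session's active database or
    /// cluster, or an in-use index, is being dropped, so callers can emit the
    /// corresponding notices.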
    fn sequence_drop_common(
        &self,
        session: &Session,
        ids: Vec<ObjectId>,
    ) -> Result<DropOps, AdapterError> {
        let mut dropped_active_db = false;
        let mut dropped_active_cluster = false;
        let mut dropped_in_use_indexes = Vec::new();
        let mut dropped_roles = BTreeMap::new();
        let mut dropped_databases = BTreeSet::new();
        let mut dropped_schemas = BTreeSet::new();
        // Dropping either the group role or the member role of a role membership will trigger a
        // revoke role. We use a set for the revokes to avoid attempting to revoke the same role
        // membership twice.
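        // Illustrative example (role names hypothetical): after `GRANT g TO m`,
        // dropping `g`, `m`, or both inserts the `(g, m, <grantor>)` triple
        // exactly once, so exactly one revoke op is emitted below.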
        let mut role_revokes = BTreeSet::new();
        // Dropping a database or a schema will revoke all default privileges associated with that
        // database or schema.
        let mut default_privilege_revokes = BTreeSet::new();

        // Clusters we're dropping
        let mut clusters_to_drop = BTreeSet::new();

        let ids_set = ids.iter().collect::<BTreeSet<_>>();
        for id in &ids {
            match id {
                ObjectId::Database(id) => {
                    let name = self.catalog().get_database(id).name();
                    if name == session.vars().database() {
                        dropped_active_db = true;
                    }
                    dropped_databases.insert(id);
                }
                ObjectId::Schema((_, spec)) => {
                    if let SchemaSpecifier::Id(id) = spec {
                        dropped_schemas.insert(id);
                    }
                }
                ObjectId::Cluster(id) => {
                    clusters_to_drop.insert(*id);
                    if let Some(active_id) = self
                        .catalog()
                        .active_cluster(session)
                        .ok()
                        .map(|cluster| cluster.id())
                    {
                        if id == &active_id {
                            dropped_active_cluster = true;
                        }
                    }
                }
                ObjectId::Role(id) => {
                    let role = self.catalog().get_role(id);
                    let name = role.name();
                    dropped_roles.insert(*id, name);
                    // We must revoke all role memberships that the dropped role belongs to.
                    for (group_id, grantor_id) in &role.membership.map {
                        role_revokes.insert((*group_id, *id, *grantor_id));
                    }
                }
                ObjectId::Item(id) => {
                    if let Some(index) = self.catalog().get_entry(id).index() {
                        let humanizer = self.catalog().for_session(session);
                        let dependants = self
                            .controller
                            .compute
                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
                            .ok()
                            .into_iter()
                            .flatten()
                            .filter(|dependant_id| {
                                // Transient IDs belong to peeks. For now, we are not interested
                                // in peeks that depend on a dropped index.
                                // TODO: show a different notice in this case. Something like
                                // "There is an in-progress ad hoc SELECT that uses the dropped
                                // index. The resources used by the index will be freed when all
                                // such SELECTs complete."
                                if dependant_id.is_transient() {
                                    return false;
                                }
                                // The item should exist, but don't panic if it doesn't.
                                let Some(dependent_id) = humanizer
                                    .try_get_item_by_global_id(dependant_id)
                                    .map(|item| item.id())
                                else {
                                    return false;
                                };
                                // If the dependent object is also being dropped, then there is no
                                // problem, so we don't want a notice.
                                !ids_set.contains(&ObjectId::Item(dependent_id))
                            })
                            .flat_map(|dependant_id| {
                                // If we are not able to find a name for this ID it probably means
                                // we have already dropped the compute collection, in which case we
                                // can ignore it.
                                humanizer.humanize_id(dependant_id)
                            })
                            .collect_vec();
                        if !dependants.is_empty() {
                            dropped_in_use_indexes.push(DroppedInUseIndex {
                                index_name: humanizer
                                    .humanize_id(index.global_id())
                                    .unwrap_or_else(|| id.to_string()),
                                dependant_objects: dependants,
                            });
                        }
                    }
                }
                _ => {}
            }
        }

        for id in &ids {
            match id {
                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
                // unless they are internal replicas, which exist outside the scope
                // of managed clusters.
                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                    if !clusters_to_drop.contains(cluster_id) {
                        let cluster = self.catalog.get_cluster(*cluster_id);
                        if cluster.is_managed() {
                            let replica =
                                cluster.replica(*replica_id).expect("Catalog out of sync");
                            if !replica.config.location.internal() {
                                coord_bail!("cannot drop replica of managed cluster");
                            }
                        }
                    }
                }
                _ => {}
            }
        }

        for role_id in dropped_roles.keys() {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }
        self.validate_dropped_role_ownership(session, &dropped_roles)?;
        // If any role is a member of a dropped role, then we must revoke that membership.
        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
        for role in self.catalog().user_roles() {
            for dropped_role_id in
                dropped_role_ids.intersection(&role.membership.map.keys().collect())
            {
                role_revokes.insert((
                    **dropped_role_id,
                    role.id(),
                    *role
                        .membership
                        .map
                        .get(*dropped_role_id)
                        .expect("included in keys above"),
                ));
            }
        }

        for (default_privilege_object, default_privilege_acls) in
            self.catalog().default_privileges()
        {
            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
            {
                for default_privilege_acl in default_privilege_acls {
                    default_privilege_revokes.insert((
                        default_privilege_object.clone(),
                        default_privilege_acl.clone(),
                    ));
                }
            }
        }

        let ops = role_revokes
            .into_iter()
            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            })
            .chain(default_privilege_revokes.into_iter().map(
                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                    privilege_object,
                    privilege_acl_item,
                    variant: UpdatePrivilegeVariant::Revoke,
                },
            ))
            .chain(iter::once(catalog::Op::DropObjects(
                ids.into_iter()
                    .map(DropObjectInfo::manual_drop_from_object_id)
                    .collect(),
            )))
            .collect();

        Ok(DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        })
    }

    pub(super) fn sequence_explain_schema(
        &self,
        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
        })?;

        let json_string = json_string(&json_value);
        let row = Row::pack_slice(&[Datum::String(&json_string)]);
        Ok(Self::send_immediate_rows(row))
    }

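    /// Handles `SHOW ALL`: returns one `(name, setting, description)` row for
    /// each variable visible to the session, sorted case-insensitively by name.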
    pub(super) fn sequence_show_all_variables(
        &self,
        session: &Session,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut rows = viewable_variables(self.catalog().state(), session)
            .map(|v| (v.name(), v.value(), v.description()))
            .collect::<Vec<_>>();
        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());

        // TODO(parkmycar): Pack all of these into a single RowCollection.
        let rows: Vec<_> = rows
            .into_iter()
            .map(|(name, val, desc)| {
                Row::pack_slice(&[
                    Datum::String(name),
                    Datum::String(&val),
                    Datum::String(desc),
                ])
            })
            .collect();
        Ok(Self::send_immediate_rows(rows))
    }

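    /// Handles `SHOW <name>`. `SHOW schema` is special-cased to report the
    /// first resolvable schema on the search path, or `NULL` (plus a notice,
    /// when enabled) if nothing on the search path resolves.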
    pub(super) fn sequence_show_variable(
        &self,
        session: &Session,
        plan: plan::ShowVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        if &plan.name == SCHEMA_ALIAS {
            let schemas = self.catalog.resolve_search_path(session);
            let schema = schemas.first();
            return match schema {
                Some((database_spec, schema_spec)) => {
                    let schema_name = &self
                        .catalog
                        .get_schema(database_spec, schema_spec, session.conn_id())
                        .name()
                        .schema;
                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
                    Ok(Self::send_immediate_rows(row))
                }
                None => {
                    if session.vars().current_object_missing_warnings() {
                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
                            search_path: session
                                .vars()
                                .search_path()
                                .into_iter()
                                .map(|schema| schema.to_string())
                                .collect(),
                        });
                    }
                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
                }
            };
        }

        let variable = session
            .vars()
            .get(self.catalog().system_config(), &plan.name)
            .or_else(|_| self.catalog().system_config().get(&plan.name))?;

        // In lieu of plumbing the user to all system config functions, just check that the var is
        // visible.
        variable.visible(session.user(), self.catalog().system_config())?;

        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
        if variable.name() == vars::DATABASE.name()
            && matches!(
                self.catalog().resolve_database(&variable.value()),
                Err(CatalogError::UnknownDatabase(_))
            )
            && session.vars().current_object_missing_warnings()
        {
            let name = variable.value();
            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
        } else if variable.name() == vars::CLUSTER.name()
            && matches!(
                self.catalog().resolve_cluster(&variable.value()),
                Err(CatalogError::UnknownCluster(_))
            )
            && session.vars().current_object_missing_warnings()
        {
            let name = variable.value();
            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
        }
        Ok(Self::send_immediate_rows(row))
    }

    #[instrument]
    pub(super) async fn sequence_inspect_shard(
        &self,
        session: &Session,
        plan: plan::InspectShardPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO: Not thrilled about this rbac special case here, but probably
        // sufficient for now.
        if !session.user().is_internal() {
            return Err(AdapterError::Unauthorized(
                rbac::UnauthorizedError::MzSystem {
                    action: "inspect".into(),
                },
            ));
        }
        let state = self
            .controller
            .storage
            .inspect_persist_state(plan.id)
            .await?;
        let jsonb = Jsonb::from_serde_json(state)?;
        Ok(Self::send_immediate_rows(jsonb.into_row()))
    }

    #[instrument]
    pub(super) fn sequence_set_variable(
        &self,
        session: &mut Session,
        plan: plan::SetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (name, local) = (plan.name, plan.local);
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }

        let vars = session.vars_mut();
        let values = match plan.value {
            plan::VariableValue::Default => None,
            plan::VariableValue::Values(values) => Some(values),
        };

        match values {
            Some(values) => {
                vars.set(
                    self.catalog().system_config(),
                    &name,
                    VarInput::SqlSet(&values),
                    local,
                )?;

                let vars = session.vars();

                // Emit a warning when deprecated variables are used.
                // TODO(database-issues#8069) remove this after sufficient time has passed
                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if name == vars::CLUSTER.name()
                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
                {
                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
                }

                // Database or cluster value does not correspond to a catalog item.
                if name.as_str() == vars::DATABASE.name()
                    && matches!(
                        self.catalog().resolve_database(vars.database()),
                        Err(CatalogError::UnknownDatabase(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.database().to_string();
                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
                } else if name.as_str() == vars::CLUSTER.name()
                    && matches!(
                        self.catalog().resolve_cluster(vars.cluster()),
                        Err(CatalogError::UnknownCluster(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.cluster().to_string();
                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
                    let v = values.into_first().to_lowercase();
                    if v == IsolationLevel::ReadUncommitted.as_str()
                        || v == IsolationLevel::ReadCommitted.as_str()
                        || v == IsolationLevel::RepeatableRead.as_str()
                    {
                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
                            isolation_level: v,
                        });
                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
                        session.add_notice(AdapterNotice::StrongSessionSerializable);
                    }
                }
            }
            None => vars.reset(self.catalog().system_config(), &name, local)?,
        }

        Ok(ExecuteResponse::SetVariable { name, reset: false })
    }

    pub(super) fn sequence_reset_variable(
        &self,
        session: &mut Session,
        plan: plan::ResetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let name = plan.name;
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }
        session
            .vars_mut()
            .reset(self.catalog().system_config(), &name, false)?;
        Ok(ExecuteResponse::SetVariable { name, reset: true })
    }

    pub(super) fn sequence_set_transaction(
        &self,
        session: &mut Session,
        plan: plan::SetTransactionPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO(jkosh44) Only supports isolation levels for now.
        for mode in plan.modes {
            match mode {
                TransactionMode::AccessMode(_) => {
                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
                }
                TransactionMode::IsolationLevel(isolation_level) => {
                    self.validate_set_isolation_level(session)?;

                    session.vars_mut().set(
                        self.catalog().system_config(),
                        TRANSACTION_ISOLATION_VAR_NAME,
                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
                        plan.local,
                    )?
                }
            }
        }
        Ok(ExecuteResponse::SetVariable {
            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
            reset: false,
        })
    }

    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
        if session.transaction().contains_ops() {
            Err(AdapterError::InvalidSetIsolationLevel)
        } else {
            Ok(())
        }
    }

    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
        if session.transaction().contains_ops() {
            Err(AdapterError::InvalidSetCluster)
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_end_transaction(
        &mut self,
        mut ctx: ExecuteContext,
        mut action: EndTransactionAction,
    ) {
        // If the transaction has failed, we can only roll back.
        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
            (&action, ctx.session().transaction())
        {
            action = EndTransactionAction::Rollback;
        }
        let response = match action {
            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
                params: BTreeMap::new(),
            }),
            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
                params: BTreeMap::new(),
            }),
        };

        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;

        let (response, action) = match result {
            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
                (response, action)
            }
            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
                // Make sure we have the correct set of write locks for this transaction,
                // aggressively dropping partial sets of locks to prevent separate
                // transactions from deadlocking.
                let validated_locks = match write_lock_guards {
                    None => None,
                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
                        Ok(locks) => Some(locks),
                        Err(missing) => {
                            tracing::error!(?missing, "programming error, missing write locks");
                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
                        }
                    },
                };

                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
                for WriteOp { id, rows } in writes {
                    let total_rows = collected_writes.entry(id).or_default();
                    total_rows.push(rows);
                }

                self.submit_write(PendingWriteTxn::User {
                    span: Span::current(),
                    writes: collected_writes,
                    write_locks: validated_locks,
                    pending_txn: PendingTxn {
                        ctx,
                        response,
                        action,
                    },
                });
                return;
            }
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrictSerializable =>
            {
                let conn_id = ctx.session().conn_id().clone();
                let pending_read_txn = PendingReadTxn {
                    txn: PendingRead::Read {
                        txn: PendingTxn {
                            ctx,
                            response,
                            action,
                        },
                    },
                    timestamp_context: determination.timestamp_context,
                    created: Instant::now(),
                    num_requeues: 0,
                    otel_ctx: OpenTelemetryContext::obtain(),
                };
                self.strict_serializable_reads_tx
                    .send((conn_id, pending_read_txn))
                    .expect("sending to strict_serializable_reads_tx cannot fail");
                return;
            }
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrongSessionSerializable =>
            {
                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
                    ctx.session_mut()
                        .ensure_timestamp_oracle(timeline.clone())
                        .apply_write(*ts);
                }
                (response, action)
            }
            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
                self.internal_cmd_tx
                    .send(Message::ExecuteSingleStatementTransaction {
                        ctx,
                        otel_ctx: OpenTelemetryContext::obtain(),
                        stmt,
                        params,
                    })
                    .expect("must send");
                return;
            }
            Ok((_, _)) => (response, action),
            Err(err) => (Err(err), EndTransactionAction::Rollback),
        };
        let changed = ctx.session_mut().vars_mut().end_transaction(action);
        // Append any parameters that changed to the response.
        let response = response.map(|mut r| {
            r.extend_params(changed);
            ExecuteResponse::from(r)
        });

        ctx.retire(response);
    }

    #[instrument]
    async fn sequence_end_transaction_inner(
        &mut self,
        ctx: &mut ExecuteContext,
        action: EndTransactionAction,
    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
        let txn = self.clear_transaction(ctx.session_mut()).await;

        if let EndTransactionAction::Commit = action {
            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
                match &mut ops {
                    TransactionOps::Writes(writes) => {
                        for WriteOp { id, .. } in &mut writes.iter() {
                            // Re-verify this id exists.
                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
                                AdapterError::Catalog(mz_catalog::memory::error::Error {
                                    kind: mz_catalog::memory::error::ErrorKind::Sql(
                                        CatalogError::UnknownItem(id.to_string()),
                                    ),
                                })
                            })?;
                        }

                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
                    }
                    TransactionOps::DDL {
                        ops,
                        state: _,
                        side_effects,
                        revision,
                    } => {
                        // Make sure our catalog hasn't changed.
                        if *revision != self.catalog().transient_revision() {
                            return Err(AdapterError::DDLTransactionRace);
                        }
                        // Commit all of our queued ops.
                        let ops = std::mem::take(ops);
                        let side_effects = std::mem::take(side_effects);
                        self.catalog_transact_with_side_effects(
                            Some(ctx),
                            ops,
                            move |a, mut ctx| {
                                Box::pin(async move {
                                    for side_effect in side_effects {
                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
                                    }
                                })
                            },
                        )
                        .await?;
                    }
                    _ => (),
                }
                return Ok((Some(ops), write_lock_guards));
            }
        }

        Ok((None, None))
    }

    pub(super) async fn sequence_side_effecting_func(
        &mut self,
        ctx: ExecuteContext,
        plan: SideEffectingFunc,
    ) {
        match plan {
            SideEffectingFunc::PgCancelBackend { connection_id } => {
                if ctx.session().conn_id().unhandled() == connection_id {
                    // As a special case, if we're canceling ourselves, we send
                    // back a canceled response to the client issuing the query,
                    // and so we need to do no further processing of the cancel.
                    ctx.retire(Err(AdapterError::Canceled));
                    return;
                }

                let res = if let Some((id_handle, _conn_meta)) =
                    self.active_conns.get_key_value(&connection_id)
                {
                    // check_plan already verified role membership.
                    self.handle_privileged_cancel(id_handle.clone()).await;
                    Datum::True
                } else {
                    Datum::False
                };
                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
            }
        }
    }

    /// Execute a side-effecting function from the frontend peek path.
    /// This is separate from `sequence_side_effecting_func` because
    /// - It doesn't have an ExecuteContext.
    /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
    ///   peek sequencing can't look at `active_conns`.
    ///
    /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
    /// sequencing.
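    ///
    /// Illustrative trigger (connection id hypothetical): a frontend-sequenced
    /// `SELECT pg_cancel_backend(42)` lands here and succeeds only if the
    /// current role is a member of the role that owns connection 42.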
    pub(crate) async fn execute_side_effecting_func(
        &mut self,
        plan: SideEffectingFunc,
        conn_id: ConnectionId,
        current_role: RoleId,
    ) -> Result<ExecuteResponse, AdapterError> {
        match plan {
            SideEffectingFunc::PgCancelBackend { connection_id } => {
                if conn_id.unhandled() == connection_id {
                    // As a special case, if we're canceling ourselves, we return
                    // a canceled response to the client issuing the query,
                    // and so we need to do no further processing of the cancel.
                    return Err(AdapterError::Canceled);
                }

                // Perform RBAC check: the current user must be a member of the role
                // that owns the connection being cancelled.
                if let Some((_id_handle, conn_meta)) =
                    self.active_conns.get_key_value(&connection_id)
                {
                    let target_role = *conn_meta.authenticated_role_id();
                    let role_membership = self
                        .catalog()
                        .state()
                        .collect_role_membership(&current_role);
                    if !role_membership.contains(&target_role) {
                        let target_role_name = self
                            .catalog()
                            .try_get_role(&target_role)
                            .map(|role| role.name().to_string())
                            .unwrap_or_else(|| target_role.to_string());
                        return Err(AdapterError::Unauthorized(
                            rbac::UnauthorizedError::RoleMembership {
                                role_names: vec![target_role_name],
                            },
                        ));
                    }

                    // RBAC check passed, proceed with cancellation.
                    let id_handle = self
                        .active_conns
                        .get_key_value(&connection_id)
                        .map(|(id, _)| id.clone())
                        .expect("checked above");
                    self.handle_privileged_cancel(id_handle).await;
                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
                } else {
                    // Connection not found, return false.
                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
                }
            }
        }
    }

    /// Inner method that performs the actual real-time recency timestamp determination.
    /// This is called by both the old peek sequencing code (via
    /// `determine_real_time_recent_timestamp_if_needed`) and the new command handler for
    /// `Command::DetermineRealTimeRecentTimestamp`.
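    ///
    /// For example (objects hypothetical), a query over a materialized view
    /// that reads from a source transitively reaches the source via `uses()`
    /// below, and the returned future resolves to a timestamp no earlier than
    /// the source's upstream data as of query time.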
    pub(crate) async fn determine_real_time_recent_timestamp(
        &self,
        source_ids: impl Iterator<Item = GlobalId>,
        real_time_recency_timeout: Duration,
    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
    {
        let item_ids = source_ids
            .map(|gid| {
                self.catalog
                    .try_resolve_item_id(&gid)
                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Find all dependencies transitively because we need to ensure that
        // RTR queries determine the timestamp from the sources' (i.e.
        // storage objects that ingest data from external systems) remap
        // data. We "cheat" a little bit and filter out any IDs that aren't
        // user objects because we know they are not RTR sources.
        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
        // If none of the sources are user objects, we don't need to provide
        // an RTR timestamp.
        if to_visit.is_empty() {
            return Ok(None);
        }

        let mut timestamp_objects = BTreeSet::new();

        while let Some(id) = to_visit.pop_front() {
            timestamp_objects.insert(id);
            to_visit.extend(
                self.catalog()
                    .get_entry(&id)
                    .uses()
                    .into_iter()
                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
            );
        }
        let timestamp_objects = timestamp_objects
            .into_iter()
            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
            .collect();

        let r = self
            .controller
            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
            .await?;

        Ok(Some(r))
    }

    /// Checks to see if the session needs a real time recency timestamp and if so returns
    /// a future that will return the timestamp.
    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
        &self,
        session: &Session,
        source_ids: impl Iterator<Item = GlobalId>,
    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
    {
        let vars = session.vars();

        if vars.real_time_recency()
            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
            && !session.contains_read_timestamp()
        {
            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
                .await
        } else {
            Ok(None)
        }
    }

    #[instrument]
    pub(super) async fn sequence_explain_plan(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::ExplainPlanPlan,
        target_cluster: TargetCluster,
    ) {
        match &plan.explainee {
            plan::Explainee::Statement(stmt) => match stmt {
                plan::ExplaineeStatement::CreateView { .. } => {
                    self.explain_create_view(ctx, plan).await;
                }
                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
                    self.explain_create_materialized_view(ctx, plan).await;
                }
                plan::ExplaineeStatement::CreateIndex { .. } => {
                    self.explain_create_index(ctx, plan).await;
                }
                plan::ExplaineeStatement::Select { .. } => {
                    self.explain_peek(ctx, plan, target_cluster).await;
                }
            },
            plan::Explainee::View(_) => {
                let result = self.explain_view(&ctx, plan);
                ctx.retire(result);
            }
            plan::Explainee::MaterializedView(_) => {
                let result = self.explain_materialized_view(&ctx, plan);
                ctx.retire(result);
            }
            plan::Explainee::Index(_) => {
                let result = self.explain_index(&ctx, plan);
                ctx.retire(result);
            }
            plan::Explainee::ReplanView(_) => {
                self.explain_replan_view(ctx, plan).await;
            }
            plan::Explainee::ReplanMaterializedView(_) => {
                self.explain_replan_materialized_view(ctx, plan).await;
            }
            plan::Explainee::ReplanIndex(_) => {
                self.explain_replan_index(ctx, plan).await;
            }
        };
    }

    pub(super) async fn sequence_explain_pushdown(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::ExplainPushdownPlan,
        target_cluster: TargetCluster,
    ) {
        match plan.explainee {
            Explainee::Statement(ExplaineeStatement::Select {
                broken: false,
                plan,
                desc: _,
            }) => {
                let stage = return_if_err!(
                    self.peek_validate(
                        ctx.session(),
                        plan,
                        target_cluster,
                        None,
                        ExplainContext::Pushdown,
                        Some(ctx.session().vars().max_query_result_size()),
                    ),
                    ctx
                );
                self.sequence_staged(ctx, Span::current(), stage).await;
            }
            Explainee::MaterializedView(item_id) => {
                self.explain_pushdown_materialized_view(ctx, item_id).await;
            }
            _ => {
                ctx.retire(Err(AdapterError::Unsupported(
                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
                )));
            }
        };
    }

    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
    async fn execute_explain_pushdown_with_read_holds(
        &self,
        ctx: ExecuteContext,
        as_of: Antichain<Timestamp>,
        mz_now: ResultSpec<'static>,
        read_holds: Option<ReadHolds<Timestamp>>,
        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
    ) {
        let fut = self
            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
            .await;
        task::spawn(|| "render explain pushdown", async move {
            // Transfer the necessary read holds over to the background task
            let _read_holds = read_holds;
            let res = fut.await;
            ctx.retire(res);
        });
    }

    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
        &self,
        session: &Session,
        as_of: Antichain<Timestamp>,
        mz_now: ResultSpec<'static>,
        imports: I,
    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
        // Get the needed Coordinator stuff and call the freestanding, shared helper.
        super::explain_pushdown_future_inner(
            session,
            &self.catalog,
            &self.controller.storage_collections,
            as_of,
            mz_now,
            imports,
        )
        .await
    }

    #[instrument]
    pub(super) async fn sequence_insert(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::InsertPlan,
    ) {
        // Normally, this would get checked when trying to add "write ops" to
        // the transaction, but we go down diverging paths below based on
        // whether the INSERT contains only constant values or not.
        //
        // For the non-constant case we sequence an implicit read-then-write,
        // which messes with the transaction ops and would otherwise allow an
        // implicit read-then-write to sneak into a read-only transaction.
        if !ctx.session_mut().transaction().allows_writes() {
            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
            return;
        }

        // The structure of this code originates from a time when
        // `ReadThenWritePlan` carried an `MirRelationExpr` instead of an
        // optimized `MirRelationExpr`.
        //
        // Ideally, we would like to make the `selection.as_const().is_some()`
        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
        // are planned as a Wrap($n, $vals) call, so until we can reduce
        // HirRelationExpr this check would always return false.
        //
        // Unfortunately, hitting the default path of the match below also
        // causes a lot of tests to fail, so we opted to go with the extra
        // `plan.values.clone()` statements when producing the `optimized_mir`
        // and re-optimize the values in the `sequence_read_then_write` call.
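        // For example (table names hypothetical), `INSERT INTO t VALUES (1), (3)`
        // lowers to a constant and takes the fast path below, while
        // `INSERT INTO t SELECT * FROM s` is sequenced as a read-then-write.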
        let optimized_mir = if let Some(..) = &plan.values.as_const() {
            // We don't perform any optimizations on an expression that is already
            // a constant for writes, as we want to maximize bulk-insert throughput.
            let expr = return_if_err!(
                plan.values
                    .clone()
                    .lower(self.catalog().system_config(), None),
                ctx
            );
            OptimizedMirRelationExpr(expr)
        } else {
            // Collect optimizer parameters.
            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());

            // (`optimize::view::Optimizer` has a special case for constant queries.)
            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
        };

        match optimized_mir.into_inner() {
            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
                let catalog = self.owned_catalog();
                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
                    let result =
                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
                    ctx.retire(result);
                });
            }
            // All non-constant values must be planned as read-then-writes.
            _ => {
                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
                    Some(table) => {
                        // Inserts always occur at the latest version of the table.
                        let desc = table.relation_desc_latest().expect("table has a desc");
                        desc.arity()
                    }
                    None => {
                        ctx.retire(Err(AdapterError::Catalog(
                            mz_catalog::memory::error::Error {
                                kind: mz_catalog::memory::error::ErrorKind::Sql(
                                    CatalogError::UnknownItem(plan.id.to_string()),
                                ),
                            },
                        )));
                        return;
                    }
                };

                let finishing = RowSetFinishing {
                    order_by: vec![],
                    limit: None,
                    offset: 0,
                    project: (0..desc_arity).collect(),
                };

                let read_then_write_plan = plan::ReadThenWritePlan {
                    id: plan.id,
                    selection: plan.values,
                    finishing,
                    assignments: BTreeMap::new(),
                    kind: MutationKind::Insert,
                    returning: plan.returning,
                };

                self.sequence_read_then_write(ctx, read_then_write_plan)
                    .await;
            }
        }
    }

    /// ReadThenWrite is a plan whose writes depend on the results of a
    /// read. This works by doing a Peek and then queuing a SendDiffs. No writes
    /// or read-then-writes can occur between the Peek and the SendDiffs;
    /// otherwise a serializability violation could occur.
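    ///
    /// Illustrative statements that take this path (table names hypothetical):
    /// `UPDATE t SET x = x + 1`, `DELETE FROM t WHERE x > 0`, and
    /// `INSERT INTO t SELECT * FROM s` all read the current contents first and
    /// only then write the resulting diffs back at a later timestamp.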
    #[instrument]
    pub(super) async fn sequence_read_then_write(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::ReadThenWritePlan,
    ) {
        let mut source_ids: BTreeSet<_> = plan
            .selection
            .depends_on()
            .into_iter()
            .map(|gid| self.catalog().resolve_item_id(&gid))
            .collect();
        source_ids.insert(plan.id);

        // If the transaction doesn't already have write locks, acquire them.
        if ctx.session().transaction().write_locks().is_none() {
            // Pre-define all of the locks we need.
            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());

            // Try acquiring all of our locks.
            for id in &source_ids {
                if let Some(lock) = self.try_grant_object_write_lock(*id) {
                    write_locks.insert_lock(*id, lock);
                }
            }

2598            // See if we acquired all of the necessary locks.
2599            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2600                Ok(locks) => locks,
2601                Err(missing) => {
2602                    // Defer our write if we couldn't acquire all of the locks.
2603                    let role_metadata = ctx.session().role_metadata().clone();
2604                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2605                    let plan = DeferredPlan {
2606                        ctx,
2607                        plan: Plan::ReadThenWrite(plan),
2608                        validity: PlanValidity::new(
2609                            self.catalog.transient_revision(),
2610                            source_ids.clone(),
2611                            None,
2612                            None,
2613                            role_metadata,
2614                        ),
2615                        requires_locks: source_ids,
2616                    };
2617                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2618                }
2619            };
2620
2621            ctx.session_mut()
2622                .try_grant_write_locks(write_locks)
2623                .expect("session has already been granted write locks");
2624        }
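
        // Past this point the session holds write locks for every object in
        // `source_ids`: either it already had them, or we just acquired all of
        // them above (deferring the plan and returning if any was unavailable).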
2625
2626        let plan::ReadThenWritePlan {
2627            id,
2628            kind,
2629            selection,
2630            mut assignments,
2631            finishing,
2632            mut returning,
2633        } = plan;
2634
2635        // Read-then-writes can be queued, so re-verify that the id still exists.
2636        let desc = match self.catalog().try_get_entry(&id) {
2637            Some(table) => {
2638                // Inserts always occur at the latest version of the table.
2639                table
2640                    .relation_desc_latest()
2641                    .expect("table has a desc")
2642                    .into_owned()
2643            }
2644            None => {
2645                ctx.retire(Err(AdapterError::Catalog(
2646                    mz_catalog::memory::error::Error {
2647                        kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2648                            id.to_string(),
2649                        )),
2650                    },
2651                )));
2652                return;
2653            }
2654        };
2655
2656        // Disallow mz_now in any position because read time and write time differ.
2657        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2658            || assignments.values().any(|e| e.contains_temporal())
2659            || returning.iter().any(|e| e.contains_temporal());
2660        if contains_temporal {
2661            ctx.retire(Err(AdapterError::Unsupported(
2662                "calls to mz_now in write statements",
2663            )));
2664            return;
2665        }
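        // For example, `DELETE FROM t WHERE ts < mz_now()` is rejected here: the
        // peek would evaluate `mz_now()` at the read timestamp, while the deletes
        // are applied at the (potentially later) write timestamp.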
2666
2667        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
2668        //
2669        // - They do not refer to any objects whose notion of time moves differently than that of
2670        //   user tables. This limitation is meant to ensure no writes occur between this read and
2671        //   the subsequent write.
2672        // - They do not use mz_now(), whose time produced during read will differ from the write
2673        //   timestamp.
2674        fn validate_read_dependencies(
2675            catalog: &Catalog,
2676            id: &CatalogItemId,
2677        ) -> Result<(), AdapterError> {
2678            use CatalogItemType::*;
2679            use mz_catalog::memory::objects;
2680            let mut ids_to_check = Vec::new();
2681            let valid = match catalog.try_get_entry(id) {
2682                Some(entry) => {
2683                    if let CatalogItem::View(objects::View { optimized_expr, .. })
2684                    | CatalogItem::MaterializedView(objects::MaterializedView {
2685                        optimized_expr,
2686                        ..
2687                    }) = entry.item()
2688                    {
2689                        if optimized_expr.contains_temporal() {
2690                            return Err(AdapterError::Unsupported(
2691                                "calls to mz_now in write statements",
2692                            ));
2693                        }
2694                    }
2695                    match entry.item().typ() {
2696                        typ @ (Func | View | MaterializedView | ContinualTask) => {
2697                            ids_to_check.extend(entry.uses());
2698                            // Only user objects (or builtin functions) are valid.
2699                            id.is_user() || matches!(typ, Func)
2700                        }
2701                        Source | Secret | Connection => false,
2702                        // Cannot select from sinks or indexes.
2703                        Sink | Index => unreachable!(),
2704                        Table => {
2705                            if !id.is_user() {
2706                                // We can't read from non-user tables
2707                                false
2708                            } else {
2709                                // We can't read from tables that are source-exports
2710                                entry.source_export_details().is_none()
2711                            }
2712                        }
2713                        Type => true,
2714                    }
2715                }
2716                None => false,
2717            };
2718            if !valid {
2719                return Err(AdapterError::InvalidTableMutationSelection);
2720            }
2721            for id in ids_to_check {
2722                validate_read_dependencies(catalog, &id)?;
2723            }
2724            Ok(())
2725        }
2726
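        // For example, an `UPDATE` whose selection reads (perhaps through a view)
        // from a source is rejected: a source's notion of time advances
        // independently of user-table writes, so the write locks held above could
        // not prevent data from arriving between our read and the write.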
2727        for gid in selection.depends_on() {
2728            let item_id = self.catalog().resolve_item_id(&gid);
2729            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2730                ctx.retire(Err(err));
2731                return;
2732            }
2733        }
2734
2735        let (peek_tx, peek_rx) = oneshot::channel();
2736        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2737        let (tx, _, session, extra) = ctx.into_parts();
2738        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2739        // execution context, because this peek does not directly correspond to an execute,
2740        // and so we don't need to take any action on its retirement.
2741        // TODO[btv]: we might consider extending statement logging to log the inner
2742        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2743        // and mint a new "real" execution context here. We'd also have to add some logic to
2744        // make sure such "sub-statements" are always sampled when the top-level statement is.
2745        //
2746        // It's debatable whether this makes sense conceptually,
2747        // because the inner fragment here is not actually a
2748        // "statement" in its own right.
2749        let peek_ctx = ExecuteContext::from_parts(
2750            peek_client_tx,
2751            self.internal_cmd_tx.clone(),
2752            session,
2753            Default::default(),
2754        );
2755
2756        self.sequence_peek(
2757            peek_ctx,
2758            plan::SelectPlan {
2759                select: None,
2760                source: selection,
2761                when: QueryWhen::FreshestTableWrite,
2762                finishing,
2763                copy_to: None,
2764            },
2765            TargetCluster::Active,
2766            None,
2767        )
2768        .await;
2769
2770        let internal_cmd_tx = self.internal_cmd_tx.clone();
2771        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2772        let catalog = self.owned_catalog();
2773        let max_result_size = self.catalog().system_config().max_result_size();
2774
2775        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2776            let (peek_response, session) = match peek_rx.await {
2777                Ok(Response {
2778                    result: Ok(resp),
2779                    session,
2780                    otel_ctx,
2781                }) => {
2782                    otel_ctx.attach_as_parent();
2783                    (resp, session)
2784                }
2785                Ok(Response {
2786                    result: Err(e),
2787                    session,
2788                    otel_ctx,
2789                }) => {
2790                    let ctx =
2791                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2792                    otel_ctx.attach_as_parent();
2793                    ctx.retire(Err(e));
2794                    return;
2795                }
2796                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2797                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2798            };
2799            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2800            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2801
2802            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2803            if timeout_dur == Duration::ZERO {
2804                timeout_dur = Duration::MAX;
2805            }
2806
2807            let style = ExprPrepOneShot {
2808                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2809                session: ctx.session(),
2810                catalog_state: catalog.state(),
2811            };
2812            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2813                return_if_err!(style.prep_scalar_expr(expr), ctx);
2814            }
2815
2816            let make_diffs =
2817                move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2818                    let arena = RowArena::new();
2819                    let mut diffs = Vec::new();
2820                    let mut datum_vec = mz_repr::DatumVec::new();
2821
2822                    while let Some(row) = rows.next() {
2823                        if !assignments.is_empty() {
2824                            assert!(
2825                                matches!(kind, MutationKind::Update),
2826                                "only updates support assignments"
2827                            );
2828                            let mut datums = datum_vec.borrow_with(row);
2829                            let mut updates = vec![];
2830                            for (idx, expr) in &assignments {
2831                                let updated = match expr.eval(&datums, &arena) {
2832                                    Ok(updated) => updated,
2833                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2834                                };
2835                                updates.push((*idx, updated));
2836                            }
2837                            for (idx, new_value) in updates {
2838                                datums[idx] = new_value;
2839                            }
2840                            let updated = Row::pack_slice(&datums);
2841                            diffs.push((updated, Diff::ONE));
2842                        }
2843                        match kind {
2844                            // Updates and deletes always remove the
2845                            // current row. Updates will also add an
2846                            // updated value.
2847                            MutationKind::Update | MutationKind::Delete => {
2848                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2849                            }
2850                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2851                        }
2852                    }
2853
2854                    // Sum the byte sizes of all rows, to check whether the result
2855                    // exceeds the max_result_size threshold.
2856                    let mut byte_size: u64 = 0;
2857                    for (row, diff) in &diffs {
2858                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2859                        if diff.is_positive() {
2860                            for (idx, datum) in row.iter().enumerate() {
2861                                desc.constraints_met(idx, &datum)?;
2862                            }
2863                        }
2864                    }
2865                    Ok((diffs, byte_size))
2866                };
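            // To illustrate `make_diffs`: for an UPDATE, a peeked row `(1, 2)`
            // updated to `(1, 3)` yields diffs `[((1, 3), +1), ((1, 2), -1)]`; a
            // DELETE of `(1, 2)` yields `[((1, 2), -1)]`; an INSERT yields
            // `[(row, +1)]`.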
2867
2868            let diffs = match peek_response {
2869                ExecuteResponse::SendingRowsStreaming {
2870                    rows: mut rows_stream,
2871                    ..
2872                } => {
2873                    let mut byte_size: u64 = 0;
2874                    let mut diffs = Vec::new();
2875                    let result = loop {
2876                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2877                            Ok(Some(res)) => match res {
2878                                PeekResponseUnary::Rows(new_rows) => {
2879                                    match make_diffs(new_rows) {
2880                                        Ok((mut new_diffs, new_byte_size)) => {
2881                                            byte_size = byte_size.saturating_add(new_byte_size);
2882                                            if byte_size > max_result_size {
2883                                                break Err(AdapterError::ResultSize(format!(
2884                                                    "result exceeds max size of {max_result_size}"
2885                                                )));
2886                                            }
2887                                            diffs.append(&mut new_diffs)
2888                                        }
2889                                        Err(e) => break Err(e),
2890                                    };
2891                                }
2892                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2893                                PeekResponseUnary::Error(e) => {
2894                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2895                                }
2896                            },
2897                            Ok(None) => break Ok(diffs),
2898                            Err(_) => {
2899                                // We timed out, so remove the pending peek. This is
2900                                // best-effort and doesn't guarantee we won't
2901                                // receive a response.
2902                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2903                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2904                                    conn_id: ctx.session().conn_id().clone(),
2905                                });
2906                                if let Err(e) = result {
2907                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2908                                }
2909                                break Err(AdapterError::StatementTimeout);
2910                            }
2911                        }
2912                    };
2913
2914                    result
2915                }
2916                ExecuteResponse::SendingRowsImmediate { rows } => {
2917                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2918                }
2919                resp => Err(AdapterError::Unstructured(anyhow!(
2920                    "unexpected peek response: {resp:?}"
2921                ))),
2922            };
2923
2924            let mut returning_rows = Vec::new();
2925            let mut diff_err: Option<AdapterError> = None;
2926            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2927                let arena = RowArena::new();
2928                for (row, diff) in diffs {
2929                    if !diff.is_positive() {
2930                        continue;
2931                    }
2932                    let mut returning_row = Row::with_capacity(returning.len());
2933                    let mut packer = returning_row.packer();
2934                    for expr in &returning {
2935                        let datums: Vec<_> = row.iter().collect();
2936                        match expr.eval(&datums, &arena) {
2937                            Ok(datum) => {
2938                                packer.push(datum);
2939                            }
2940                            Err(err) => {
2941                                diff_err = Some(err.into());
2942                                break;
2943                            }
2944                        }
2945                    }
2946                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2947                    let diff = match NonZeroUsize::try_from(diff) {
2948                        Ok(diff) => diff,
2949                        Err(err) => {
2950                            diff_err = Some(err.into());
2951                            break;
2952                        }
2953                    };
2954                    returning_rows.push((returning_row, diff));
2955                    if diff_err.is_some() {
2956                        break;
2957                    }
2958                }
2959            }
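            // Note that RETURNING only reports asserted rows (positive diffs), and
            // each returned row carries its diff as a multiplicity.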
2960            let diffs = if let Some(err) = diff_err {
2961                Err(err)
2962            } else {
2963                diffs
2964            };
2965
2966            // We need to clear out the timestamp context so the write doesn't fail due to a
2967            // read-only transaction.
2968            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2969            // No matter what isolation level the client is using, we must linearize this
2970            // read. The write will be performed right after this, as part of a single
2971            // transaction, so the write must have a timestamp greater than or equal to the
2972            // read.
2973            //
2974            // Note: It's only OK for the write to have a greater timestamp than the read
2975            // because the write lock prevents any other writes from happening in between
2976            // the read and write.
2977            if let Some(timestamp_context) = timestamp_context {
2978                let (tx, rx) = tokio::sync::oneshot::channel();
2979                let conn_id = ctx.session().conn_id().clone();
2980                let pending_read_txn = PendingReadTxn {
2981                    txn: PendingRead::ReadThenWrite { ctx, tx },
2982                    timestamp_context,
2983                    created: Instant::now(),
2984                    num_requeues: 0,
2985                    otel_ctx: OpenTelemetryContext::obtain(),
2986                };
2987                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2988                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
2989                if let Err(e) = result {
2990                    warn!(
2991                        "strict_serializable_reads_tx dropped before we could send: {:?}",
2992                        e
2993                    );
2994                    return;
2995                }
2996                let result = rx.await;
2997                // It is not an error for these results to be ready after `tx` has been dropped.
2998                ctx = match result {
2999                    Ok(Some(ctx)) => ctx,
3000                    Ok(None) => {
3001                        // Coordinator took our context and will handle responding to the client.
3002                        // This usually indicates that our transaction was aborted.
3003                        return;
3004                    }
3005                    Err(e) => {
3006                        warn!(
3007                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3008                            e
3009                        );
3010                        return;
3011                    }
3012                };
3013            }
3014
3015            match diffs {
3016                Ok(diffs) => {
3017                    let result = Self::send_diffs(
3018                        ctx.session_mut(),
3019                        plan::SendDiffsPlan {
3020                            id,
3021                            updates: diffs,
3022                            kind,
3023                            returning: returning_rows,
3024                            max_result_size,
3025                        },
3026                    );
3027                    ctx.retire(result);
3028                }
3029                Err(e) => {
3030                    ctx.retire(Err(e));
3031                }
3032            }
3033        });
3034    }
3035
3036    #[instrument]
3037    pub(super) async fn sequence_alter_item_rename(
3038        &mut self,
3039        ctx: &mut ExecuteContext,
3040        plan: plan::AlterItemRenamePlan,
3041    ) -> Result<ExecuteResponse, AdapterError> {
3042        let op = catalog::Op::RenameItem {
3043            id: plan.id,
3044            current_full_name: plan.current_full_name,
3045            to_name: plan.to_name,
3046        };
3047        match self
3048            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3049            .await
3050        {
3051            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3052            Err(err) => Err(err),
3053        }
3054    }
3055
3056    #[instrument]
3057    pub(super) async fn sequence_alter_retain_history(
3058        &mut self,
3059        ctx: &mut ExecuteContext,
3060        plan: plan::AlterRetainHistoryPlan,
3061    ) -> Result<ExecuteResponse, AdapterError> {
3062        let ops = vec![catalog::Op::AlterRetainHistory {
3063            id: plan.id,
3064            value: plan.value,
3065            window: plan.window,
3066        }];
3067        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3068            Box::pin(async move {
3069                let catalog_item = coord.catalog().get_entry(&plan.id).item();
3070                let cluster = match catalog_item {
3071                    CatalogItem::Table(_)
3072                    | CatalogItem::MaterializedView(_)
3073                    | CatalogItem::Source(_)
3074                    | CatalogItem::ContinualTask(_) => None,
3075                    CatalogItem::Index(index) => Some(index.cluster_id),
3076                    CatalogItem::Log(_)
3077                    | CatalogItem::View(_)
3078                    | CatalogItem::Sink(_)
3079                    | CatalogItem::Type(_)
3080                    | CatalogItem::Func(_)
3081                    | CatalogItem::Secret(_)
3082                    | CatalogItem::Connection(_) => unreachable!(),
3083                };
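                // Indexes are the only compute-maintained case here; tables,
                // sources, materialized views, and continual tasks are storage
                // collections, so their read policies go to the storage controller.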
3084                match cluster {
3085                    Some(cluster) => {
3086                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3087                    }
3088                    None => {
3089                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3090                    }
3091                }
3092            })
3093        })
3094        .await?;
3095        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3096    }
3097
3098    #[instrument]
3099    pub(super) async fn sequence_alter_schema_rename(
3100        &mut self,
3101        ctx: &mut ExecuteContext,
3102        plan: plan::AlterSchemaRenamePlan,
3103    ) -> Result<ExecuteResponse, AdapterError> {
3104        let (database_spec, schema_spec) = plan.cur_schema_spec;
3105        let op = catalog::Op::RenameSchema {
3106            database_spec,
3107            schema_spec,
3108            new_name: plan.new_schema_name,
3109            check_reserved_names: true,
3110        };
3111        match self
3112            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3113            .await
3114        {
3115            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3116            Err(err) => Err(err),
3117        }
3118    }
3119
3120    #[instrument]
3121    pub(super) async fn sequence_alter_schema_swap(
3122        &mut self,
3123        ctx: &mut ExecuteContext,
3124        plan: plan::AlterSchemaSwapPlan,
3125    ) -> Result<ExecuteResponse, AdapterError> {
3126        let plan::AlterSchemaSwapPlan {
3127            schema_a_spec: (schema_a_db, schema_a),
3128            schema_a_name,
3129            schema_b_spec: (schema_b_db, schema_b),
3130            schema_b_name,
3131            name_temp,
3132        } = plan;
3133
3134        let op_a = catalog::Op::RenameSchema {
3135            database_spec: schema_a_db,
3136            schema_spec: schema_a,
3137            new_name: name_temp,
3138            check_reserved_names: false,
3139        };
3140        let op_b = catalog::Op::RenameSchema {
3141            database_spec: schema_b_db,
3142            schema_spec: schema_b,
3143            new_name: schema_a_name,
3144            check_reserved_names: false,
3145        };
3146        let op_c = catalog::Op::RenameSchema {
3147            database_spec: schema_a_db,
3148            schema_spec: schema_a,
3149            new_name: schema_b_name,
3150            check_reserved_names: false,
3151        };
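        // Applied in order within a single catalog transaction, these ops swap the
        // two schema names: `a` first moves to the temporary name to avoid a name
        // collision, then `b` takes `a`'s old name, and finally `a` takes `b`'s.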
3152
3153        match self
3154            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3155                Box::pin(async {})
3156            })
3157            .await
3158        {
3159            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3160            Err(err) => Err(err),
3161        }
3162    }
3163
3164    #[instrument]
3165    pub(super) async fn sequence_alter_role(
3166        &mut self,
3167        session: &Session,
3168        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3169    ) -> Result<ExecuteResponse, AdapterError> {
3170        let catalog = self.catalog().for_session(session);
3171        let role = catalog.get_role(&id);
3172
3173        // We'll send these notices to the user, if the operation is successful.
3174        let mut notices = vec![];
3175
3176        // Get the attributes and variables from the role, as they currently are.
3177        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3178        let mut vars = role.vars().clone();
3179
3180        // Whether to set the password to NULL. This is a special case since the existing
3181        // password is not stored in the role attributes.
3182        let mut nopassword = false;
3183
3184        // Apply our updates.
3185        match option {
3186            PlannedAlterRoleOption::Attributes(attrs) => {
3187                self.validate_role_attributes(&attrs.clone().into())?;
3188
3189                if let Some(inherit) = attrs.inherit {
3190                    attributes.inherit = inherit;
3191                }
3192
3193                if let Some(password) = attrs.password {
3194                    attributes.password = Some(password);
3195                    attributes.scram_iterations =
3196                        Some(self.catalog().system_config().scram_iterations());
3197                }
3198
3199                if let Some(superuser) = attrs.superuser {
3200                    attributes.superuser = Some(superuser);
3201                }
3202
3203                if let Some(login) = attrs.login {
3204                    attributes.login = Some(login);
3205                }
3206
3207                if attrs.nopassword.unwrap_or(false) {
3208                    nopassword = true;
3209                }
3210
3211                if let Some(notice) = self.should_emit_rbac_notice(session) {
3212                    notices.push(notice);
3213                }
3214            }
3215            PlannedAlterRoleOption::Variable(variable) => {
3216                // Get the variable to make sure it's valid and visible.
3217                let session_var = session.vars().inspect(variable.name())?;
3218                // Return early if it's not visible.
3219                session_var.visible(session.user(), catalog.system_vars())?;
3220
3221                // Emit a warning when deprecated variables are used.
3222                // TODO(database-issues#8069) remove this after sufficient time has passed
3223                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3224                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3225                } else if let PlannedRoleVariable::Set {
3226                    name,
3227                    value: VariableValue::Values(vals),
3228                } = &variable
3229                {
3230                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3231                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3232                    }
3233                }
3234
3235                let var_name = match variable {
3236                    PlannedRoleVariable::Set { name, value } => {
3237                        // Update our persisted set.
3238                        match &value {
3239                            VariableValue::Default => {
3240                                vars.remove(&name);
3241                            }
3242                            VariableValue::Values(vals) => {
3243                                let var = match &vals[..] {
3244                                    [val] => OwnedVarInput::Flat(val.clone()),
3245                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3246                                };
3247                                // Make sure the input is valid.
3248                                session_var.check(var.borrow())?;
3249
3250                                vars.insert(name.clone(), var);
3251                            }
3252                        };
3253                        name
3254                    }
3255                    PlannedRoleVariable::Reset { name } => {
3256                        // Remove it from our persisted values.
3257                        vars.remove(&name);
3258                        name
3259                    }
3260                };
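                // For example, `ALTER ROLE r SET cluster = 'batch'` stores
                // `OwnedVarInput::Flat("batch")` under the key `cluster`, while
                // `ALTER ROLE r RESET cluster` removes the key entirely.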
3261
3262                // Emit a notice that they need to reconnect to see the change take effect.
3263                notices.push(AdapterNotice::VarDefaultUpdated {
3264                    role: Some(name.clone()),
3265                    var_name: Some(var_name),
3266                });
3267            }
3268        }
3269
3270        let op = catalog::Op::AlterRole {
3271            id,
3272            name,
3273            attributes,
3274            nopassword,
3275            vars: RoleVars { map: vars },
3276        };
3277        let response = self
3278            .catalog_transact(Some(session), vec![op])
3279            .await
3280            .map(|_| ExecuteResponse::AlteredRole)?;
3281
3282        // Send all of our queued notices.
3283        session.add_notices(notices);
3284
3285        Ok(response)
3286    }
3287
3288    #[instrument]
3289    pub(super) async fn sequence_alter_sink_prepare(
3290        &mut self,
3291        ctx: ExecuteContext,
3292        plan: plan::AlterSinkPlan,
3293    ) {
3294        // Put a read hold on the new relation
3295        let id_bundle = crate::CollectionIdBundle {
3296            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3297            compute_ids: BTreeMap::new(),
3298        };
3299        let read_hold = self.acquire_read_holds(&id_bundle);
3300
3301        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3302            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3303            return;
3304        };
3305
3306        let otel_ctx = OpenTelemetryContext::obtain();
3307        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3308
3309        let plan_validity = PlanValidity::new(
3310            self.catalog().transient_revision(),
3311            BTreeSet::from_iter([plan.item_id, from_item_id]),
3312            Some(plan.in_cluster),
3313            None,
3314            ctx.session().role_metadata().clone(),
3315        );
3316
3317        info!(
3318            "preparing alter sink for {}: frontiers={:?} export={:?}",
3319            plan.global_id,
3320            self.controller
3321                .storage_collections
3322                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3323            self.controller.storage.export(plan.global_id)
3324        );
3325
3326        // Now we must wait for the sink to make enough progress such that there is overlap between
3327        // the new `from` collection's read hold and the sink's write frontier.
3328        //
3329        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3330        // the watch set never completes and neither does the `ALTER SINK` command.
3331        self.install_storage_watch_set(
3332            ctx.session().conn_id().clone(),
3333            BTreeSet::from_iter([plan.global_id]),
3334            read_ts,
3335            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3336                ctx: Some(ctx),
3337                otel_ctx,
3338                plan,
3339                plan_validity,
3340                read_hold,
3341            }),
3342        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3343    }
3344
3345    #[instrument]
3346    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3347        ctx.otel_ctx.attach_as_parent();
3348
3349        let plan::AlterSinkPlan {
3350            item_id,
3351            global_id,
3352            sink: sink_plan,
3353            with_snapshot,
3354            in_cluster,
3355        } = ctx.plan.clone();
3356
3357        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3358        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3359        // arbitrarily changed since we performed planning, and we must re-assert that it still
3360        // matches our requirements.
3361        //
3362        // The `PlanValidity` check ensures that both the sink and the new source relation still
3363        // exist. Apart from that, we have to ensure that nobody else altered the sink in the
3364        // meantime, which we do by comparing the catalog sink version to the one in the plan.
3365        match ctx.plan_validity.check(self.catalog()) {
3366            Ok(()) => {}
3367            Err(err) => {
3368                ctx.retire(Err(err));
3369                return;
3370            }
3371        }
3372
3373        let entry = self.catalog().get_entry(&item_id);
3374        let CatalogItem::Sink(old_sink) = entry.item() else {
3375            panic!("invalid item kind for `AlterSinkPlan`");
3376        };
3377
3378        if sink_plan.version != old_sink.version + 1 {
3379            ctx.retire(Err(AdapterError::ChangedPlan(
3380                "sink was altered concurrently".into(),
3381            )));
3382            return;
3383        }
3384
3385        info!(
3386            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3387            self.controller
3388                .storage_collections
3389                .collections_frontiers(vec![global_id, sink_plan.from]),
3390            self.controller.storage.export(global_id),
3391        );
3392
3393        // Assert that we can recover the updates that happened at the timestamps of the write
3394        // frontier. This must hold here, since `ctx.read_hold` has held back the as-of.
3395        let write_frontier = &self
3396            .controller
3397            .storage
3398            .export(global_id)
3399            .expect("sink known to exist")
3400            .write_frontier;
3401        let as_of = ctx.read_hold.least_valid_read();
3402        assert!(
3403            write_frontier.iter().all(|t| as_of.less_than(t)),
3404            "{:?} should be strictly less than {:?}",
3405            &*as_of,
3406            &**write_frontier
3407        );
3408
3409        // Parse the `create_sql` so we can update it to the new sink definition.
3410        //
3411        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3412        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3413        // names in the `create_sql` may have changed, for example due to a schema swap.
3414        let create_sql = &old_sink.create_sql;
3415        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3416        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3417            unreachable!("invalid statement kind for sink");
3418        };
3419
3420        // Update the sink version.
3421        stmt.with_options
3422            .retain(|o| o.name != CreateSinkOptionName::Version);
3423        stmt.with_options.push(CreateSinkOption {
3424            name: CreateSinkOptionName::Version,
3425            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3426                sink_plan.version.to_string(),
3427            ))),
3428        });
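        // The statement now carries exactly one `VERSION` option reflecting the new
        // sink version, so the persisted `create_sql` stays in sync with the sink.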
3429
3430        let conn_catalog = self.catalog().for_system_session();
3431        let (mut stmt, resolved_ids) =
3432            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3433
3434        // Update the `from` relation.
3435        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3436        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3437        stmt.from = ResolvedItemName::Item {
3438            id: from_entry.id(),
3439            qualifiers: from_entry.name.qualifiers.clone(),
3440            full_name,
3441            print_id: true,
3442            version: from_entry.version,
3443        };
3444
3445        let new_sink = Sink {
3446            create_sql: stmt.to_ast_string_stable(),
3447            global_id,
3448            from: sink_plan.from,
3449            connection: sink_plan.connection.clone(),
3450            envelope: sink_plan.envelope,
3451            version: sink_plan.version,
3452            with_snapshot,
3453            resolved_ids: resolved_ids.clone(),
3454            cluster_id: in_cluster,
3455            commit_interval: sink_plan.commit_interval,
3456        };
3457
3458        let ops = vec![catalog::Op::UpdateItem {
3459            id: item_id,
3460            name: entry.name().clone(),
3461            to_item: CatalogItem::Sink(new_sink),
3462        }];
3463
3464        match self
3465            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3466            .await
3467        {
3468            Ok(()) => {}
3469            Err(err) => {
3470                ctx.retire(Err(err));
3471                return;
3472            }
3473        }
3474
3475        let storage_sink_desc = StorageSinkDesc {
3476            from: sink_plan.from,
3477            from_desc: from_entry
3478                .relation_desc()
3479                .expect("sinks can only be built on items with descs")
3480                .into_owned(),
3481            connection: sink_plan
3482                .connection
3483                .clone()
3484                .into_inline_connection(self.catalog().state()),
3485            envelope: sink_plan.envelope,
3486            as_of,
3487            with_snapshot,
3488            version: sink_plan.version,
3489            from_storage_metadata: (),
3490            to_storage_metadata: (),
3491            commit_interval: sink_plan.commit_interval,
3492        };
3493
3494        self.controller
3495            .storage
3496            .alter_export(
3497                global_id,
3498                ExportDescription {
3499                    sink: storage_sink_desc,
3500                    instance_id: in_cluster,
3501                },
3502            )
3503            .await
3504            .unwrap_or_terminate("cannot fail to alter sink desc");
3505
3506        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3507    }
3508
3509    #[instrument]
3510    pub(super) async fn sequence_alter_connection(
3511        &mut self,
3512        ctx: ExecuteContext,
3513        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3514    ) {
3515        match action {
3516            AlterConnectionAction::RotateKeys => {
3517                self.sequence_rotate_keys(ctx, id).await;
3518            }
3519            AlterConnectionAction::AlterOptions {
3520                set_options,
3521                drop_options,
3522                validate,
3523            } => {
3524                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3525                    .await
3526            }
3527        }
3528    }
3529
3530    #[instrument]
3531    async fn sequence_alter_connection_options(
3532        &mut self,
3533        mut ctx: ExecuteContext,
3534        id: CatalogItemId,
3535        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3536        drop_options: BTreeSet<ConnectionOptionName>,
3537        validate: bool,
3538    ) {
3539        let cur_entry = self.catalog().get_entry(&id);
3540        let cur_conn = cur_entry.connection().expect("known to be connection");
3541        let connection_gid = cur_conn.global_id();
3542
3543        let inner = || -> Result<Connection, AdapterError> {
3544            // Parse statement.
3545            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3546                .expect("invalid create sql persisted to catalog")
3547                .into_element()
3548                .ast
3549            {
3550                Statement::CreateConnection(stmt) => stmt,
3551                _ => unreachable!("proved type is connection"),
3552            };
3553
3554            let catalog = self.catalog().for_system_session();
3555
3556            // Resolve items in statement
3557            let (mut create_conn_stmt, resolved_ids) =
3558                mz_sql::names::resolve(&catalog, create_conn_stmt)
3559                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3560
3561            // Retain options that are neither set nor dropped.
3562            create_conn_stmt
3563                .values
3564                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3565
3566            // Set new values
3567            create_conn_stmt.values.extend(
3568                set_options
3569                    .into_iter()
3570                    .map(|(name, value)| ConnectionOption { name, value }),
3571            );
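            // For example, `ALTER CONNECTION conn SET (PORT = 5433)` drops any
            // existing `PORT` option from the original statement and appends
            // `PORT = 5433`; re-planning the amended statement below then yields
            // the updated connection.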
3572
3573            // Open a new catalog, which we will use to re-plan our
3574            // statement with the desired config.
3575            let mut catalog = self.catalog().for_system_session();
3576            catalog.mark_id_unresolvable_for_replanning(id);
3577
3578            // Re-define our connection in terms of the amended statement.
3579            let plan = match mz_sql::plan::plan(
3580                None,
3581                &catalog,
3582                Statement::CreateConnection(create_conn_stmt),
3583                &Params::empty(),
3584                &resolved_ids,
3585            )
3586            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3587            {
3588                Plan::CreateConnection(plan) => plan,
3589                _ => unreachable!("create connection plan is the only valid response"),
3590            };
3591
3592            // Parse statement.
3593            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3594                .expect("invalid create sql persisted to catalog")
3595                .into_element()
3596                .ast
3597            {
3598                Statement::CreateConnection(stmt) => stmt,
3599                _ => unreachable!("proved type is connection"),
3600            };
3601
3602            let catalog = self.catalog().for_system_session();
3603
3604            // Resolve items in statement
3605            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3606                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3607
3608            Ok(Connection {
3609                create_sql: plan.connection.create_sql,
3610                global_id: cur_conn.global_id,
3611                details: plan.connection.details,
3612                resolved_ids: new_deps,
3613            })
3614        };
3615
3616        let conn = match inner() {
3617            Ok(conn) => conn,
3618            Err(e) => {
3619                return ctx.retire(Err(e));
3620            }
3621        };
3622
3623        if validate {
3624            let connection = conn
3625                .details
3626                .to_connection()
3627                .into_inline_connection(self.catalog().state());
3628
3629            let internal_cmd_tx = self.internal_cmd_tx.clone();
3630            let transient_revision = self.catalog().transient_revision();
3631            let conn_id = ctx.session().conn_id().clone();
3632            let otel_ctx = OpenTelemetryContext::obtain();
3633            let role_metadata = ctx.session().role_metadata().clone();
3634            let current_storage_parameters = self.controller.storage.config().clone();
3635
3636            task::spawn(
3637                || format!("validate_alter_connection:{conn_id}"),
3638                async move {
3639                    let resolved_ids = conn.resolved_ids.clone();
3640                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3641                    let result = match connection.validate(id, &current_storage_parameters).await {
3642                        Ok(()) => Ok(conn),
3643                        Err(err) => Err(err.into()),
3644                    };
3645
3646                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3647                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3648                        AlterConnectionValidationReady {
3649                            ctx,
3650                            result,
3651                            connection_id: id,
3652                            connection_gid,
3653                            plan_validity: PlanValidity::new(
3654                                transient_revision,
3655                                dependency_ids.clone(),
3656                                None,
3657                                None,
3658                                role_metadata,
3659                            ),
3660                            otel_ctx,
3661                            resolved_ids,
3662                        },
3663                    ));
3664                    if let Err(e) = result {
3665                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3666                    }
3667                },
3668            );
3669        } else {
3670            let result = self
3671                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3672                .await;
3673            ctx.retire(result);
3674        }
3675    }
3676
3677    #[instrument]
3678    pub(crate) async fn sequence_alter_connection_stage_finish(
3679        &mut self,
3680        session: &Session,
3681        id: CatalogItemId,
3682        connection: Connection,
3683    ) -> Result<ExecuteResponse, AdapterError> {
3684        match self.catalog.get_entry(&id).item() {
3685            CatalogItem::Connection(curr_conn) => {
3686                curr_conn
3687                    .details
3688                    .to_connection()
3689                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3690                    .map_err(StorageError::from)?;
3691            }
3692            _ => unreachable!("known to be a connection"),
3693        };
3694
3695        let ops = vec![catalog::Op::UpdateItem {
3696            id,
3697            name: self.catalog.get_entry(&id).name().clone(),
3698            to_item: CatalogItem::Connection(connection.clone()),
3699        }];
3700
3701        self.catalog_transact(Some(session), ops).await?;
3702
3703        // NOTE: The rest of the alter connection logic (updating VPC endpoints
3704        // and propagating connection changes to dependent sources, sinks, and
3705        // tables) is handled in `apply_catalog_implications` via
3706        // `handle_alter_connection`. The catalog transact above triggers that
3707        // code path.
3708
3709        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3710    }
3711
3712    #[instrument]
3713    pub(super) async fn sequence_alter_source(
3714        &mut self,
3715        session: &Session,
3716        plan::AlterSourcePlan {
3717            item_id,
3718            ingestion_id,
3719            action,
3720        }: plan::AlterSourcePlan,
3721    ) -> Result<ExecuteResponse, AdapterError> {
3722        let cur_entry = self.catalog().get_entry(&item_id);
3723        let cur_source = cur_entry.source().expect("known to be source");
3724
3725        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3726            // Parse statement.
3727            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3728                .expect("invalid create sql persisted to catalog")
3729                .into_element()
3730                .ast
3731            {
3732                Statement::CreateSource(stmt) => stmt,
3733                _ => unreachable!("proved type is source"),
3734            };
3735
3736            let catalog = coord.catalog().for_system_session();
3737
3738            // Resolve items in statement
3739            mz_sql::names::resolve(&catalog, create_source_stmt)
3740                .map_err(|e| AdapterError::internal(err_cx, e))
3741        };
3742
3743        match action {
3744            plan::AlterSourceAction::AddSubsourceExports {
3745                subsources,
3746                options,
3747            } => {
3748                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3749
3750                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3751                    text_columns: mut new_text_columns,
3752                    exclude_columns: mut new_exclude_columns,
3753                    ..
3754                } = options.try_into()?;
3755
3756                // Resolve items in statement
3757                let (mut create_source_stmt, resolved_ids) =
3758                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3759
3760                // Get all currently referred-to items
3761                let catalog = self.catalog();
3762                let curr_references: BTreeSet<_> = catalog
3763                    .get_entry(&item_id)
3764                    .used_by()
3765                    .into_iter()
3766                    .filter_map(|subsource| {
3767                        catalog
3768                            .get_entry(subsource)
3769                            .subsource_details()
3770                            .map(|(_id, reference, _details)| reference)
3771                    })
3772                    .collect();
3773
3774                // We do a lot of unwrapping below, so define a single error to reference; all of
3775                // these invariants are guaranteed to hold because of how we plan subsources.
3776                let purification_err =
3777                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3778
3779                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
3780                // we remove support for implicitly created subsources from a `CREATE SOURCE`
3781                // statement.
3782                match &mut create_source_stmt.connection {
3783                    CreateSourceConnection::Postgres {
3784                        options: curr_options,
3785                        ..
3786                    } => {
3787                        let mz_sql::plan::PgConfigOptionExtracted {
3788                            mut text_columns, ..
3789                        } = curr_options.clone().try_into()?;
3790
3791                        // Drop text columns; we will add them back in
3792                        // as appropriate below.
3793                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3794
3795                        // Drop all text columns that are not currently referred to.
3796                        text_columns.retain(|column_qualified_reference| {
3797                            mz_ore::soft_assert_eq_or_log!(
3798                                column_qualified_reference.0.len(),
3799                                4,
3800                                "all TEXT COLUMNS values must be column-qualified references"
3801                            );
3802                            let mut table = column_qualified_reference.clone();
3803                            table.0.truncate(3);
3804                            curr_references.contains(&table)
3805                        });
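                        // For example, a TEXT COLUMNS entry
                        // `database.schema.table.column` is kept only if
                        // `database.schema.table` is still referenced by one of
                        // this source's subsources.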
3806
3807                        // Merge the current text columns into the new text columns.
3808                        new_text_columns.extend(text_columns);
3809
3810                        // If we have text columns, add them to the options.
3811                        if !new_text_columns.is_empty() {
3812                            new_text_columns.sort();
3813                            let new_text_columns = new_text_columns
3814                                .into_iter()
3815                                .map(WithOptionValue::UnresolvedItemName)
3816                                .collect();
3817
3818                            curr_options.push(PgConfigOption {
3819                                name: PgConfigOptionName::TextColumns,
3820                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3821                            });
3822                        }
3823                    }
3824                    CreateSourceConnection::MySql {
3825                        options: curr_options,
3826                        ..
3827                    } => {
3828                        let mz_sql::plan::MySqlConfigOptionExtracted {
3829                            mut text_columns,
3830                            mut exclude_columns,
3831                            ..
3832                        } = curr_options.clone().try_into()?;
3833
3834                        // Drop both text and exclude columns; we will add them back in
3835                        // as appropriate below.
3836                        curr_options.retain(|o| {
3837                            !matches!(
3838                                o.name,
3839                                MySqlConfigOptionName::TextColumns
3840                                    | MySqlConfigOptionName::ExcludeColumns
3841                            )
3842                        });
3843
3844                        // Drop all text / exclude columns that are not currently referred to.
3845                        let column_referenced =
3846                            |column_qualified_reference: &UnresolvedItemName| {
3847                                mz_ore::soft_assert_eq_or_log!(
3848                                    column_qualified_reference.0.len(),
3849                                    3,
3850                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3851                                );
3852                                let mut table = column_qualified_reference.clone();
3853                                table.0.truncate(2);
3854                                curr_references.contains(&table)
3855                            };
3856                        text_columns.retain(column_referenced);
3857                        exclude_columns.retain(column_referenced);
3858
3859                        // Merge the current text / exclude columns into the new text / exclude columns.
3860                        new_text_columns.extend(text_columns);
3861                        new_exclude_columns.extend(exclude_columns);
3862
3863                        // If we have text columns, add them to the options.
3864                        if !new_text_columns.is_empty() {
3865                            new_text_columns.sort();
3866                            let new_text_columns = new_text_columns
3867                                .into_iter()
3868                                .map(WithOptionValue::UnresolvedItemName)
3869                                .collect();
3870
3871                            curr_options.push(MySqlConfigOption {
3872                                name: MySqlConfigOptionName::TextColumns,
3873                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3874                            });
3875                        }
3876                        // If we have exclude columns, add them to the options.
3877                        if !new_exclude_columns.is_empty() {
3878                            new_exclude_columns.sort();
3879                            let new_exclude_columns = new_exclude_columns
3880                                .into_iter()
3881                                .map(WithOptionValue::UnresolvedItemName)
3882                                .collect();
3883
3884                            curr_options.push(MySqlConfigOption {
3885                                name: MySqlConfigOptionName::ExcludeColumns,
3886                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3887                            });
3888                        }
3889                    }
3890                    CreateSourceConnection::SqlServer {
3891                        options: curr_options,
3892                        ..
3893                    } => {
3894                        let mz_sql::plan::SqlServerConfigOptionExtracted {
3895                            mut text_columns,
3896                            mut exclude_columns,
3897                            ..
3898                        } = curr_options.clone().try_into()?;
3899
3900                        // Drop both text and exclude columns; we will add them back in
3901                        // as appropriate below.
3902                        curr_options.retain(|o| {
3903                            !matches!(
3904                                o.name,
3905                                SqlServerConfigOptionName::TextColumns
3906                                    | SqlServerConfigOptionName::ExcludeColumns
3907                            )
3908                        });
3909
3910                        // Drop all text / exclude columns that are not currently referred to.
3911                        let column_referenced =
3912                            |column_qualified_reference: &UnresolvedItemName| {
3913                                mz_ore::soft_assert_eq_or_log!(
3914                                    column_qualified_reference.0.len(),
3915                                    3,
3916                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3917                                );
3918                                let mut table = column_qualified_reference.clone();
3919                                table.0.truncate(2);
3920                                curr_references.contains(&table)
3921                            };
3922                        text_columns.retain(column_referenced);
3923                        exclude_columns.retain(column_referenced);
3924
3925                        // Merge the current text / exclude columns into the new text / exclude columns.
3926                        new_text_columns.extend(text_columns);
3927                        new_exclude_columns.extend(exclude_columns);
3928
3929                        // If we have text columns, add them to the options.
3930                        if !new_text_columns.is_empty() {
3931                            new_text_columns.sort();
3932                            let new_text_columns = new_text_columns
3933                                .into_iter()
3934                                .map(WithOptionValue::UnresolvedItemName)
3935                                .collect();
3936
3937                            curr_options.push(SqlServerConfigOption {
3938                                name: SqlServerConfigOptionName::TextColumns,
3939                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3940                            });
3941                        }
3942                        // If we have exclude columns, add them to the options.
3943                        if !new_exclude_columns.is_empty() {
3944                            new_exclude_columns.sort();
3945                            let new_exclude_columns = new_exclude_columns
3946                                .into_iter()
3947                                .map(WithOptionValue::UnresolvedItemName)
3948                                .collect();
3949
3950                            curr_options.push(SqlServerConfigOption {
3951                                name: SqlServerConfigOptionName::ExcludeColumns,
3952                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3953                            });
3954                        }
3955                    }
3956                    _ => return Err(purification_err()),
3957                };
3958
3959                let mut catalog = self.catalog().for_system_session();
3960                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
3961
3962                // Re-define our source in terms of the amended statement
3963                let plan = match mz_sql::plan::plan(
3964                    None,
3965                    &catalog,
3966                    Statement::CreateSource(create_source_stmt),
3967                    &Params::empty(),
3968                    &resolved_ids,
3969                )
3970                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
3971                {
3972                    Plan::CreateSource(plan) => plan,
3973                    _ => unreachable!("create source plan is only valid response"),
3974                };
3975
3976                // Asserting that we've done the right thing with dependencies
3977                // here would require mocking out catalog objects, which is a
3978                // lot of work for an operation that tests must cover anyway.
3979                let source = Source::new(
3980                    plan,
3981                    cur_source.global_id,
3982                    resolved_ids,
3983                    cur_source.custom_logical_compaction_window,
3984                    cur_source.is_retained_metrics_object,
3985                );
3986
3987                // Get new ingestion description for storage.
3988                let desc = match &source.data_source {
3989                    DataSourceDesc::Ingestion { desc, .. }
3990                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3991                        desc.clone().into_inline_connection(self.catalog().state())
3992                    }
3993                    _ => unreachable!("already verified to be of type ingestion"),
3994                };
3995
3996                self.controller
3997                    .storage
3998                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
3999                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4000
4001                // Redefine source. This must be done before we create any new
4002                // subsources so that it has the right ingestion.
4003                let mut ops = vec![catalog::Op::UpdateItem {
4004                    id: item_id,
4005                    // Look this up again so we don't have to hold an immutable reference to the
4006                    // entry for so long.
4007                    name: self.catalog.get_entry(&item_id).name().clone(),
4008                    to_item: CatalogItem::Source(source),
4009                }];
4010
4011                let CreateSourceInner {
4012                    ops: new_ops,
4013                    sources: _,
4014                    if_not_exists_ids,
4015                } = self.create_source_inner(session, subsources).await?;
4016
4017                ops.extend(new_ops.into_iter());
4018
4019                assert!(
4020                    if_not_exists_ids.is_empty(),
4021                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4022                );
4023
4024                self.catalog_transact(Some(session), ops).await?;
4025            }
4026            plan::AlterSourceAction::RefreshReferences { references } => {
4027                self.catalog_transact(
4028                    Some(session),
4029                    vec![catalog::Op::UpdateSourceReferences {
4030                        source_id: item_id,
4031                        references: references.into(),
4032                    }],
4033                )
4034                .await?;
4035            }
4036        }
4037
4038        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4039    }
4040
4041    #[instrument]
4042    pub(super) async fn sequence_alter_system_set(
4043        &mut self,
4044        session: &Session,
4045        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4046    ) -> Result<ExecuteResponse, AdapterError> {
4047        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4048        // We want to ensure that the network policy we're switching to actually exists.
4049        if NETWORK_POLICY.name().to_lowercase() == name.to_lowercase() {
4050            self.validate_alter_system_network_policy(session, &value)?;
4051        }
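        // For example (policy name assumed for illustration):
        //   ALTER SYSTEM SET network_policy = 'office_only'
        // fails here if no network policy named `office_only` exists.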
4052
4053        let op = match value {
4054            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4055                name: name.clone(),
4056                value: OwnedVarInput::SqlSet(values),
4057            },
4058            plan::VariableValue::Default => {
4059                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4060            }
4061        };
4062        self.catalog_transact(Some(session), vec![op]).await?;
4063
4064        session.add_notice(AdapterNotice::VarDefaultUpdated {
4065            role: None,
4066            var_name: Some(name),
4067        });
4068        Ok(ExecuteResponse::AlteredSystemConfiguration)
4069    }
4070
4071    #[instrument]
4072    pub(super) async fn sequence_alter_system_reset(
4073        &mut self,
4074        session: &Session,
4075        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4076    ) -> Result<ExecuteResponse, AdapterError> {
4077        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4078        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4079        self.catalog_transact(Some(session), vec![op]).await?;
4080        session.add_notice(AdapterNotice::VarDefaultUpdated {
4081            role: None,
4082            var_name: Some(name),
4083        });
4084        Ok(ExecuteResponse::AlteredSystemConfiguration)
4085    }
4086
4087    #[instrument]
4088    pub(super) async fn sequence_alter_system_reset_all(
4089        &mut self,
4090        session: &Session,
4091        _: plan::AlterSystemResetAllPlan,
4092    ) -> Result<ExecuteResponse, AdapterError> {
4093        self.is_user_allowed_to_alter_system(session, None)?;
4094        let op = catalog::Op::ResetAllSystemConfiguration;
4095        self.catalog_transact(Some(session), vec![op]).await?;
4096        session.add_notice(AdapterNotice::VarDefaultUpdated {
4097            role: None,
4098            var_name: None,
4099        });
4100        Ok(ExecuteResponse::AlteredSystemConfiguration)
4101    }
4102
4103    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
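    // In summary: internal superusers may alter any visible system variable,
    // external superusers only user-modifiable ones, and regular users always
    // receive an error (a superuser-required error when the variable is user
    // modifiable, an mz_system error otherwise).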
4104    fn is_user_allowed_to_alter_system(
4105        &self,
4106        session: &Session,
4107        var_name: Option<&str>,
4108    ) -> Result<(), AdapterError> {
4109        match (session.user().kind(), var_name) {
4110            // Only internal superusers can reset all system variables.
4111            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4112            // Whether a variable can be modified depends on whether we're an internal superuser.
4113            (UserKind::Superuser, Some(name))
4114                if session.user().is_internal()
4115                    || self.catalog().system_config().user_modifiable(name) =>
4116            {
4117                // In lieu of plumbing the user to all system config functions, just check that
4118                // the var is visible.
4119                let var = self.catalog().system_config().get(name)?;
4120                var.visible(session.user(), self.catalog().system_config())?;
4121                Ok(())
4122            }
4123            // If we're not a superuser but the variable is user modifiable, return an
4124            // error indicating that superuser privileges are required.
4125            (UserKind::Regular, Some(name))
4126                if self.catalog().system_config().user_modifiable(name) =>
4127            {
4128                Err(AdapterError::Unauthorized(
4129                    rbac::UnauthorizedError::Superuser {
4130                        action: format!("toggle the '{name}' system configuration parameter"),
4131                    },
4132                ))
4133            }
4134            _ => Err(AdapterError::Unauthorized(
4135                rbac::UnauthorizedError::MzSystem {
4136                    action: "alter system".into(),
4137                },
4138            )),
4139        }
4140    }
4141
4142    fn validate_alter_system_network_policy(
4143        &self,
4144        session: &Session,
4145        policy_value: &plan::VariableValue,
4146    ) -> Result<(), AdapterError> {
4147        let policy_name = match &policy_value {
4148            // Make sure the compiled-in default still exists.
4149            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4150            plan::VariableValue::Values(values) if values.len() == 1 => {
4151                values.iter().next().cloned()
4152            }
4153            plan::VariableValue::Values(values) => {
4154                tracing::warn!(?values, "can't set multiple network policies at once");
4155                None
4156            }
4157        };
4158        let maybe_network_policy = policy_name
4159            .as_ref()
4160            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4161        let Some(network_policy) = maybe_network_policy else {
4162            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4163                VarError::InvalidParameterValue {
4164                    name: NETWORK_POLICY.name(),
4165                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4166                    reason: "no network policy with that name exists".to_string(),
4167                },
4168            )));
4169        };
4170        self.validate_alter_network_policy(session, &network_policy.rules)
4171    }
4172
4173    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4174    ///
4175    /// This helps prevent users from modifying network policies in a way that would lock out their
4176    /// current connection.
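    ///
    /// For example (addresses illustrative): if the session's client IP is
    /// 203.0.113.5 and the proposed rules only allow 10.0.0.0/8, the change is
    /// rejected as a lockout rather than applied.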
4177    fn validate_alter_network_policy(
4178        &self,
4179        session: &Session,
4180        policy_rules: &Vec<NetworkPolicyRule>,
4181    ) -> Result<(), AdapterError> {
4182        // If the user is not an internal user, attempt to protect them from
4183        // blocking themselves.
4184        if session.user().is_internal() {
4185            return Ok(());
4186        }
4187        if let Some(ip) = session.meta().client_ip() {
4188            validate_ip_with_policy_rules(ip, policy_rules)
4189                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4190        } else {
4191            // Sessions without IPs are only constructed temporarily for computing
4192            // default values; they should not be permitted here.
4193            return Err(AdapterError::NetworkPolicyDenied(
4194                NetworkPolicyError::MissingIp,
4195            ));
4196        }
4197        Ok(())
4198    }
4199
4200    /// Returns the name of the portal to execute.
4201    #[instrument]
4202    pub(super) fn sequence_execute(
4203        &self,
4204        session: &mut Session,
4205        plan: plan::ExecutePlan,
4206    ) -> Result<String, AdapterError> {
4207        // Verify the stmt is still valid.
4208        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4209        let ps = session
4210            .get_prepared_statement_unverified(&plan.name)
4211            .expect("known to exist");
4212        let stmt = ps.stmt().cloned();
4213        let desc = ps.desc().clone();
4214        let state_revision = ps.state_revision;
4215        let logging = Arc::clone(ps.logging());
4216        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4217    }
4218
4219    #[instrument]
4220    pub(super) async fn sequence_grant_privileges(
4221        &mut self,
4222        session: &Session,
4223        plan::GrantPrivilegesPlan {
4224            update_privileges,
4225            grantees,
4226        }: plan::GrantPrivilegesPlan,
4227    ) -> Result<ExecuteResponse, AdapterError> {
4228        self.sequence_update_privileges(
4229            session,
4230            update_privileges,
4231            grantees,
4232            UpdatePrivilegeVariant::Grant,
4233        )
4234        .await
4235    }
4236
4237    #[instrument]
4238    pub(super) async fn sequence_revoke_privileges(
4239        &mut self,
4240        session: &Session,
4241        plan::RevokePrivilegesPlan {
4242            update_privileges,
4243            revokees,
4244        }: plan::RevokePrivilegesPlan,
4245    ) -> Result<ExecuteResponse, AdapterError> {
4246        self.sequence_update_privileges(
4247            session,
4248            update_privileges,
4249            revokees,
4250            UpdatePrivilegeVariant::Revoke,
4251        )
4252        .await
4253    }
4254
4255    #[instrument]
4256    async fn sequence_update_privileges(
4257        &mut self,
4258        session: &Session,
4259        update_privileges: Vec<UpdatePrivilege>,
4260        grantees: Vec<RoleId>,
4261        variant: UpdatePrivilegeVariant,
4262    ) -> Result<ExecuteResponse, AdapterError> {
4263        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4264        let mut warnings = Vec::new();
4265        let catalog = self.catalog().for_session(session);
4266
4267        for UpdatePrivilege {
4268            acl_mode,
4269            target_id,
4270            grantor,
4271        } in update_privileges
4272        {
4273            let actual_object_type = catalog.get_system_object_type(&target_id);
4274            // For all relations we allow all applicable table privileges, but send a
4275            // warning if a privilege isn't actually applicable to the object's type.
4276            if actual_object_type.is_relation() {
4277                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4278                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4279                if !non_applicable_privileges.is_empty() {
4280                    let object_description =
4281                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4282                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4283                        non_applicable_privileges,
4284                        object_description,
4285                    })
4286                }
4287            }
4288
4289            if let SystemObjectId::Object(object_id) = &target_id {
4290                self.catalog()
4291                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4292            }
4293
4294            let privileges = self
4295                .catalog()
4296                .get_privileges(&target_id, session.conn_id())
4297                // Should be unreachable since the parser will refuse to parse grant/revoke
4298                // statements on objects without privileges.
4299                .ok_or(AdapterError::Unsupported(
4300                    "GRANTs/REVOKEs on an object type with no privileges",
4301                ))?;
4302
4303            for grantee in &grantees {
4304                self.catalog().ensure_not_system_role(grantee)?;
4305                self.catalog().ensure_not_predefined_role(grantee)?;
4306                let existing_privilege = privileges
4307                    .get_acl_item(grantee, &grantor)
4308                    .map(Cow::Borrowed)
4309                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4310
4311                match variant {
4312                    UpdatePrivilegeVariant::Grant
4313                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4314                    {
4315                        ops.push(catalog::Op::UpdatePrivilege {
4316                            target_id: target_id.clone(),
4317                            privilege: MzAclItem {
4318                                grantee: *grantee,
4319                                grantor,
4320                                acl_mode,
4321                            },
4322                            variant,
4323                        });
4324                    }
4325                    UpdatePrivilegeVariant::Revoke
4326                        if !existing_privilege
4327                            .acl_mode
4328                            .intersection(acl_mode)
4329                            .is_empty() =>
4330                    {
4331                        ops.push(catalog::Op::UpdatePrivilege {
4332                            target_id: target_id.clone(),
4333                            privilege: MzAclItem {
4334                                grantee: *grantee,
4335                                grantor,
4336                                acl_mode,
4337                            },
4338                            variant,
4339                        });
4340                    }
4341                    // no-op
4342                    _ => {}
4343                }
4344            }
4345        }
4346
4347        if ops.is_empty() {
4348            session.add_notices(warnings);
4349            return Ok(variant.into());
4350        }
4351
4352        let res = self
4353            .catalog_transact(Some(session), ops)
4354            .await
4355            .map(|_| match variant {
4356                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4357                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4358            });
4359        if res.is_ok() {
4360            session.add_notices(warnings);
4361        }
4362        res
4363    }
4364
4365    #[instrument]
4366    pub(super) async fn sequence_alter_default_privileges(
4367        &mut self,
4368        session: &Session,
4369        plan::AlterDefaultPrivilegesPlan {
4370            privilege_objects,
4371            privilege_acl_items,
4372            is_grant,
4373        }: plan::AlterDefaultPrivilegesPlan,
4374    ) -> Result<ExecuteResponse, AdapterError> {
4375        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4376        let variant = if is_grant {
4377            UpdatePrivilegeVariant::Grant
4378        } else {
4379            UpdatePrivilegeVariant::Revoke
4380        };
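        // Illustrative shape of the statement being sequenced (names assumed):
        //   ALTER DEFAULT PRIVILEGES FOR ROLE data_owner IN SCHEMA public
        //       GRANT SELECT ON TABLES TO analyst;
        // Each (privilege object, acl item) pair below becomes one catalog op.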
4381        for privilege_object in &privilege_objects {
4382            self.catalog()
4383                .ensure_not_system_role(&privilege_object.role_id)?;
4384            self.catalog()
4385                .ensure_not_predefined_role(&privilege_object.role_id)?;
4386            if let Some(database_id) = privilege_object.database_id {
4387                self.catalog()
4388                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4389            }
4390            if let Some(schema_id) = privilege_object.schema_id {
4391                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4392                let schema_spec: SchemaSpecifier = schema_id.into();
4393
4394                self.catalog().ensure_not_reserved_object(
4395                    &(database_spec, schema_spec).into(),
4396                    session.conn_id(),
4397                )?;
4398            }
4399            for privilege_acl_item in &privilege_acl_items {
4400                self.catalog()
4401                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4402                self.catalog()
4403                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4404                ops.push(catalog::Op::UpdateDefaultPrivilege {
4405                    privilege_object: privilege_object.clone(),
4406                    privilege_acl_item: privilege_acl_item.clone(),
4407                    variant,
4408                })
4409            }
4410        }
4411
4412        self.catalog_transact(Some(session), ops).await?;
4413        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4414    }
4415
4416    #[instrument]
4417    pub(super) async fn sequence_grant_role(
4418        &mut self,
4419        session: &Session,
4420        plan::GrantRolePlan {
4421            role_ids,
4422            member_ids,
4423            grantor_id,
4424        }: plan::GrantRolePlan,
4425    ) -> Result<ExecuteResponse, AdapterError> {
4426        let catalog = self.catalog();
4427        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
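        // Granting a role the member already has is a no-op that emits a notice,
        // but the reserved-role and grantable-role checks still run first so that
        // those cases error instead of spuriously succeeding.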
4428        for role_id in role_ids {
4429            for member_id in &member_ids {
4430                let member_membership: BTreeSet<_> =
4431                    catalog.get_role(member_id).membership().keys().collect();
4432                if member_membership.contains(&role_id) {
4433                    let role_name = catalog.get_role(&role_id).name().to_string();
4434                    let member_name = catalog.get_role(member_id).name().to_string();
4435                    // We need this check so we don't accidentally report success for a reserved role.
4436                    catalog.ensure_not_reserved_role(member_id)?;
4437                    catalog.ensure_grantable_role(&role_id)?;
4438                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4439                        role_name,
4440                        member_name,
4441                    });
4442                } else {
4443                    ops.push(catalog::Op::GrantRole {
4444                        role_id,
4445                        member_id: *member_id,
4446                        grantor_id,
4447                    });
4448                }
4449            }
4450        }
4451
4452        if ops.is_empty() {
4453            return Ok(ExecuteResponse::GrantedRole);
4454        }
4455
4456        self.catalog_transact(Some(session), ops)
4457            .await
4458            .map(|_| ExecuteResponse::GrantedRole)
4459    }
4460
4461    #[instrument]
4462    pub(super) async fn sequence_revoke_role(
4463        &mut self,
4464        session: &Session,
4465        plan::RevokeRolePlan {
4466            role_ids,
4467            member_ids,
4468            grantor_id,
4469        }: plan::RevokeRolePlan,
4470    ) -> Result<ExecuteResponse, AdapterError> {
4471        let catalog = self.catalog();
4472        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4473        for role_id in role_ids {
4474            for member_id in &member_ids {
4475                let member_membership: BTreeSet<_> =
4476                    catalog.get_role(member_id).membership().keys().collect();
4477                if !member_membership.contains(&role_id) {
4478                    let role_name = catalog.get_role(&role_id).name().to_string();
4479                    let member_name = catalog.get_role(member_id).name().to_string();
4480                    // We need this check so we don't accidentally report success for a reserved role.
4481                    catalog.ensure_not_reserved_role(member_id)?;
4482                    catalog.ensure_grantable_role(&role_id)?;
4483                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4484                        role_name,
4485                        member_name,
4486                    });
4487                } else {
4488                    ops.push(catalog::Op::RevokeRole {
4489                        role_id,
4490                        member_id: *member_id,
4491                        grantor_id,
4492                    });
4493                }
4494            }
4495        }
4496
4497        if ops.is_empty() {
4498            return Ok(ExecuteResponse::RevokedRole);
4499        }
4500
4501        self.catalog_transact(Some(session), ops)
4502            .await
4503            .map(|_| ExecuteResponse::RevokedRole)
4504    }
4505
4506    #[instrument]
4507    pub(super) async fn sequence_alter_owner(
4508        &mut self,
4509        session: &Session,
4510        plan::AlterOwnerPlan {
4511            id,
4512            object_type,
4513            new_owner,
4514        }: plan::AlterOwnerPlan,
4515    ) -> Result<ExecuteResponse, AdapterError> {
4516        let mut ops = vec![catalog::Op::UpdateOwner {
4517            id: id.clone(),
4518            new_owner,
4519        }];
4520
4521        match &id {
4522            ObjectId::Item(global_id) => {
4523                let entry = self.catalog().get_entry(global_id);
4524
4525                // Cannot directly change the owner of an index.
4526                if entry.is_index() {
4527                    let name = self
4528                        .catalog()
4529                        .resolve_full_name(entry.name(), Some(session.conn_id()))
4530                        .to_string();
4531                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
4532                    return Ok(ExecuteResponse::AlteredObject(object_type));
4533                }
4534
4535                // Alter owner cascades down to dependent indexes.
4536                let dependent_index_ops = entry
4537                    .used_by()
4538                    .into_iter()
4539                    .filter(|id| self.catalog().get_entry(id).is_index())
4540                    .map(|id| catalog::Op::UpdateOwner {
4541                        id: ObjectId::Item(*id),
4542                        new_owner,
4543                    });
4544                ops.extend(dependent_index_ops);
4545
4546                // Alter owner cascades down to progress collections.
4547                let dependent_subsources =
4548                    entry
4549                        .progress_id()
4550                        .into_iter()
4551                        .map(|item_id| catalog::Op::UpdateOwner {
4552                            id: ObjectId::Item(item_id),
4553                            new_owner,
4554                        });
4555                ops.extend(dependent_subsources);
4556            }
4557            ObjectId::Cluster(cluster_id) => {
4558                let cluster = self.catalog().get_cluster(*cluster_id);
4559                // Alter owner cascades down to cluster replicas.
4560                let managed_cluster_replica_ops =
4561                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4562                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4563                        new_owner,
4564                    });
4565                ops.extend(managed_cluster_replica_ops);
4566            }
4567            _ => {}
4568        }
4569
4570        self.catalog_transact(Some(session), ops)
4571            .await
4572            .map(|_| ExecuteResponse::AlteredObject(object_type))
4573    }
4574
4575    #[instrument]
4576    pub(super) async fn sequence_reassign_owned(
4577        &mut self,
4578        session: &Session,
4579        plan::ReassignOwnedPlan {
4580            old_roles,
4581            new_role,
4582            reassign_ids,
4583        }: plan::ReassignOwnedPlan,
4584    ) -> Result<ExecuteResponse, AdapterError> {
4585        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4586            self.catalog().ensure_not_reserved_role(role_id)?;
4587        }
4588
4589        let ops = reassign_ids
4590            .into_iter()
4591            .map(|id| catalog::Op::UpdateOwner {
4592                id,
4593                new_owner: new_role,
4594            })
4595            .collect();
4596
4597        self.catalog_transact(Some(session), ops)
4598            .await
4599            .map(|_| ExecuteResponse::ReassignOwned)
4600    }
4601
4602    #[instrument]
4603    pub(crate) async fn handle_deferred_statement(&mut self) {
4604        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
4605        // was processed, removing the single element from deferred_statements, so it is expected
4606        // that this is sometimes empty.
4607        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4608            return;
4609        };
4610        match ps {
4611            crate::coord::PlanStatement::Statement { stmt, params } => {
4612                self.handle_execute_inner(stmt, params, ctx).await;
4613            }
4614            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4615                self.sequence_plan(ctx, plan, resolved_ids).await;
4616            }
4617        }
4618    }
4619
4620    #[instrument]
4621    // TODO(parkmycar): Remove this once we have an actual implementation.
4622    #[allow(clippy::unused_async)]
4623    pub(super) async fn sequence_alter_table(
4624        &mut self,
4625        ctx: &mut ExecuteContext,
4626        plan: plan::AlterTablePlan,
4627    ) -> Result<ExecuteResponse, AdapterError> {
4628        let plan::AlterTablePlan {
4629            relation_id,
4630            column_name,
4631            column_type,
4632            raw_sql_type,
4633        } = plan;
4634
4635        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4636        let id_ts = self.get_catalog_write_ts().await;
4637        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4638        let ops = vec![catalog::Op::AlterAddColumn {
4639            id: relation_id,
4640            new_global_id,
4641            name: column_name,
4642            typ: column_type,
4643            sql: raw_sql_type,
4644        }];
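        // Note that a fresh GlobalId is allocated for the altered relation while
        // its CatalogItemId is reused, presumably so existing dependents can keep
        // reading the prior version of the relation.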
4645
4646        self.catalog_transact_with_context(None, Some(ctx), ops)
4647            .await?;
4648
4649        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4650    }
4651
4652    #[instrument]
4653    pub(super) async fn sequence_alter_materialized_view_apply_replacement(
4654        &mut self,
4655        ctx: &ExecuteContext,
4656        plan: AlterMaterializedViewApplyReplacementPlan,
4657    ) -> Result<ExecuteResponse, AdapterError> {
4658        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
4659
4660        // TODO(alter-mv): Wait until there is overlap between the old MV's write frontier and the
4661        // new MV's as-of, to ensure no times are skipped.
4662
4663        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4664        self.catalog_transact(Some(ctx.session()), ops).await?;
4665
4666        Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
4667    }
4668
4669    pub(super) async fn statistics_oracle(
4670        &self,
4671        session: &Session,
4672        source_ids: &BTreeSet<GlobalId>,
4673        query_as_of: &Antichain<Timestamp>,
4674        is_oneshot: bool,
4675    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4676        super::statistics_oracle(
4677            session,
4678            source_ids,
4679            query_as_of,
4680            is_oneshot,
4681            self.catalog().system_config(),
4682            self.controller.storage_collections.as_ref(),
4683        )
4684        .await
4685    }
4686}
4687
4688impl Coordinator {
4689    /// Process the metainfo from a newly created non-transient dataflow.
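    ///
    /// Returns the builtin-table append notification for the rendered optimizer
    /// notices when `enable_mz_notices` is set and there are notices to record;
    /// otherwise returns `None`.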
4690    async fn process_dataflow_metainfo(
4691        &mut self,
4692        df_meta: DataflowMetainfo,
4693        export_id: GlobalId,
4694        ctx: Option<&mut ExecuteContext>,
4695        notice_ids: Vec<GlobalId>,
4696    ) -> Option<BuiltinTableAppendNotify> {
4697        // Emit raw notices to the user.
4698        if let Some(ctx) = ctx {
4699            emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4700        }
4701
4702        // Create a metainfo with rendered notices.
4703        let df_meta = self
4704            .catalog()
4705            .render_notices(df_meta, notice_ids, Some(export_id));
4706
4707        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
4708        // in-memory state.
4709        if self.catalog().state().system_config().enable_mz_notices()
4710            && !df_meta.optimizer_notices.is_empty()
4711        {
4712            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4713            self.catalog().state().pack_optimizer_notices(
4714                &mut builtin_table_updates,
4715                df_meta.optimizer_notices.iter(),
4716                Diff::ONE,
4717            );
4718
4719            // Save the metainfo.
4720            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4721
4722            Some(
4723                self.builtin_table_update()
4724                    .execute(builtin_table_updates)
4725                    .await
4726                    .0,
4727            )
4728        } else {
4729            // Save the metainfo.
4730            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4731
4732            None
4733        }
4734    }
4735}