mz_adapter/coord/sequencer/inner.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::objects::{
    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
    RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
    SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
    StatementContext,
};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::GenericSourceConnection;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::ExportDescription;
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_transform::dataflow::DataflowMetainfo;
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::sequencer::emit_optimizer_notices;
use crate::coord::{
    AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
    CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
    Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
    PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
    validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;

/// Attempts to evaluate an expression. If an error is returned, the error is sent
/// to the client and the enclosing function returns.
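///
/// A hypothetical usage sketch (names illustrative, not from this file; `ctx`
/// is any value with a `retire` method, as the expansion requires):
/// ```ignore
/// let plan = return_if_err!(self.plan_statement(session, stmt, &params, &resolved_ids), ctx);
/// ```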
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;

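/// Side effects of a drop collected by `sequence_drop_common`: the catalog ops
/// to apply plus flags for the notices the session should receive.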
struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}

/// A bundle of values returned from `create_source_inner`.
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
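    ///
    /// An illustrative call sketch (names are hypothetical, not from this
    /// file):
    /// ```ignore
    /// coord.sequence_staged(ctx, Span::current(), first_stage).await;
    /// ```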
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If cancellation isn't allowed, remove any previous channel so that
                    // `handle_spawn` doesn't observe a stale value left over from an earlier
                    // stage where `cancel_enabled` was true.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }

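    /// Spawns a task that awaits `handle` and then invokes `f` with the result,
    /// retiring `ctx` instead if the handle returns an error or, when
    /// `cancel_enabled` is set, if the connection is canceled first.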
    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // Wait for true or dropped sender.
                let _ = rx.wait_for(|v| *v).await;
            })
        } else {
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = return_if_err!(res, ctx);
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }

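    /// Builds the catalog ops and in-memory [`Source`] objects for a batch of
    /// `CREATE SOURCE` plans, recording the IDs of any `IF NOT EXISTS`
    /// statements so callers can turn "already exists" errors into notices.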
    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref desc)
                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                            .get(self.catalog().system_config().dyncfgs());

                        if !enable_multi_replica_sources && cluster.replica_ids().len() > 1 {
                            return Err(AdapterError::Unsupported(
                                "webhook sources in clusters with >1 replicas",
                            ));
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

            // Attempt to reduce the `CHECK` expression, timing out if it takes too long.
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            // If this source contained a set of available source references, update the
            // source references catalog table.
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            // These operations must be executed after the source is added to the catalog.
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }

    /// Subsources are planned differently from other statements because they
    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
    /// Because of this, we have usually "missed" the opportunity to plan them
    /// through the normal statement execution life cycle (the exception being
    /// during bootstrapping).
    ///
    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let plan = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }

    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip_eq(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
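    ///
    /// As a hypothetical illustration, a purified `CREATE SOURCE pg FROM
    /// POSTGRES ...` with exports `t1` and `t2` yields plans in this order:
    /// the progress subsource (if any), then `pg` itself, then `t1` and `t2`.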
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource, if any.
        if let Some(progress_stmt) = progress_stmt {
            // The primary source depends on this subsource because the primary
            // source needs to know its shard ID, and the easiest way of
            // guaranteeing that the shard ID is discoverable is to create this
            // collection first.
            assert_none!(progress_stmt.of_source);
            let id_ts = self.get_catalog_write_ts().await;
            let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("plan must be CreateSourcePlan but got {:?}", p),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources
        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        // Check if any sources are webhook sources and report them as created.
        for (item_id, source) in &sources {
            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
                    ctx.session()
                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
                }
            }
        }

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

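    /// Sequences `CREATE CONNECTION`. If the plan requests validation, the
    /// connection is validated on a separate task before the catalog write is
    /// completed via [`Self::sequence_create_connection_stage_finish`].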
    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
            Ok(ids) => ids,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

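    /// Finishes sequencing `CREATE CONNECTION`: writes the connection to the
    /// catalog and, for AWS PrivateLink connections, ensures the VPC endpoint
    /// exists as a side effect.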
    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    match plan.connection.details {
                        ConnectionDetails::AwsPrivatelink(ref privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            let cloud_resource_controller =
                                match coord.cloud_resource_controller.as_ref().cloned() {
                                    Some(controller) => controller,
                                    None => {
                                        tracing::warn!("AWS PrivateLink connections unsupported");
                                        return;
                                    }
                                };
                            if let Err(err) = cloud_resource_controller
                                .ensure_vpc_endpoint(connection_id, spec)
                                .await
                            {
                                tracing::warn!(?err, "failed to ensure vpc endpoint!");
                            }
                        }
                        _ => {}
                    }
                })
            })
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

    /// Validates the role attributes for a `CREATE ROLE` statement.
    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some()
                || attributes.password.is_some()
                || attributes.login.is_some()
            {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes)?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_with_context(conn_id, None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO(network_policy): Consider role based network policies here.
        let current_network_policy_name =
            self.catalog().system_config().default_network_policy_name();
        // Check if the way we're altering the policy is still valid for the current connection.
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let id_ts = self.get_catalog_write_ts().await;
        let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                o => {
                    unreachable!("CREATE TABLE data source got {:?}", o)
                }
            },
        };

        let is_webhook = matches!(
            &data_source,
            TableDataSource::DataSource {
                desc: DataSourceDesc::Webhook { .. },
                ..
            }
        );

        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let catalog_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        if is_webhook {
            // try_get_webhook_url will make up a URL for things that are not
            // webhooks, so we guard against that here.
            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
                ctx.session()
                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
            }
        }

        match catalog_result {
            Ok(()) => Ok(ExecuteResponse::CreatedTable),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session_mut()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "table",
                    });
                Ok(ExecuteResponse::CreatedTable)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        // First try to allocate an item ID and global ID. If that fails, we're done.
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) =
            return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }

    /// Validates that a view definition does not contain any expressions that may lead to
    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
    ///
    /// We prevent these expressions so that we can add columns to system tables without
    /// changing the definition of the view.
    ///
    /// Here is a bit of a hand-wavy proof as to why we only need to check the
    /// immediate view definition for system objects and ambiguous column
    /// references, and not the entire dependency tree:
    ///
    ///   - A view with no object references cannot have any ambiguous column
    ///   references to a system object, because it has no system objects.
    ///   - A view with a direct reference to a system object and a * or
    ///   NATURAL JOIN will be rejected due to ambiguous column references.
    ///   - A view with system objects but no * or NATURAL JOINs cannot have
    ///   any ambiguous column references to a system object, because all column
    ///   references are explicitly named.
    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
    ///   system object cannot have any ambiguous column references to a system
    ///   object, because there are no system objects in the top level view and
    ///   all sub-views are guaranteed to have no ambiguous column references to
    ///   system objects.
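    ///
    /// For instance, a view defined as `SELECT * FROM mz_catalog.mz_tables`
    /// is rejected here: adding a column to `mz_tables` would otherwise
    /// silently change the shape of the view.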
    pub(super) fn validate_system_column_references(
        &self,
        uses_ambiguous_columns: bool,
        depends_on: &BTreeSet<GlobalId>,
    ) -> Result<(), AdapterError> {
        if uses_ambiguous_columns
            && depends_on
                .iter()
                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
        {
            Err(AdapterError::AmbiguousSystemColumnReference)
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_type(
        &mut self,
        session: &Session,
        plan: plan::CreateTypePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        // Validate the type definition (e.g., composite columns) before storing.
        plan.typ
            .inner
            .desc(&self.catalog().for_session(session))
            .map_err(AdapterError::from)?;
        let typ = Type {
            create_sql: Some(plan.typ.create_sql),
            global_id,
            details: CatalogTypeDetails {
                array_id: None,
                typ: plan.typ.inner,
                pg_metadata: None,
            },
            resolved_ids,
        };
        let op = catalog::Op::CreateItem {
            id: item_id,
            name: plan.name,
            item: CatalogItem::Type(typ),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::CreatedType),
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_comment_on(
        &mut self,
        session: &Session,
        plan: plan::CommentPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::Comment {
            object_id: plan.object_id,
            sub_component: plan.sub_component,
            comment: plan.comment,
        };
        self.catalog_transact(Some(session), vec![op]).await?;
        Ok(ExecuteResponse::Comment)
    }

    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        fail::fail_point!("after_sequencer_drop_replica");

        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }

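    /// Checks whether any of the `dropped_roles` still own objects or are
    /// referenced by privilege grants, collecting the offending dependencies
    /// per role so the drop can be rejected with a descriptive error.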
    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
1421                        .or_default()
1422                        .push(format!("owner of {object_description}"));
1423                }
1424            }
1425        }
1426        privilege_check(
1427            self.catalog().system_privileges(),
1428            dropped_roles,
1429            &mut dependent_objects,
1430            &SystemObjectId::System,
1431            &catalog,
1432        );
1433        for (default_privilege_object, default_privilege_acl_items) in
1434            self.catalog.default_privileges()
1435        {
1436            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1437                dependent_objects
1438                    .entry(role_name.to_string())
1439                    .or_default()
1440                    .push(format!(
1441                        "default privileges on {}S created by {}",
1442                        default_privilege_object.object_type, role_name
1443                    ));
1444            }
1445            for default_privilege_acl_item in default_privilege_acl_items {
1446                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1447                    dependent_objects
1448                        .entry(role_name.to_string())
1449                        .or_default()
1450                        .push(format!(
1451                            "default privileges on {}S granted to {}",
1452                            default_privilege_object.object_type, role_name
1453                        ));
1454                }
1455            }
1456        }
1457
1458        if !dependent_objects.is_empty() {
1459            Err(AdapterError::DependentObject(dependent_objects))
1460        } else {
1461            Ok(())
1462        }
1463    }
1464
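    /// Sequences a `DROP OWNED BY` statement (e.g., `DROP OWNED BY data_eng;`,
    /// where `data_eng` is a hypothetical role): revokes the roles' privileges
    /// and default privileges, then drops all objects they own.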
1465    #[instrument]
1466    pub(super) async fn sequence_drop_owned(
1467        &mut self,
1468        session: &Session,
1469        plan: plan::DropOwnedPlan,
1470    ) -> Result<ExecuteResponse, AdapterError> {
1471        for role_id in &plan.role_ids {
1472            self.catalog().ensure_not_reserved_role(role_id)?;
1473        }
1474
1475        let mut privilege_revokes = plan.privilege_revokes;
1476
1477        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1478        let session_catalog = self.catalog().for_session(session);
1479        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1480            && !session.is_superuser()
1481        {
1482            // Obtain all roles that the current session is a member of.
1483            let role_membership =
1484                session_catalog.collect_role_membership(session.current_role_id());
1485            let invalid_revokes: BTreeSet<_> = privilege_revokes
1486                .extract_if(.., |(_, privilege)| {
1487                    !role_membership.contains(&privilege.grantor)
1488                })
1489                .map(|(object_id, _)| object_id)
1490                .collect();
1491            for invalid_revoke in invalid_revokes {
1492                let object_description =
1493                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1494                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1495            }
1496        }
1497
1498        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1499            catalog::Op::UpdatePrivilege {
1500                target_id: object_id,
1501                privilege,
1502                variant: UpdatePrivilegeVariant::Revoke,
1503            }
1504        });
1505        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1506            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1507                privilege_object,
1508                privilege_acl_item,
1509                variant: UpdatePrivilegeVariant::Revoke,
1510            },
1511        );
1512        let DropOps {
1513            ops: drop_ops,
1514            dropped_active_db,
1515            dropped_active_cluster,
1516            dropped_in_use_indexes,
1517        } = self.sequence_drop_common(session, plan.drop_ids)?;
1518
1519        let ops = privilege_revoke_ops
1520            .chain(default_privilege_revoke_ops)
1521            .chain(drop_ops.into_iter())
1522            .collect();
1523
1524        self.catalog_transact(Some(session), ops).await?;
1525
1526        if dropped_active_db {
1527            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1528                name: session.vars().database().to_string(),
1529            });
1530        }
1531        if dropped_active_cluster {
1532            session.add_notice(AdapterNotice::DroppedActiveCluster {
1533                name: session.vars().cluster().to_string(),
1534            });
1535        }
1536        for dropped_in_use_index in dropped_in_use_indexes {
1537            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1538        }
1539        Ok(ExecuteResponse::DroppedOwned)
1540    }
1541
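    /// Builds the common set of catalog ops for dropping the given objects,
    /// along with bookkeeping about whether the active database or cluster is
    /// being dropped and which in-use indexes are going away.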
1542    fn sequence_drop_common(
1543        &self,
1544        session: &Session,
1545        ids: Vec<ObjectId>,
1546    ) -> Result<DropOps, AdapterError> {
1547        let mut dropped_active_db = false;
1548        let mut dropped_active_cluster = false;
1549        let mut dropped_in_use_indexes = Vec::new();
1550        let mut dropped_roles = BTreeMap::new();
1551        let mut dropped_databases = BTreeSet::new();
1552        let mut dropped_schemas = BTreeSet::new();
        // Dropping either the group role or the member role of a role membership will trigger a
        // role revoke. We use a set for the revokes to avoid attempting to revoke the same
        // role membership twice.
1556        let mut role_revokes = BTreeSet::new();
        // Dropping a database or a schema will revoke all default privileges associated with
        // that database or schema.
1559        let mut default_privilege_revokes = BTreeSet::new();
1560
        // Clusters we're dropping; replicas of these clusters may be dropped even when the
        // cluster is managed.
1562        let mut clusters_to_drop = BTreeSet::new();
1563
1564        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1565        for id in &ids {
1566            match id {
1567                ObjectId::Database(id) => {
1568                    let name = self.catalog().get_database(id).name();
1569                    if name == session.vars().database() {
1570                        dropped_active_db = true;
1571                    }
1572                    dropped_databases.insert(id);
1573                }
1574                ObjectId::Schema((_, spec)) => {
1575                    if let SchemaSpecifier::Id(id) = spec {
1576                        dropped_schemas.insert(id);
1577                    }
1578                }
1579                ObjectId::Cluster(id) => {
1580                    clusters_to_drop.insert(*id);
1581                    if let Some(active_id) = self
1582                        .catalog()
1583                        .active_cluster(session)
1584                        .ok()
1585                        .map(|cluster| cluster.id())
1586                    {
1587                        if id == &active_id {
1588                            dropped_active_cluster = true;
1589                        }
1590                    }
1591                }
1592                ObjectId::Role(id) => {
1593                    let role = self.catalog().get_role(id);
1594                    let name = role.name();
1595                    dropped_roles.insert(*id, name);
                    // We must revoke all role memberships that the dropped role belongs to.
1597                    for (group_id, grantor_id) in &role.membership.map {
1598                        role_revokes.insert((*group_id, *id, *grantor_id));
1599                    }
1600                }
1601                ObjectId::Item(id) => {
1602                    if let Some(index) = self.catalog().get_entry(id).index() {
1603                        let humanizer = self.catalog().for_session(session);
1604                        let dependants = self
1605                            .controller
1606                            .compute
1607                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1608                            .ok()
1609                            .into_iter()
1610                            .flatten()
1611                            .filter(|dependant_id| {
                                // Transient IDs belong to peeks. For now, we are not
                                // interested in peeks that depend on a dropped index.
1614                                // TODO: show a different notice in this case. Something like
1615                                // "There is an in-progress ad hoc SELECT that uses the dropped
1616                                // index. The resources used by the index will be freed when all
1617                                // such SELECTs complete."
1618                                if dependant_id.is_transient() {
1619                                    return false;
1620                                }
1621                                // The item should exist, but don't panic if it doesn't.
1622                                let Some(dependent_id) = humanizer
1623                                    .try_get_item_by_global_id(dependant_id)
1624                                    .map(|item| item.id())
1625                                else {
1626                                    return false;
1627                                };
1628                                // If the dependent object is also being dropped, then there is no
1629                                // problem, so we don't want a notice.
1630                                !ids_set.contains(&ObjectId::Item(dependent_id))
1631                            })
1632                            .flat_map(|dependant_id| {
1633                                // If we are not able to find a name for this ID it probably means
1634                                // we have already dropped the compute collection, in which case we
1635                                // can ignore it.
1636                                humanizer.humanize_id(dependant_id)
1637                            })
1638                            .collect_vec();
1639                        if !dependants.is_empty() {
1640                            dropped_in_use_indexes.push(DroppedInUseIndex {
1641                                index_name: humanizer
1642                                    .humanize_id(index.global_id())
1643                                    .unwrap_or_else(|| id.to_string()),
1644                                dependant_objects: dependants,
1645                            });
1646                        }
1647                    }
1648                }
1649                _ => {}
1650            }
1651        }
1652
1653        for id in &ids {
1654            match id {
1655                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1656                // unless they are internal replicas, which exist outside the scope
1657                // of managed clusters.
1658                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1659                    if !clusters_to_drop.contains(cluster_id) {
1660                        let cluster = self.catalog.get_cluster(*cluster_id);
1661                        if cluster.is_managed() {
1662                            let replica =
1663                                cluster.replica(*replica_id).expect("Catalog out of sync");
1664                            if !replica.config.location.internal() {
1665                                coord_bail!("cannot drop replica of managed cluster");
1666                            }
1667                        }
1668                    }
1669                }
1670                _ => {}
1671            }
1672        }
1673
1674        for role_id in dropped_roles.keys() {
1675            self.catalog().ensure_not_reserved_role(role_id)?;
1676        }
1677        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1678        // If any role is a member of a dropped role, then we must revoke that membership.
1679        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1680        for role in self.catalog().user_roles() {
1681            for dropped_role_id in
1682                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1683            {
1684                role_revokes.insert((
1685                    **dropped_role_id,
1686                    role.id(),
1687                    *role
1688                        .membership
1689                        .map
1690                        .get(*dropped_role_id)
1691                        .expect("included in keys above"),
1692                ));
1693            }
1694        }
1695
1696        for (default_privilege_object, default_privilege_acls) in
1697            self.catalog().default_privileges()
1698        {
1699            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
1700                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
1701            {
1702                for default_privilege_acl in default_privilege_acls {
1703                    default_privilege_revokes.insert((
1704                        default_privilege_object.clone(),
1705                        default_privilege_acl.clone(),
1706                    ));
1707                }
1708            }
1709        }
1710
1711        let ops = role_revokes
1712            .into_iter()
1713            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1714                role_id,
1715                member_id,
1716                grantor_id,
1717            })
1718            .chain(default_privilege_revokes.into_iter().map(
1719                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1720                    privilege_object,
1721                    privilege_acl_item,
1722                    variant: UpdatePrivilegeVariant::Revoke,
1723                },
1724            ))
1725            .chain(iter::once(catalog::Op::DropObjects(
1726                ids.into_iter()
1727                    .map(DropObjectInfo::manual_drop_from_object_id)
1728                    .collect(),
1729            )))
1730            .collect();
1731
1732        Ok(DropOps {
1733            ops,
1734            dropped_active_db,
1735            dropped_active_cluster,
1736            dropped_in_use_indexes,
1737        })
1738    }
1739
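    /// Sequences an `EXPLAIN ... SCHEMA` for a `CREATE SINK` statement,
    /// pretty-printing the sink's resolved JSON schema as a single-row result.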
1740    pub(super) fn sequence_explain_schema(
1741        &self,
1742        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1743    ) -> Result<ExecuteResponse, AdapterError> {
1744        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1745            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1746        })?;
1747
1748        let json_string = json_string(&json_value);
1749        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1750        Ok(Self::send_immediate_rows(row))
1751    }
1752
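    /// Sequences `SHOW ALL`, returning one row of (name, setting, description)
    /// per variable visible to the current session, sorted by name.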
1753    pub(super) fn sequence_show_all_variables(
1754        &self,
1755        session: &Session,
1756    ) -> Result<ExecuteResponse, AdapterError> {
1757        let mut rows = viewable_variables(self.catalog().state(), session)
1758            .map(|v| (v.name(), v.value(), v.description()))
1759            .collect::<Vec<_>>();
1760        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1761
1762        // TODO(parkmycar): Pack all of these into a single RowCollection.
1763        let rows: Vec<_> = rows
1764            .into_iter()
1765            .map(|(name, val, desc)| {
1766                Row::pack_slice(&[
1767                    Datum::String(name),
1768                    Datum::String(&val),
1769                    Datum::String(desc),
1770                ])
1771            })
1772            .collect();
1773        Ok(Self::send_immediate_rows(rows))
1774    }
1775
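    /// Sequences `SHOW <name>` (e.g., `SHOW cluster;`). `SHOW schema` is
    /// special-cased to report the first resolvable schema on the search path,
    /// and notices are emitted if the current database or cluster does not
    /// exist.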
1776    pub(super) fn sequence_show_variable(
1777        &self,
1778        session: &Session,
1779        plan: plan::ShowVariablePlan,
1780    ) -> Result<ExecuteResponse, AdapterError> {
1781        if &plan.name == SCHEMA_ALIAS {
1782            let schemas = self.catalog.resolve_search_path(session);
1783            let schema = schemas.first();
1784            return match schema {
1785                Some((database_spec, schema_spec)) => {
1786                    let schema_name = &self
1787                        .catalog
1788                        .get_schema(database_spec, schema_spec, session.conn_id())
1789                        .name()
1790                        .schema;
1791                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
1792                    Ok(Self::send_immediate_rows(row))
1793                }
1794                None => {
1795                    if session.vars().current_object_missing_warnings() {
1796                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1797                            search_path: session
1798                                .vars()
1799                                .search_path()
1800                                .into_iter()
1801                                .map(|schema| schema.to_string())
1802                                .collect(),
1803                        });
1804                    }
1805                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1806                }
1807            };
1808        }
1809
1810        let variable = session
1811            .vars()
1812            .get(self.catalog().system_config(), &plan.name)
1813            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1814
1815        // In lieu of plumbing the user to all system config functions, just check that the var is
1816        // visible.
1817        variable.visible(session.user(), self.catalog().system_config())?;
1818
1819        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1820        if variable.name() == vars::DATABASE.name()
1821            && matches!(
1822                self.catalog().resolve_database(&variable.value()),
1823                Err(CatalogError::UnknownDatabase(_))
1824            )
1825            && session.vars().current_object_missing_warnings()
1826        {
1827            let name = variable.value();
1828            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1829        } else if variable.name() == vars::CLUSTER.name()
1830            && matches!(
1831                self.catalog().resolve_cluster(&variable.value()),
1832                Err(CatalogError::UnknownCluster(_))
1833            )
1834            && session.vars().current_object_missing_warnings()
1835        {
1836            let name = variable.value();
1837            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1838        }
1839        Ok(Self::send_immediate_rows(row))
1840    }
1841
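    /// Sequences the internal-only `INSPECT SHARD` statement, returning the
    /// persist state of the given shard as JSON.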
1842    #[instrument]
1843    pub(super) async fn sequence_inspect_shard(
1844        &self,
1845        session: &Session,
1846        plan: plan::InspectShardPlan,
1847    ) -> Result<ExecuteResponse, AdapterError> {
1848        // TODO: Not thrilled about this rbac special case here, but probably
1849        // sufficient for now.
1850        if !session.user().is_internal() {
1851            return Err(AdapterError::Unauthorized(
1852                rbac::UnauthorizedError::MzSystem {
1853                    action: "inspect".into(),
1854                },
1855            ));
1856        }
1857        let state = self
1858            .controller
1859            .storage
1860            .inspect_persist_state(plan.id)
1861            .await?;
1862        let jsonb = Jsonb::from_serde_json(state)?;
1863        Ok(Self::send_immediate_rows(jsonb.into_row()))
1864    }
1865
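    /// Sequences `SET <name> [= <value>]` (e.g., `SET cluster = quickstart;`),
    /// updating the session variable and emitting notices for deprecated
    /// variables, missing databases or clusters, and unimplemented isolation
    /// levels.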
1866    #[instrument]
1867    pub(super) fn sequence_set_variable(
1868        &self,
1869        session: &mut Session,
1870        plan: plan::SetVariablePlan,
1871    ) -> Result<ExecuteResponse, AdapterError> {
1872        let (name, local) = (plan.name, plan.local);
1873        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1874            self.validate_set_isolation_level(session)?;
1875        }
1876        if &name == vars::CLUSTER.name() {
1877            self.validate_set_cluster(session)?;
1878        }
1879
1880        let vars = session.vars_mut();
1881        let values = match plan.value {
1882            plan::VariableValue::Default => None,
1883            plan::VariableValue::Values(values) => Some(values),
1884        };
1885
1886        match values {
1887            Some(values) => {
1888                vars.set(
1889                    self.catalog().system_config(),
1890                    &name,
1891                    VarInput::SqlSet(&values),
1892                    local,
1893                )?;
1894
1895                let vars = session.vars();
1896
1897                // Emit a warning when deprecated variables are used.
1898                // TODO(database-issues#8069) remove this after sufficient time has passed
1899                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1900                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1901                } else if name == vars::CLUSTER.name()
1902                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1903                {
1904                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1905                }
1906
1907                // Database or cluster value does not correspond to a catalog item.
1908                if name.as_str() == vars::DATABASE.name()
1909                    && matches!(
1910                        self.catalog().resolve_database(vars.database()),
1911                        Err(CatalogError::UnknownDatabase(_))
1912                    )
1913                    && session.vars().current_object_missing_warnings()
1914                {
1915                    let name = vars.database().to_string();
1916                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1917                } else if name.as_str() == vars::CLUSTER.name()
1918                    && matches!(
1919                        self.catalog().resolve_cluster(vars.cluster()),
1920                        Err(CatalogError::UnknownCluster(_))
1921                    )
1922                    && session.vars().current_object_missing_warnings()
1923                {
1924                    let name = vars.cluster().to_string();
1925                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1926                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1927                    let v = values.into_first().to_lowercase();
1928                    if v == IsolationLevel::ReadUncommitted.as_str()
1929                        || v == IsolationLevel::ReadCommitted.as_str()
1930                        || v == IsolationLevel::RepeatableRead.as_str()
1931                    {
1932                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1933                            isolation_level: v,
1934                        });
1935                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1936                        session.add_notice(AdapterNotice::StrongSessionSerializable);
1937                    }
1938                }
1939            }
1940            None => vars.reset(self.catalog().system_config(), &name, local)?,
1941        }
1942
1943        Ok(ExecuteResponse::SetVariable { name, reset: false })
1944    }
1945
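    /// Sequences `RESET <name>`, restoring the session variable to its
    /// default value.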
1946    pub(super) fn sequence_reset_variable(
1947        &self,
1948        session: &mut Session,
1949        plan: plan::ResetVariablePlan,
1950    ) -> Result<ExecuteResponse, AdapterError> {
1951        let name = plan.name;
1952        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1953            self.validate_set_isolation_level(session)?;
1954        }
1955        if &name == vars::CLUSTER.name() {
1956            self.validate_set_cluster(session)?;
1957        }
1958        session
1959            .vars_mut()
1960            .reset(self.catalog().system_config(), &name, false)?;
1961        Ok(ExecuteResponse::SetVariable { name, reset: true })
1962    }
1963
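    /// Sequences `SET TRANSACTION`, currently supporting only isolation level
    /// modes (e.g., `SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;`).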
1964    pub(super) fn sequence_set_transaction(
1965        &self,
1966        session: &mut Session,
1967        plan: plan::SetTransactionPlan,
1968    ) -> Result<ExecuteResponse, AdapterError> {
1969        // TODO(jkosh44) Only supports isolation levels for now.
1970        for mode in plan.modes {
1971            match mode {
1972                TransactionMode::AccessMode(_) => {
1973                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1974                }
1975                TransactionMode::IsolationLevel(isolation_level) => {
1976                    self.validate_set_isolation_level(session)?;
1977
1978                    session.vars_mut().set(
1979                        self.catalog().system_config(),
1980                        TRANSACTION_ISOLATION_VAR_NAME,
1981                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
1982                        plan.local,
1983                    )?
1984                }
1985            }
1986        }
1987        Ok(ExecuteResponse::SetVariable {
1988            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1989            reset: false,
1990        })
1991    }
1992
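    /// The isolation level may only be changed before any operations have
    /// been performed in the current transaction.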
1993    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1994        if session.transaction().contains_ops() {
1995            Err(AdapterError::InvalidSetIsolationLevel)
1996        } else {
1997            Ok(())
1998        }
1999    }
2000
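    /// The cluster may only be changed before any operations have been
    /// performed in the current transaction.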
2001    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2002        if session.transaction().contains_ops() {
2003            Err(AdapterError::InvalidSetCluster)
2004        } else {
2005            Ok(())
2006        }
2007    }
2008
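    /// Sequences `COMMIT` or `ROLLBACK`. Failed transactions always roll
    /// back; pending writes are handed to the group commit machinery, and
    /// reads at strict serializable isolation are linearized before the
    /// response is returned.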
2009    #[instrument]
2010    pub(super) async fn sequence_end_transaction(
2011        &mut self,
2012        mut ctx: ExecuteContext,
2013        mut action: EndTransactionAction,
2014    ) {
        // If the transaction has failed, we can only roll back.
2016        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2017            (&action, ctx.session().transaction())
2018        {
2019            action = EndTransactionAction::Rollback;
2020        }
2021        let response = match action {
2022            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2023                params: BTreeMap::new(),
2024            }),
2025            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2026                params: BTreeMap::new(),
2027            }),
2028        };
2029
2030        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2031
2032        let (response, action) = match result {
2033            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2034                (response, action)
2035            }
2036            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
                // Make sure we have the correct set of write locks for this transaction.
                // We aggressively drop partial sets of locks to avoid deadlocking
                // separate transactions.
2040                let validated_locks = match write_lock_guards {
2041                    None => None,
2042                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2043                        Ok(locks) => Some(locks),
2044                        Err(missing) => {
2045                            tracing::error!(?missing, "programming error, missing write locks");
2046                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2047                        }
2048                    },
2049                };
2050
2051                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2052                for WriteOp { id, rows } in writes {
2053                    let total_rows = collected_writes.entry(id).or_default();
2054                    total_rows.push(rows);
2055                }
2056
2057                self.submit_write(PendingWriteTxn::User {
2058                    span: Span::current(),
2059                    writes: collected_writes,
2060                    write_locks: validated_locks,
2061                    pending_txn: PendingTxn {
2062                        ctx,
2063                        response,
2064                        action,
2065                    },
2066                });
2067                return;
2068            }
2069            Ok((
2070                Some(TransactionOps::Peeks {
2071                    determination,
2072                    requires_linearization: RequireLinearization::Required,
2073                    ..
2074                }),
2075                _,
2076            )) if ctx.session().vars().transaction_isolation()
2077                == &IsolationLevel::StrictSerializable =>
2078            {
2079                let conn_id = ctx.session().conn_id().clone();
2080                let pending_read_txn = PendingReadTxn {
2081                    txn: PendingRead::Read {
2082                        txn: PendingTxn {
2083                            ctx,
2084                            response,
2085                            action,
2086                        },
2087                    },
2088                    timestamp_context: determination.timestamp_context,
2089                    created: Instant::now(),
2090                    num_requeues: 0,
2091                    otel_ctx: OpenTelemetryContext::obtain(),
2092                };
2093                self.strict_serializable_reads_tx
2094                    .send((conn_id, pending_read_txn))
2095                    .expect("sending to strict_serializable_reads_tx cannot fail");
2096                return;
2097            }
2098            Ok((
2099                Some(TransactionOps::Peeks {
2100                    determination,
2101                    requires_linearization: RequireLinearization::Required,
2102                    ..
2103                }),
2104                _,
2105            )) if ctx.session().vars().transaction_isolation()
2106                == &IsolationLevel::StrongSessionSerializable =>
2107            {
2108                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2109                    ctx.session_mut()
2110                        .ensure_timestamp_oracle(timeline.clone())
2111                        .apply_write(*ts);
2112                }
2113                (response, action)
2114            }
2115            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2116                self.internal_cmd_tx
2117                    .send(Message::ExecuteSingleStatementTransaction {
2118                        ctx,
2119                        otel_ctx: OpenTelemetryContext::obtain(),
2120                        stmt,
2121                        params,
2122                    })
2123                    .expect("must send");
2124                return;
2125            }
2126            Ok((_, _)) => (response, action),
2127            Err(err) => (Err(err), EndTransactionAction::Rollback),
2128        };
2129        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2130        // Append any parameters that changed to the response.
2131        let response = response.map(|mut r| {
2132            r.extend_params(changed);
2133            ExecuteResponse::from(r)
2134        });
2135
2136        ctx.retire(response);
2137    }
2138
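    /// Clears the transaction from the session and, on commit, validates and
    /// returns its accumulated ops (writes or DDL) together with any write
    /// locks that were held.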
2139    #[instrument]
2140    async fn sequence_end_transaction_inner(
2141        &mut self,
2142        ctx: &mut ExecuteContext,
2143        action: EndTransactionAction,
2144    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2145        let txn = self.clear_transaction(ctx.session_mut()).await;
2146
2147        if let EndTransactionAction::Commit = action {
2148            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2149                match &mut ops {
2150                    TransactionOps::Writes(writes) => {
2151                        for WriteOp { id, .. } in &mut writes.iter() {
2152                            // Re-verify this id exists.
2153                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2154                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2155                                    kind: mz_catalog::memory::error::ErrorKind::Sql(
2156                                        CatalogError::UnknownItem(id.to_string()),
2157                                    ),
2158                                })
2159                            })?;
2160                        }
2161
2162                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2163                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2164                    }
2165                    TransactionOps::DDL {
2166                        ops,
2167                        state: _,
2168                        side_effects,
2169                        revision,
2170                    } => {
2171                        // Make sure our catalog hasn't changed.
2172                        if *revision != self.catalog().transient_revision() {
2173                            return Err(AdapterError::DDLTransactionRace);
2174                        }
2175                        // Commit all of our queued ops.
2176                        let ops = std::mem::take(ops);
2177                        let side_effects = std::mem::take(side_effects);
2178                        self.catalog_transact_with_side_effects(
2179                            Some(ctx),
2180                            ops,
2181                            move |a, mut ctx| {
2182                                Box::pin(async move {
2183                                    for side_effect in side_effects {
2184                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2185                                    }
2186                                })
2187                            },
2188                        )
2189                        .await?;
2190                    }
2191                    _ => (),
2192                }
2193                return Ok((Some(ops), write_lock_guards));
2194            }
2195        }
2196
2197        Ok((None, None))
2198    }
2199
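    /// Sequences a side-effecting function call such as
    /// `SELECT pg_cancel_backend(<connection_id>);`.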
2200    pub(super) async fn sequence_side_effecting_func(
2201        &mut self,
2202        ctx: ExecuteContext,
2203        plan: SideEffectingFunc,
2204    ) {
2205        match plan {
2206            SideEffectingFunc::PgCancelBackend { connection_id } => {
2207                if ctx.session().conn_id().unhandled() == connection_id {
                    // As a special case, if we're canceling ourselves, we send
                    // back a canceled response to the client issuing the query,
                    // so no further processing of the cancel is needed.
2211                    ctx.retire(Err(AdapterError::Canceled));
2212                    return;
2213                }
2214
2215                let res = if let Some((id_handle, _conn_meta)) =
2216                    self.active_conns.get_key_value(&connection_id)
2217                {
2218                    // check_plan already verified role membership.
2219                    self.handle_privileged_cancel(id_handle.clone()).await;
2220                    Datum::True
2221                } else {
2222                    Datum::False
2223                };
2224                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2225            }
2226        }
2227    }
2228
    /// Inner method that performs the actual real-time recency timestamp
    /// determination. This is called both by the old peek sequencing code
    /// (via `determine_real_time_recent_timestamp_if_needed`) and by the new
    /// command handler for `Command::DetermineRealTimeRecentTimestamp`.
2232    pub(crate) async fn determine_real_time_recent_timestamp(
2233        &self,
2234        source_ids: impl Iterator<Item = GlobalId>,
2235        real_time_recency_timeout: Duration,
2236    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2237    {
2238        let item_ids = source_ids
2239            .map(|gid| {
2240                self.catalog
2241                    .try_resolve_item_id(&gid)
2242                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2243            })
2244            .collect::<Result<Vec<_>, _>>()?;
2245
        // Find all dependencies transitively, because we need to ensure that
        // RTR queries determine the timestamp from the remap data of the
        // sources (i.e., the storage objects that ingest data from external
        // systems). We "cheat" a little bit and filter out any IDs that
        // aren't user objects, because we know they cannot be RTR sources.
2251        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2252        // If none of the sources are user objects, we don't need to provide
2253        // a RTR timestamp.
2254        if to_visit.is_empty() {
2255            return Ok(None);
2256        }
2257
2258        let mut timestamp_objects = BTreeSet::new();
2259
2260        while let Some(id) = to_visit.pop_front() {
2261            timestamp_objects.insert(id);
2262            to_visit.extend(
2263                self.catalog()
2264                    .get_entry(&id)
2265                    .uses()
2266                    .into_iter()
2267                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2268            );
2269        }
2270        let timestamp_objects = timestamp_objects
2271            .into_iter()
2272            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2273            .collect();
2274
2275        let r = self
2276            .controller
2277            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2278            .await?;
2279
2280        Ok(Some(r))
2281    }
2282
    /// Checks whether the session needs a real-time recency timestamp and, if
    /// so, returns a future that will produce the timestamp.
2285    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2286        &self,
2287        session: &Session,
2288        source_ids: impl Iterator<Item = GlobalId>,
2289    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2290    {
2291        let vars = session.vars();
2292
2293        if vars.real_time_recency()
2294            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2295            && !session.contains_read_timestamp()
2296        {
2297            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2298                .await
2299        } else {
2300            Ok(None)
2301        }
2302    }
2303
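    /// Sequences `EXPLAIN [PLAN]` by dispatching on the explainee: statements
    /// are optimized from scratch, while existing views, materialized views,
    /// and indexes are explained from (or replanned against) the catalog.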
2304    #[instrument]
2305    pub(super) async fn sequence_explain_plan(
2306        &mut self,
2307        ctx: ExecuteContext,
2308        plan: plan::ExplainPlanPlan,
2309        target_cluster: TargetCluster,
2310    ) {
2311        match &plan.explainee {
2312            plan::Explainee::Statement(stmt) => match stmt {
2313                plan::ExplaineeStatement::CreateView { .. } => {
2314                    self.explain_create_view(ctx, plan).await;
2315                }
2316                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2317                    self.explain_create_materialized_view(ctx, plan).await;
2318                }
2319                plan::ExplaineeStatement::CreateIndex { .. } => {
2320                    self.explain_create_index(ctx, plan).await;
2321                }
2322                plan::ExplaineeStatement::Select { .. } => {
2323                    self.explain_peek(ctx, plan, target_cluster).await;
2324                }
2325            },
2326            plan::Explainee::View(_) => {
2327                let result = self.explain_view(&ctx, plan);
2328                ctx.retire(result);
2329            }
2330            plan::Explainee::MaterializedView(_) => {
2331                let result = self.explain_materialized_view(&ctx, plan);
2332                ctx.retire(result);
2333            }
2334            plan::Explainee::Index(_) => {
2335                let result = self.explain_index(&ctx, plan);
2336                ctx.retire(result);
2337            }
2338            plan::Explainee::ReplanView(_) => {
2339                self.explain_replan_view(ctx, plan).await;
2340            }
2341            plan::Explainee::ReplanMaterializedView(_) => {
2342                self.explain_replan_materialized_view(ctx, plan).await;
2343            }
2344            plan::Explainee::ReplanIndex(_) => {
2345                self.explain_replan_index(ctx, plan).await;
2346            }
2347        };
2348    }
2349
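    /// Sequences `EXPLAIN FILTER PUSHDOWN`, which is supported only for
    /// `SELECT` statements and materialized views.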
2350    pub(super) async fn sequence_explain_pushdown(
2351        &mut self,
2352        ctx: ExecuteContext,
2353        plan: plan::ExplainPushdownPlan,
2354        target_cluster: TargetCluster,
2355    ) {
2356        match plan.explainee {
2357            Explainee::Statement(ExplaineeStatement::Select {
2358                broken: false,
2359                plan,
2360                desc: _,
2361            }) => {
2362                let stage = return_if_err!(
2363                    self.peek_validate(
2364                        ctx.session(),
2365                        plan,
2366                        target_cluster,
2367                        None,
2368                        ExplainContext::Pushdown,
2369                        Some(ctx.session().vars().max_query_result_size()),
2370                    ),
2371                    ctx
2372                );
2373                self.sequence_staged(ctx, Span::current(), stage).await;
2374            }
2375            Explainee::MaterializedView(item_id) => {
2376                self.explain_pushdown_materialized_view(ctx, item_id).await;
2377            }
2378            _ => {
2379                ctx.retire(Err(AdapterError::Unsupported(
2380                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2381                )));
2382            }
2383        };
2384    }
2385
2386    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2387    async fn execute_explain_pushdown_with_read_holds(
2388        &self,
2389        ctx: ExecuteContext,
2390        as_of: Antichain<Timestamp>,
2391        mz_now: ResultSpec<'static>,
2392        read_holds: Option<ReadHolds<Timestamp>>,
2393        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2394    ) {
2395        let fut = self
2396            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2397            .await;
2398        task::spawn(|| "render explain pushdown", async move {
2399            // Transfer the necessary read holds over to the background task
2400            let _read_holds = read_holds;
2401            let res = fut.await;
2402            ctx.retire(res);
2403        });
2404    }
2405
2406    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
2407    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2408        &self,
2409        session: &Session,
2410        as_of: Antichain<Timestamp>,
2411        mz_now: ResultSpec<'static>,
2412        imports: I,
2413    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
        // Gather the needed Coordinator state and call the freestanding, shared helper.
2415        super::explain_pushdown_future_inner(
2416            session,
2417            &self.catalog,
2418            &self.controller.storage_collections,
2419            as_of,
2420            mz_now,
2421            imports,
2422        )
2423        .await
2424    }
2425
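    /// Sequences an `INSERT`. Constant values (e.g., `INSERT INTO t VALUES
    /// (1);`) are appended directly; anything non-constant, or an
    /// `INSERT ... RETURNING`, is sequenced as an implicit read-then-write.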
2426    #[instrument]
2427    pub(super) async fn sequence_insert(
2428        &mut self,
2429        mut ctx: ExecuteContext,
2430        plan: plan::InsertPlan,
2431    ) {
2432        // Normally, this would get checked when trying to add "write ops" to
2433        // the transaction but we go down diverging paths below, based on
2434        // whether the INSERT is only constant values or not.
2435        //
2436        // For the non-constant case we sequence an implicit read-then-write,
2437        // which messes with the transaction ops and would allow an implicit
2438        // read-then-write to sneak into a read-only transaction.
2439        if !ctx.session_mut().transaction().allows_writes() {
2440            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2441            return;
2442        }
2443
        // The structure of this code originates from a time when
        // `ReadThenWritePlan` carried an `MirRelationExpr` instead of an
        // optimized `MirRelationExpr`.
        //
        // Ideally, we would like to make the `selection.as_const().is_some()`
        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
        // are planned as a Wrap($n, $vals) call, so until we can reduce
        // HirRelationExpr this check would always return false.
        //
        // Unfortunately, hitting the default path of the match below also
        // causes a lot of tests to fail, so we opted to go with the extra
        // `plan.values.clone()` statements when producing the `optimized_mir`
        // and to re-optimize the values in the `sequence_read_then_write` call.
2457        let optimized_mir = if let Some(..) = &plan.values.as_const() {
            // For writes, we don't perform any optimizations on an expression that is
            // already a constant, as we want to maximize bulk-insert throughput.
2460            let expr = return_if_err!(
2461                plan.values
2462                    .clone()
2463                    .lower(self.catalog().system_config(), None),
2464                ctx
2465            );
2466            OptimizedMirRelationExpr(expr)
2467        } else {
2468            // Collect optimizer parameters.
2469            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2470
2471            // (`optimize::view::Optimizer` has a special case for constant queries.)
2472            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2473
2474            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2475            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2476        };
2477
2478        match optimized_mir.into_inner() {
2479            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2480                let catalog = self.owned_catalog();
2481                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2482                    let result =
2483                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2484                    ctx.retire(result);
2485                });
2486            }
2487            // All non-constant values must be planned as read-then-writes.
2488            _ => {
2489                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2490                    Some(table) => {
2491                        // Inserts always occur at the latest version of the table.
2492                        let desc = table.relation_desc_latest().expect("table has a desc");
2493                        desc.arity()
2494                    }
2495                    None => {
2496                        ctx.retire(Err(AdapterError::Catalog(
2497                            mz_catalog::memory::error::Error {
2498                                kind: mz_catalog::memory::error::ErrorKind::Sql(
2499                                    CatalogError::UnknownItem(plan.id.to_string()),
2500                                ),
2501                            },
2502                        )));
2503                        return;
2504                    }
2505                };
2506
2507                let finishing = RowSetFinishing {
2508                    order_by: vec![],
2509                    limit: None,
2510                    offset: 0,
2511                    project: (0..desc_arity).collect(),
2512                };
2513
2514                let read_then_write_plan = plan::ReadThenWritePlan {
2515                    id: plan.id,
2516                    selection: plan.values,
2517                    finishing,
2518                    assignments: BTreeMap::new(),
2519                    kind: MutationKind::Insert,
2520                    returning: plan.returning,
2521                };
2522
2523                self.sequence_read_then_write(ctx, read_then_write_plan)
2524                    .await;
2525            }
2526        }
2527    }
2528
    /// ReadThenWrite is a plan whose writes depend on the results of a
    /// read. This works by doing a Peek and then queuing a SendDiffs. No
    /// writes or read-then-writes can occur between the Peek and the
    /// SendDiffs; otherwise a serializability violation could occur.
2533    #[instrument]
2534    pub(super) async fn sequence_read_then_write(
2535        &mut self,
2536        mut ctx: ExecuteContext,
2537        plan: plan::ReadThenWritePlan,
2538    ) {
2539        let mut source_ids: BTreeSet<_> = plan
2540            .selection
2541            .depends_on()
2542            .into_iter()
2543            .map(|gid| self.catalog().resolve_item_id(&gid))
2544            .collect();
2545        source_ids.insert(plan.id);
2546
2547        // If the transaction doesn't already have write locks, acquire them.
2548        if ctx.session().transaction().write_locks().is_none() {
2549            // Pre-define all of the locks we need.
2550            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2551
2552            // Try acquiring all of our locks.
2553            for id in &source_ids {
2554                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2555                    write_locks.insert_lock(*id, lock);
2556                }
2557            }
2558
            // See if we acquired all of the necessary locks.
2560            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2561                Ok(locks) => locks,
2562                Err(missing) => {
2563                    // Defer our write if we couldn't acquire all of the locks.
2564                    let role_metadata = ctx.session().role_metadata().clone();
2565                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2566                    let plan = DeferredPlan {
2567                        ctx,
2568                        plan: Plan::ReadThenWrite(plan),
2569                        validity: PlanValidity::new(
2570                            self.catalog.transient_revision(),
2571                            source_ids.clone(),
2572                            None,
2573                            None,
2574                            role_metadata,
2575                        ),
2576                        requires_locks: source_ids,
2577                    };
2578                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2579                }
2580            };
2581
2582            ctx.session_mut()
2583                .try_grant_write_locks(write_locks)
2584                .expect("session has already been granted write locks");
2585        }
2586
2587        let plan::ReadThenWritePlan {
2588            id,
2589            kind,
2590            selection,
2591            mut assignments,
2592            finishing,
2593            mut returning,
2594        } = plan;
2595
        // Read-then-writes can be queued, so re-verify that the id exists.
2597        let desc = match self.catalog().try_get_entry(&id) {
2598            Some(table) => {
2599                // Writes always occur at the latest version of the table.
2600                table
2601                    .relation_desc_latest()
2602                    .expect("table has a desc")
2603                    .into_owned()
2604            }
2605            None => {
2606                ctx.retire(Err(AdapterError::Catalog(
2607                    mz_catalog::memory::error::Error {
2608                        kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2609                            id.to_string(),
2610                        )),
2611                    },
2612                )));
2613                return;
2614            }
2615        };
2616
2617        // Disallow mz_now in any position because read time and write time differ.
2618        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2619            || assignments.values().any(|e| e.contains_temporal())
2620            || returning.iter().any(|e| e.contains_temporal());
2621        if contains_temporal {
2622            ctx.retire(Err(AdapterError::Unsupported(
2623                "calls to mz_now in write statements",
2624            )));
2625            return;
2626        }
2627
2628        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
2629        //
2630        // - They do not refer to any objects whose notion of time moves differently than that
2631        //   of user tables. This limitation is meant to ensure no writes occur between this
2632        //   read and the subsequent write.
2633        // - They do not use mz_now(), whose time produced during read will differ from the
2634        //   write timestamp.
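        //
        // As an illustration (the checks below are authoritative): an
        // `INSERT INTO t SELECT count(*) FROM mz_catalog.mz_tables` is rejected,
        // because system tables are not user tables and their timestamps can
        // advance independently of user writes.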
2635        fn validate_read_dependencies(
2636            catalog: &Catalog,
2637            id: &CatalogItemId,
2638        ) -> Result<(), AdapterError> {
2639            use CatalogItemType::*;
2640            use mz_catalog::memory::objects;
2641            let mut ids_to_check = Vec::new();
2642            let valid = match catalog.try_get_entry(id) {
2643                Some(entry) => {
2644                    if let CatalogItem::View(objects::View { optimized_expr, .. })
2645                    | CatalogItem::MaterializedView(objects::MaterializedView {
2646                        optimized_expr,
2647                        ..
2648                    }) = entry.item()
2649                    {
2650                        if optimized_expr.contains_temporal() {
2651                            return Err(AdapterError::Unsupported(
2652                                "calls to mz_now in write statements",
2653                            ));
2654                        }
2655                    }
2656                    match entry.item().typ() {
2657                        typ @ (Func | View | MaterializedView | ContinualTask) => {
2658                            ids_to_check.extend(entry.uses());
2659                            let valid_id = id.is_user() || matches!(typ, Func);
2660                            valid_id
2661                        }
2662                        Source | Secret | Connection => false,
2663                        // Cannot select from sinks or indexes.
2664                        Sink | Index => unreachable!(),
2665                        Table => {
2666                            if !id.is_user() {
2667                                // We can't read from non-user tables
2668                                false
2669                            } else {
2670                                // We can't read from tables that are source-exports
2671                                entry.source_export_details().is_none()
2672                            }
2673                        }
2674                        Type => true,
2675                    }
2676                }
2677                None => false,
2678            };
2679            if !valid {
2680                return Err(AdapterError::InvalidTableMutationSelection);
2681            }
2682            for id in ids_to_check {
2683                validate_read_dependencies(catalog, &id)?;
2684            }
2685            Ok(())
2686        }
2687
2688        for gid in selection.depends_on() {
2689            let item_id = self.catalog().resolve_item_id(&gid);
2690            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2691                ctx.retire(Err(err));
2692                return;
2693            }
2694        }
2695
2696        let (peek_tx, peek_rx) = oneshot::channel();
2697        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2698        let (tx, _, session, extra) = ctx.into_parts();
2699        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2700        // execution context, because this peek does not directly correspond to an execute,
2701        // and so we don't need to take any action on its retirement.
2702        // TODO[btv]: we might consider extending statement logging to log the inner
2703        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2704        // and mint a new "real" execution context here. We'd also have to add some logic to
2705        // make sure such "sub-statements" are always sampled when the top-level statement is.
2706        //
2707        // It's debatable whether this makes sense conceptually,
2708        // because the inner fragment here is not actually a
2709        // "statement" in its own right.
2710        let peek_ctx = ExecuteContext::from_parts(
2711            peek_client_tx,
2712            self.internal_cmd_tx.clone(),
2713            session,
2714            Default::default(),
2715        );
2716
2717        self.sequence_peek(
2718            peek_ctx,
2719            plan::SelectPlan {
2720                select: None,
2721                source: selection,
2722                when: QueryWhen::FreshestTableWrite,
2723                finishing,
2724                copy_to: None,
2725            },
2726            TargetCluster::Active,
2727            None,
2728        )
2729        .await;
2730
2731        let internal_cmd_tx = self.internal_cmd_tx.clone();
2732        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2733        let catalog = self.owned_catalog();
2734        let max_result_size = self.catalog().system_config().max_result_size();
2735
2736        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2737            let (peek_response, session) = match peek_rx.await {
2738                Ok(Response {
2739                    result: Ok(resp),
2740                    session,
2741                    otel_ctx,
2742                }) => {
2743                    otel_ctx.attach_as_parent();
2744                    (resp, session)
2745                }
2746                Ok(Response {
2747                    result: Err(e),
2748                    session,
2749                    otel_ctx,
2750                }) => {
2751                    let ctx =
2752                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2753                    otel_ctx.attach_as_parent();
2754                    ctx.retire(Err(e));
2755                    return;
2756                }
2757                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2758                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2759            };
2760            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2761            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2762
2763            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2764            if timeout_dur == Duration::ZERO {
2765                timeout_dur = Duration::MAX;
2766            }
2767
2768            let style = ExprPrepStyle::OneShot {
2769                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2770                session: ctx.session(),
2771                catalog_state: catalog.state(),
2772            };
2773            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2774                return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
2775            }
2776
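            // Turn a batch of peeked rows into update diffs: UPDATEs evaluate the
            // assignment expressions and emit an insertion of the updated row plus
            // a retraction of the old one, DELETEs emit only retractions, and
            // INSERTs emit only insertions. Also tracks the total byte size of the
            // diffs and checks column constraints on inserted rows.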
2777            let make_diffs =
2778                move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2779                    let arena = RowArena::new();
2780                    let mut diffs = Vec::new();
2781                    let mut datum_vec = mz_repr::DatumVec::new();
2782
2783                    while let Some(row) = rows.next() {
2784                        if !assignments.is_empty() {
2785                            assert!(
2786                                matches!(kind, MutationKind::Update),
2787                                "only updates support assignments"
2788                            );
2789                            let mut datums = datum_vec.borrow_with(row);
2790                            let mut updates = vec![];
2791                            for (idx, expr) in &assignments {
2792                                let updated = match expr.eval(&datums, &arena) {
2793                                    Ok(updated) => updated,
2794                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2795                                };
2796                                updates.push((*idx, updated));
2797                            }
2798                            for (idx, new_value) in updates {
2799                                datums[idx] = new_value;
2800                            }
2801                            let updated = Row::pack_slice(&datums);
2802                            diffs.push((updated, Diff::ONE));
2803                        }
2804                        match kind {
2805                            // Updates and deletes always remove the
2806                            // current row. Updates will also add an
2807                            // updated value.
2808                            MutationKind::Update | MutationKind::Delete => {
2809                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2810                            }
2811                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2812                        }
2813                    }
2814
2815                    // Sum all of the rows' byte sizes (to check against the
2816                    // max_result_size threshold) and verify constraints on inserted rows.
2817                    let mut byte_size: u64 = 0;
2818                    for (row, diff) in &diffs {
2819                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2820                        if diff.is_positive() {
2821                            for (idx, datum) in row.iter().enumerate() {
2822                                desc.constraints_met(idx, &datum)?;
2823                            }
2824                        }
2825                    }
2826                    Ok((diffs, byte_size))
2827                };
2828
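            // Drain the peek response into diffs. Streamed responses are consumed
            // batch by batch under the statement timeout, accumulating the result
            // size so we can bail out once it exceeds `max_result_size`.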
2829            let diffs = match peek_response {
2830                ExecuteResponse::SendingRowsStreaming {
2831                    rows: mut rows_stream,
2832                    ..
2833                } => {
2834                    let mut byte_size: u64 = 0;
2835                    let mut diffs = Vec::new();
2836                    let result = loop {
2837                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2838                            Ok(Some(res)) => match res {
2839                                PeekResponseUnary::Rows(new_rows) => {
2840                                    match make_diffs(new_rows) {
2841                                        Ok((mut new_diffs, new_byte_size)) => {
2842                                            byte_size = byte_size.saturating_add(new_byte_size);
2843                                            if byte_size > max_result_size {
2844                                                break Err(AdapterError::ResultSize(format!(
2845                                                    "result exceeds max size of {max_result_size}"
2846                                                )));
2847                                            }
2848                                            diffs.append(&mut new_diffs)
2849                                        }
2850                                        Err(e) => break Err(e),
2851                                    };
2852                                }
2853                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2854                                PeekResponseUnary::Error(e) => {
2855                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2856                                }
2857                            },
2858                            Ok(None) => break Ok(diffs),
2859                            Err(_) => {
2860                                // We timed out, so remove the pending peek. This is
2861                                // best-effort and doesn't guarantee we won't
2862                                // receive a response.
2863                                // It is not an error for this send to fail because `internal_cmd_rx` has been dropped.
2864                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2865                                    conn_id: ctx.session().conn_id().clone(),
2866                                });
2867                                if let Err(e) = result {
2868                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2869                                }
2870                                break Err(AdapterError::StatementTimeout);
2871                            }
2872                        }
2873                    };
2874
2875                    result
2876                }
2877                ExecuteResponse::SendingRowsImmediate { rows } => {
2878                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2879                }
2880                resp => Err(AdapterError::Unstructured(anyhow!(
2881                    "unexpected peek response: {resp:?}"
2882                ))),
2883            };
2884
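            // Evaluate any RETURNING expressions against the rows being written.
            // Only insertions (positive diffs) produce RETURNING output;
            // retractions are skipped.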
2885            let mut returning_rows = Vec::new();
2886            let mut diff_err: Option<AdapterError> = None;
2887            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2888                let arena = RowArena::new();
2889                for (row, diff) in diffs {
2890                    if !diff.is_positive() {
2891                        continue;
2892                    }
2893                    let mut returning_row = Row::with_capacity(returning.len());
2894                    let mut packer = returning_row.packer();
2895                    for expr in &returning {
2896                        let datums: Vec<_> = row.iter().collect();
2897                        match expr.eval(&datums, &arena) {
2898                            Ok(datum) => {
2899                                packer.push(datum);
2900                            }
2901                            Err(err) => {
2902                                diff_err = Some(err.into());
2903                                break;
2904                            }
2905                        }
2906                    }
2907                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2908                    let diff = match NonZeroUsize::try_from(diff) {
2909                        Ok(diff) => diff,
2910                        Err(err) => {
2911                            diff_err = Some(err.into());
2912                            break;
2913                        }
2914                    };
2915                    returning_rows.push((returning_row, diff));
2916                    if diff_err.is_some() {
2917                        break;
2918                    }
2919                }
2920            }
2921            let diffs = if let Some(err) = diff_err {
2922                Err(err)
2923            } else {
2924                diffs
2925            };
2926
2927            // We need to clear out the timestamp context so the write doesn't fail due to a
2928            // read-only transaction.
2929            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2930            // No matter what isolation level the client is using, we must linearize this
2931            // read. The write will be performed right after this, as part of a single
2932            // transaction, so the write must have a timestamp greater than or equal to the
2933            // read.
2934            //
2935            // Note: It's only OK for the write to have a greater timestamp than the read
2936            // because the write lock prevents any other writes from happening in between
2937            // the read and write.
2938            if let Some(timestamp_context) = timestamp_context {
2939                let (tx, rx) = tokio::sync::oneshot::channel();
2940                let conn_id = ctx.session().conn_id().clone();
2941                let pending_read_txn = PendingReadTxn {
2942                    txn: PendingRead::ReadThenWrite { ctx, tx },
2943                    timestamp_context,
2944                    created: Instant::now(),
2945                    num_requeues: 0,
2946                    otel_ctx: OpenTelemetryContext::obtain(),
2947                };
2948                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2949                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
2950                if let Err(e) = result {
2951                    warn!(
2952                        "strict_serializable_reads_tx dropped before we could send: {:?}",
2953                        e
2954                    );
2955                    return;
2956                }
2957                let result = rx.await;
2958                // It is not an error for these results to be ready after `tx` has been dropped.
2959                ctx = match result {
2960                    Ok(Some(ctx)) => ctx,
2961                    Ok(None) => {
2962                        // Coordinator took our context and will handle responding to the client.
2963                        // This usually indicates that our transaction was aborted.
2964                        return;
2965                    }
2966                    Err(e) => {
2967                        warn!(
2968                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
2969                            e
2970                        );
2971                        return;
2972                    }
2973                };
2974            }
2975
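            // Finally, turn the read into the write: hand the computed diffs to
            // `send_diffs` and retire the execute context with its result.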
2976            match diffs {
2977                Ok(diffs) => {
2978                    let result = Self::send_diffs(
2979                        ctx.session_mut(),
2980                        plan::SendDiffsPlan {
2981                            id,
2982                            updates: diffs,
2983                            kind,
2984                            returning: returning_rows,
2985                            max_result_size,
2986                        },
2987                    );
2988                    ctx.retire(result);
2989                }
2990                Err(e) => {
2991                    ctx.retire(Err(e));
2992                }
2993            }
2994        });
2995    }
2996
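    /// Sequences an `ALTER ... RENAME TO ...` statement by submitting a single
    /// `RenameItem` op to the catalog within a DDL transaction.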
2997    #[instrument]
2998    pub(super) async fn sequence_alter_item_rename(
2999        &mut self,
3000        ctx: &mut ExecuteContext,
3001        plan: plan::AlterItemRenamePlan,
3002    ) -> Result<ExecuteResponse, AdapterError> {
3003        let op = catalog::Op::RenameItem {
3004            id: plan.id,
3005            current_full_name: plan.current_full_name,
3006            to_name: plan.to_name,
3007        };
3008        match self
3009            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3010            .await
3011        {
3012            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3013            Err(err) => Err(err),
3014        }
3015    }
3016
3017    #[instrument]
3018    pub(super) async fn sequence_alter_retain_history(
3019        &mut self,
3020        ctx: &mut ExecuteContext,
3021        plan: plan::AlterRetainHistoryPlan,
3022    ) -> Result<ExecuteResponse, AdapterError> {
3023        let ops = vec![catalog::Op::AlterRetainHistory {
3024            id: plan.id,
3025            value: plan.value,
3026            window: plan.window,
3027        }];
3028        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3029            Box::pin(async move {
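                // Compaction windows for indexes are enforced by the compute
                // controller; for storage-backed objects (tables, sources,
                // materialized views, continual tasks) they are enforced via
                // storage read policies.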
3030                let catalog_item = coord.catalog().get_entry(&plan.id).item();
3031                let cluster = match catalog_item {
3032                    CatalogItem::Table(_)
3033                    | CatalogItem::MaterializedView(_)
3034                    | CatalogItem::Source(_)
3035                    | CatalogItem::ContinualTask(_) => None,
3036                    CatalogItem::Index(index) => Some(index.cluster_id),
3037                    CatalogItem::Log(_)
3038                    | CatalogItem::View(_)
3039                    | CatalogItem::Sink(_)
3040                    | CatalogItem::Type(_)
3041                    | CatalogItem::Func(_)
3042                    | CatalogItem::Secret(_)
3043                    | CatalogItem::Connection(_) => unreachable!(),
3044                };
3045                match cluster {
3046                    Some(cluster) => {
3047                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3048                    }
3049                    None => {
3050                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3051                    }
3052                }
3053            })
3054        })
3055        .await?;
3056        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3057    }
3058
3059    #[instrument]
3060    pub(super) async fn sequence_alter_schema_rename(
3061        &mut self,
3062        ctx: &mut ExecuteContext,
3063        plan: plan::AlterSchemaRenamePlan,
3064    ) -> Result<ExecuteResponse, AdapterError> {
3065        let (database_spec, schema_spec) = plan.cur_schema_spec;
3066        let op = catalog::Op::RenameSchema {
3067            database_spec,
3068            schema_spec,
3069            new_name: plan.new_schema_name,
3070            check_reserved_names: true,
3071        };
3072        match self
3073            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3074            .await
3075        {
3076            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3077            Err(err) => Err(err),
3078        }
3079    }
3080
3081    #[instrument]
3082    pub(super) async fn sequence_alter_schema_swap(
3083        &mut self,
3084        ctx: &mut ExecuteContext,
3085        plan: plan::AlterSchemaSwapPlan,
3086    ) -> Result<ExecuteResponse, AdapterError> {
3087        let plan::AlterSchemaSwapPlan {
3088            schema_a_spec: (schema_a_db, schema_a),
3089            schema_a_name,
3090            schema_b_spec: (schema_b_db, schema_b),
3091            schema_b_name,
3092            name_temp,
3093        } = plan;
3094
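        // Swap the two schema names by way of a temporary name: rename schema A
        // to `name_temp`, rename schema B to A's old name, then rename A (now
        // under the temporary name) to B's old name. Reserved-name checks are
        // skipped because the intermediate rename uses the planner-chosen
        // temporary name.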
3095        let op_a = catalog::Op::RenameSchema {
3096            database_spec: schema_a_db,
3097            schema_spec: schema_a,
3098            new_name: name_temp,
3099            check_reserved_names: false,
3100        };
3101        let op_b = catalog::Op::RenameSchema {
3102            database_spec: schema_b_db,
3103            schema_spec: schema_b,
3104            new_name: schema_a_name,
3105            check_reserved_names: false,
3106        };
3107        let op_c = catalog::Op::RenameSchema {
3108            database_spec: schema_a_db,
3109            schema_spec: schema_a,
3110            new_name: schema_b_name,
3111            check_reserved_names: false,
3112        };
3113
3114        match self
3115            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3116                Box::pin(async {})
3117            })
3118            .await
3119        {
3120            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3121            Err(err) => Err(err),
3122        }
3123    }
3124
3125    #[instrument]
3126    pub(super) async fn sequence_alter_role(
3127        &mut self,
3128        session: &Session,
3129        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3130    ) -> Result<ExecuteResponse, AdapterError> {
3131        let catalog = self.catalog().for_session(session);
3132        let role = catalog.get_role(&id);
3133
3134        // We'll send these notices to the user, if the operation is successful.
3135        let mut notices = vec![];
3136
3137        // Get the attributes and variables from the role, as they currently are.
3138        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3139        let mut vars = role.vars().clone();
3140
3141        // Whether to set the password to NULL. This is a special case since the existing
3142        // password is not stored in the role attributes.
3143        let mut nopassword = false;
3144
3145        // Apply our updates.
3146        match option {
3147            PlannedAlterRoleOption::Attributes(attrs) => {
3148                self.validate_role_attributes(&attrs.clone().into())?;
3149
3150                if let Some(inherit) = attrs.inherit {
3151                    attributes.inherit = inherit;
3152                }
3153
3154                if let Some(password) = attrs.password {
3155                    attributes.password = Some(password);
3156                    attributes.scram_iterations =
3157                        Some(self.catalog().system_config().scram_iterations())
3158                }
3159
3160                if let Some(superuser) = attrs.superuser {
3161                    attributes.superuser = Some(superuser);
3162                }
3163
3164                if let Some(login) = attrs.login {
3165                    attributes.login = Some(login);
3166                }
3167
3168                if attrs.nopassword.unwrap_or(false) {
3169                    nopassword = true;
3170                }
3171
3172                if let Some(notice) = self.should_emit_rbac_notice(session) {
3173                    notices.push(notice);
3174                }
3175            }
3176            PlannedAlterRoleOption::Variable(variable) => {
3177                // Get the variable to make sure it's valid and visible.
3178                let session_var = session.vars().inspect(variable.name())?;
3179                // Return early if it's not visible.
3180                session_var.visible(session.user(), catalog.system_vars())?;
3181
3182                // Emit a warning when deprecated variables are used.
3183                // TODO(database-issues#8069) remove this after sufficient time has passed
3184                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3185                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3186                } else if let PlannedRoleVariable::Set {
3187                    name,
3188                    value: VariableValue::Values(vals),
3189                } = &variable
3190                {
3191                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3192                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3193                    }
3194                }
3195
3196                let var_name = match variable {
3197                    PlannedRoleVariable::Set { name, value } => {
3198                        // Update our persisted set.
3199                        match &value {
3200                            VariableValue::Default => {
3201                                vars.remove(&name);
3202                            }
3203                            VariableValue::Values(vals) => {
3204                                let var = match &vals[..] {
3205                                    [val] => OwnedVarInput::Flat(val.clone()),
3206                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3207                                };
3208                                // Make sure the input is valid.
3209                                session_var.check(var.borrow())?;
3210
3211                                vars.insert(name.clone(), var);
3212                            }
3213                        };
3214                        name
3215                    }
3216                    PlannedRoleVariable::Reset { name } => {
3217                        // Remove it from our persisted values.
3218                        vars.remove(&name);
3219                        name
3220                    }
3221                };
3222
3223                // Emit a notice that they need to reconnect to see the change take effect.
3224                notices.push(AdapterNotice::VarDefaultUpdated {
3225                    role: Some(name.clone()),
3226                    var_name: Some(var_name),
3227                });
3228            }
3229        }
3230
3231        let op = catalog::Op::AlterRole {
3232            id,
3233            name,
3234            attributes,
3235            nopassword,
3236            vars: RoleVars { map: vars },
3237        };
3238        let response = self
3239            .catalog_transact(Some(session), vec![op])
3240            .await
3241            .map(|_| ExecuteResponse::AlteredRole)?;
3242
3243        // Send all of our queued notices.
3244        session.add_notices(notices);
3245
3246        Ok(response)
3247    }
3248
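    /// Begins an `ALTER SINK SET FROM`: takes a read hold on the new `from`
    /// relation and installs a watch set that fires once the sink's write
    /// frontier overlaps that read hold, at which point
    /// `sequence_alter_sink_finish` completes the alteration.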
3249    #[instrument]
3250    pub(super) async fn sequence_alter_sink_prepare(
3251        &mut self,
3252        ctx: ExecuteContext,
3253        plan: plan::AlterSinkPlan,
3254    ) {
3255        // Put a read hold on the new `from` relation.
3256        let id_bundle = crate::CollectionIdBundle {
3257            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3258            compute_ids: BTreeMap::new(),
3259        };
3260        let read_hold = self.acquire_read_holds(&id_bundle);
3261
3262        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3263            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3264            return;
3265        };
3266
3267        let otel_ctx = OpenTelemetryContext::obtain();
3268        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3269
3270        let plan_validity = PlanValidity::new(
3271            self.catalog().transient_revision(),
3272            BTreeSet::from_iter([plan.item_id, from_item_id]),
3273            Some(plan.in_cluster),
3274            None,
3275            ctx.session().role_metadata().clone(),
3276        );
3277
3278        info!(
3279            "preparing alter sink for {}: frontiers={:?} export={:?}",
3280            plan.global_id,
3281            self.controller
3282                .storage_collections
3283                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3284            self.controller.storage.export(plan.global_id)
3285        );
3286
3287        // Now we must wait for the sink to make enough progress such that there is overlap between
3288        // the new `from` collection's read hold and the sink's write frontier.
3289        //
3290        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3291        // the watch set never completes and neither does the `ALTER SINK` command.
3292        self.install_storage_watch_set(
3293            ctx.session().conn_id().clone(),
3294            BTreeSet::from_iter([plan.global_id]),
3295            read_ts,
3296            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3297                ctx: Some(ctx),
3298                otel_ctx,
3299                plan,
3300                plan_validity,
3301                read_hold,
3302            }),
3303        );
3304    }
3305
3306    #[instrument]
3307    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3308        ctx.otel_ctx.attach_as_parent();
3309
3310        let plan::AlterSinkPlan {
3311            item_id,
3312            global_id,
3313            sink: sink_plan,
3314            with_snapshot,
3315            in_cluster,
3316        } = ctx.plan.clone();
3317
3318        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3319        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3320        // arbitrarily changed since we performed planning, and we must re-assert that it still
3321        // matches our requirements.
3322        //
3323        // The `PlanValidity` check ensures that both the sink and the new source relation still
3324        // exist. Apart from that, we have to ensure that nobody else altered the sink in the
3325        // meantime, which we do by comparing the catalog sink version to the one in the plan.
3326        match ctx.plan_validity.check(self.catalog()) {
3327            Ok(()) => {}
3328            Err(err) => {
3329                ctx.retire(Err(err));
3330                return;
3331            }
3332        }
3333
3334        let entry = self.catalog().get_entry(&item_id);
3335        let CatalogItem::Sink(old_sink) = entry.item() else {
3336            panic!("invalid item kind for `AlterSinkPlan`");
3337        };
3338
3339        if sink_plan.version != old_sink.version + 1 {
3340            ctx.retire(Err(AdapterError::ChangedPlan(
3341                "sink was altered concurrently".into(),
3342            )));
3343            return;
3344        }
3345
3346        info!(
3347            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3348            self.controller
3349                .storage_collections
3350                .collections_frontiers(vec![global_id, sink_plan.from]),
3351            self.controller.storage.export(global_id),
3352        );
3353
3354        // Assert that we can recover the updates that happened at the timestamps of the write
3355        // frontier. The watch set installed during prepare guarantees this holds by now.
3356        let write_frontier = &self
3357            .controller
3358            .storage
3359            .export(global_id)
3360            .expect("sink known to exist")
3361            .write_frontier;
3362        let as_of = ctx.read_hold.least_valid_read();
3363        assert!(
3364            write_frontier.iter().all(|t| as_of.less_than(t)),
3365            "{:?} should be strictly less than {:?}",
3366            &*as_of,
3367            &**write_frontier
3368        );
3369
3370        // Parse the `create_sql` so we can update it to the new sink definition.
3371        //
3372        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3373        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3374        // names in the `create_sql` may have changed, for example due to a schema swap.
3375        let create_sql = &old_sink.create_sql;
3376        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3377        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3378            unreachable!("invalid statement kind for sink");
3379        };
3380
3381        // Update the sink version.
3382        stmt.with_options
3383            .retain(|o| o.name != CreateSinkOptionName::Version);
3384        stmt.with_options.push(CreateSinkOption {
3385            name: CreateSinkOptionName::Version,
3386            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3387                sink_plan.version.to_string(),
3388            ))),
3389        });
3390
3391        let conn_catalog = self.catalog().for_system_session();
3392        let (mut stmt, resolved_ids) =
3393            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3394
3395        // Update the `from` relation.
3396        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3397        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3398        stmt.from = ResolvedItemName::Item {
3399            id: from_entry.id(),
3400            qualifiers: from_entry.name.qualifiers.clone(),
3401            full_name,
3402            print_id: true,
3403            version: from_entry.version,
3404        };
3405
3406        let new_sink = Sink {
3407            create_sql: stmt.to_ast_string_stable(),
3408            global_id,
3409            from: sink_plan.from,
3410            connection: sink_plan.connection.clone(),
3411            envelope: sink_plan.envelope,
3412            version: sink_plan.version,
3413            with_snapshot,
3414            resolved_ids: resolved_ids.clone(),
3415            cluster_id: in_cluster,
3416        };
3417
3418        let ops = vec![catalog::Op::UpdateItem {
3419            id: item_id,
3420            name: entry.name().clone(),
3421            to_item: CatalogItem::Sink(new_sink),
3422        }];
3423
3424        match self
3425            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3426            .await
3427        {
3428            Ok(()) => {}
3429            Err(err) => {
3430                ctx.retire(Err(err));
3431                return;
3432            }
3433        }
3434
3435        let storage_sink_desc = StorageSinkDesc {
3436            from: sink_plan.from,
3437            from_desc: from_entry
3438                .relation_desc()
3439                .expect("sinks can only be built on items with descs")
3440                .into_owned(),
3441            connection: sink_plan
3442                .connection
3443                .clone()
3444                .into_inline_connection(self.catalog().state()),
3445            envelope: sink_plan.envelope,
3446            as_of,
3447            with_snapshot,
3448            version: sink_plan.version,
3449            from_storage_metadata: (),
3450            to_storage_metadata: (),
3451        };
3452
3453        self.controller
3454            .storage
3455            .alter_export(
3456                global_id,
3457                ExportDescription {
3458                    sink: storage_sink_desc,
3459                    instance_id: in_cluster,
3460                },
3461            )
3462            .await
3463            .unwrap_or_terminate("cannot fail to alter sink export");
3464
3465        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3466    }
3467
3468    #[instrument]
3469    pub(super) async fn sequence_alter_connection(
3470        &mut self,
3471        ctx: ExecuteContext,
3472        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3473    ) {
3474        match action {
3475            AlterConnectionAction::RotateKeys => {
3476                self.sequence_rotate_keys(ctx, id).await;
3477            }
3478            AlterConnectionAction::AlterOptions {
3479                set_options,
3480                drop_options,
3481                validate,
3482            } => {
3483                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3484                    .await
3485            }
3486        }
3487    }
3488
3489    #[instrument]
3490    async fn sequence_alter_connection_options(
3491        &mut self,
3492        mut ctx: ExecuteContext,
3493        id: CatalogItemId,
3494        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3495        drop_options: BTreeSet<ConnectionOptionName>,
3496        validate: bool,
3497    ) {
3498        let cur_entry = self.catalog().get_entry(&id);
3499        let cur_conn = cur_entry.connection().expect("known to be connection");
3500        let connection_gid = cur_conn.global_id();
3501
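        // Rebuild the connection by editing its CREATE statement: parse the
        // current `create_sql`, drop the options being set or dropped, splice in
        // the new values, re-plan the amended statement, and recompute its
        // resolved dependencies.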
3502        let inner = || -> Result<Connection, AdapterError> {
3503            // Parse statement.
3504            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3505                .expect("invalid create sql persisted to catalog")
3506                .into_element()
3507                .ast
3508            {
3509                Statement::CreateConnection(stmt) => stmt,
3510                _ => unreachable!("proved type is connection"),
3511            };
3512
3513            let catalog = self.catalog().for_system_session();
3514
3515            // Resolve items in statement
3516            let (mut create_conn_stmt, resolved_ids) =
3517                mz_sql::names::resolve(&catalog, create_conn_stmt)
3518                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3519
3520            // Retain options that are neither set nor dropped.
3521            create_conn_stmt
3522                .values
3523                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3524
3525            // Set new values
3526            create_conn_stmt.values.extend(
3527                set_options
3528                    .into_iter()
3529                    .map(|(name, value)| ConnectionOption { name, value }),
3530            );
3531
3532            // Open a new catalog, which we will use to re-plan our
3533            // statement with the desired config.
3534            let mut catalog = self.catalog().for_system_session();
3535            catalog.mark_id_unresolvable_for_replanning(id);
3536
3537            // Re-define our connection in terms of the amended statement
3538            let plan = match mz_sql::plan::plan(
3539                None,
3540                &catalog,
3541                Statement::CreateConnection(create_conn_stmt),
3542                &Params::empty(),
3543                &resolved_ids,
3544            )
3545            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3546            {
3547                Plan::CreateConnection(plan) => plan,
3548                _ => unreachable!("create connection plan is the only valid response"),
3549            };
3550
3551            // Parse statement.
3552            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3553                .expect("invalid create sql persisted to catalog")
3554                .into_element()
3555                .ast
3556            {
3557                Statement::CreateConnection(stmt) => stmt,
3558                _ => unreachable!("proved type is connection"),
3559            };
3560
3561            let catalog = self.catalog().for_system_session();
3562
3563            // Resolve items in statement
3564            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3565                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3566
3567            Ok(Connection {
3568                create_sql: plan.connection.create_sql,
3569                global_id: cur_conn.global_id,
3570                details: plan.connection.details,
3571                resolved_ids: new_deps,
3572            })
3573        };
3574
3575        let conn = match inner() {
3576            Ok(conn) => conn,
3577            Err(e) => {
3578                return ctx.retire(Err(e));
3579            }
3580        };
3581
3582        if validate {
3583            let connection = conn
3584                .details
3585                .to_connection()
3586                .into_inline_connection(self.catalog().state());
3587
3588            let internal_cmd_tx = self.internal_cmd_tx.clone();
3589            let transient_revision = self.catalog().transient_revision();
3590            let conn_id = ctx.session().conn_id().clone();
3591            let otel_ctx = OpenTelemetryContext::obtain();
3592            let role_metadata = ctx.session().role_metadata().clone();
3593            let current_storage_parameters = self.controller.storage.config().clone();
3594
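            // Validate the amended connection off the coordinator task and hand
            // the result back via an internal command, so that sequencing can
            // finish on the main coordinator loop.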
3595            task::spawn(
3596                || format!("validate_alter_connection:{conn_id}"),
3597                async move {
3598                    let resolved_ids = conn.resolved_ids.clone();
3599                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3600                    let result = match connection.validate(id, &current_storage_parameters).await {
3601                        Ok(()) => Ok(conn),
3602                        Err(err) => Err(err.into()),
3603                    };
3604
3605                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3606                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3607                        AlterConnectionValidationReady {
3608                            ctx,
3609                            result,
3610                            connection_id: id,
3611                            connection_gid,
3612                            plan_validity: PlanValidity::new(
3613                                transient_revision,
3614                                dependency_ids.clone(),
3615                                None,
3616                                None,
3617                                role_metadata,
3618                            ),
3619                            otel_ctx,
3620                            resolved_ids,
3621                        },
3622                    ));
3623                    if let Err(e) = result {
3624                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3625                    }
3626                },
3627            );
3628        } else {
3629            let result = self
3630                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3631                .await;
3632            ctx.retire(result);
3633        }
3634    }
3635
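    /// Completes an `ALTER CONNECTION`: swaps the new connection definition
    /// into the catalog and pushes the updated (inlined) connection details to
    /// all dependent sources, sinks, and source-fed tables in the storage
    /// controller.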
3636    #[instrument]
3637    pub(crate) async fn sequence_alter_connection_stage_finish(
3638        &mut self,
3639        session: &Session,
3640        id: CatalogItemId,
3641        connection: Connection,
3642    ) -> Result<ExecuteResponse, AdapterError> {
3643        match self.catalog.get_entry(&id).item() {
3644            CatalogItem::Connection(curr_conn) => {
3645                curr_conn
3646                    .details
3647                    .to_connection()
3648                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3649                    .map_err(StorageError::from)?;
3650            }
3651            _ => unreachable!("known to be a connection"),
3652        };
3653
3654        let ops = vec![catalog::Op::UpdateItem {
3655            id,
3656            name: self.catalog.get_entry(&id).name().clone(),
3657            to_item: CatalogItem::Connection(connection.clone()),
3658        }];
3659
3660        self.catalog_transact(Some(session), ops).await?;
3661
3662        match connection.details {
3663            ConnectionDetails::AwsPrivatelink(ref privatelink) => {
3664                let spec = VpcEndpointConfig {
3665                    aws_service_name: privatelink.service_name.to_owned(),
3666                    availability_zone_ids: privatelink.availability_zones.to_owned(),
3667                };
3668                self.cloud_resource_controller
3669                    .as_ref()
3670                    .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
3671                    .ensure_vpc_endpoint(id, spec)
3672                    .await?;
3673            }
3674            _ => {}
3675        };
3676
3677        let entry = self.catalog().get_entry(&id);
3678
3679        let mut connections = VecDeque::new();
3680        connections.push_front(entry.id());
3681
3682        let mut source_connections = BTreeMap::new();
3683        let mut sink_connections = BTreeMap::new();
3684        let mut source_export_data_configs = BTreeMap::new();
3685
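        // Walk the dependents of the altered connection breadth-first.
        // Connections can reference other connections (e.g. an SSH tunnel), so we
        // also traverse those; every source, sink, or source-fed table that
        // (transitively) uses this connection needs its inlined connection
        // details refreshed in the storage controller below.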
3686        while let Some(id) = connections.pop_front() {
3687            for id in self.catalog.get_entry(&id).used_by() {
3688                let entry = self.catalog.get_entry(id);
3689                match entry.item() {
3690                    CatalogItem::Connection(_) => connections.push_back(*id),
3691                    CatalogItem::Source(source) => {
3692                        let desc = match &entry.source().expect("known to be source").data_source {
3693                            DataSourceDesc::Ingestion { desc, .. }
3694                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3695                                desc.clone().into_inline_connection(self.catalog().state())
3696                            }
3697                            _ => unreachable!("only ingestions reference connections"),
3698                        };
3699
3700                        source_connections.insert(source.global_id, desc.connection);
3701                    }
3702                    CatalogItem::Sink(sink) => {
3703                        let export = entry.sink().expect("known to be sink");
3704                        sink_connections.insert(
3705                            sink.global_id,
3706                            export
3707                                .connection
3708                                .clone()
3709                                .into_inline_connection(self.catalog().state()),
3710                        );
3711                    }
3712                    CatalogItem::Table(table) => {
3713                        // This is a source-fed table that references a schema registry
3714                        // connection as part of its encoding / data config.
3715                        if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
3716                            let data_config = export_data_config.clone();
3717                            source_export_data_configs.insert(
3718                                table.global_id_writes(),
3719                                data_config.into_inline_connection(self.catalog().state()),
3720                            );
3721                        }
3722                    }
3723                    t => unreachable!("connection dependency not expected on {:?}", t),
3724                }
3725            }
3726        }
3727
3728        if !source_connections.is_empty() {
3729            self.controller
3730                .storage
3731                .alter_ingestion_connections(source_connections)
3732                .await
3733                .unwrap_or_terminate("cannot fail to alter ingestion connection");
3734        }
3735
3736        if !sink_connections.is_empty() {
3737            self.controller
3738                .storage
3739                .alter_export_connections(sink_connections)
3740                .await
3741                .unwrap_or_terminate("altering exports after txn must succeed");
3742        }
3743
3744        if !source_export_data_configs.is_empty() {
3745            self.controller
3746                .storage
3747                .alter_ingestion_export_data_configs(source_export_data_configs)
3748                .await
3749                .unwrap_or_terminate("altering source export data configs after txn must succeed");
3750        }
3751
3752        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3753    }
3754
3755    #[instrument]
3756    pub(super) async fn sequence_alter_source(
3757        &mut self,
3758        session: &Session,
3759        plan::AlterSourcePlan {
3760            item_id,
3761            ingestion_id,
3762            action,
3763        }: plan::AlterSourcePlan,
3764    ) -> Result<ExecuteResponse, AdapterError> {
3765        let cur_entry = self.catalog().get_entry(&item_id);
3766        let cur_source = cur_entry.source().expect("known to be source");
3767
3768        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3769            // Parse statement.
3770            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3771                .expect("invalid create sql persisted to catalog")
3772                .into_element()
3773                .ast
3774            {
3775                Statement::CreateSource(stmt) => stmt,
3776                _ => unreachable!("proved type is source"),
3777            };
3778
3779            let catalog = coord.catalog().for_system_session();
3780
3781            // Resolve items in statement
3782            mz_sql::names::resolve(&catalog, create_source_stmt)
3783                .map_err(|e| AdapterError::internal(err_cx, e))
3784        };
3785
3786        match action {
3787            plan::AlterSourceAction::AddSubsourceExports {
3788                subsources,
3789                options,
3790            } => {
3791                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3792
3793                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3794                    text_columns: mut new_text_columns,
3795                    exclude_columns: mut new_exclude_columns,
3796                    ..
3797                } = options.try_into()?;
3798
3799                // Resolve items in statement
3800                let (mut create_source_stmt, resolved_ids) =
3801                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3802
3803                // Get all currently referred-to items
3804                let catalog = self.catalog();
3805                let curr_references: BTreeSet<_> = catalog
3806                    .get_entry(&item_id)
3807                    .used_by()
3808                    .into_iter()
3809                    .filter_map(|subsource| {
3810                        catalog
3811                            .get_entry(subsource)
3812                            .subsource_details()
3813                            .map(|(_id, reference, _details)| reference)
3814                    })
3815                    .collect();
3816
3817                // We are doing a lot of unwrapping, so just make an error to reference; all of
3818                // these invariants are guaranteed to be true because of how we plan subsources.
3819                let purification_err =
3820                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3821
3822                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
3823                // we remove support for implicitly created subsources from a `CREATE SOURCE`
3824                // statement.
3825                match &mut create_source_stmt.connection {
3826                    CreateSourceConnection::Postgres {
3827                        options: curr_options,
3828                        ..
3829                    } => {
3830                        let mz_sql::plan::PgConfigOptionExtracted {
3831                            mut text_columns, ..
3832                        } = curr_options.clone().try_into()?;
3833
3834                        // Drop text columns; we will add them back in
3835                        // as appropriate below.
3836                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3837
3838                        // Drop all text columns that are not currently referred to.
3839                        text_columns.retain(|column_qualified_reference| {
3840                            mz_ore::soft_assert_eq_or_log!(
3841                                column_qualified_reference.0.len(),
3842                                4,
3843                                "all TEXT COLUMNS values must be column-qualified references"
3844                            );
3845                            let mut table = column_qualified_reference.clone();
3846                            table.0.truncate(3);
3847                            curr_references.contains(&table)
3848                        });
3849
3850                        // Merge the current text columns into the new text columns.
3851                        new_text_columns.extend(text_columns);
3852
3853                        // If we have text columns, add them to the options.
3854                        if !new_text_columns.is_empty() {
3855                            new_text_columns.sort();
3856                            let new_text_columns = new_text_columns
3857                                .into_iter()
3858                                .map(WithOptionValue::UnresolvedItemName)
3859                                .collect();
3860
3861                            curr_options.push(PgConfigOption {
3862                                name: PgConfigOptionName::TextColumns,
3863                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3864                            });
3865                        }
3866                    }
3867                    CreateSourceConnection::MySql {
3868                        options: curr_options,
3869                        ..
3870                    } => {
3871                        let mz_sql::plan::MySqlConfigOptionExtracted {
3872                            mut text_columns,
3873                            mut exclude_columns,
3874                            ..
3875                        } = curr_options.clone().try_into()?;
3876
3877                        // Drop both text and exclude columns; we will add them back in
3878                        // as appropriate below.
3879                        curr_options.retain(|o| {
3880                            !matches!(
3881                                o.name,
3882                                MySqlConfigOptionName::TextColumns
3883                                    | MySqlConfigOptionName::ExcludeColumns
3884                            )
3885                        });
3886
3887                        // Drop all text / exclude columns that are not currently referred to.
3888                        let column_referenced =
3889                            |column_qualified_reference: &UnresolvedItemName| {
3890                                mz_ore::soft_assert_eq_or_log!(
3891                                    column_qualified_reference.0.len(),
3892                                    3,
3893                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3894                                );
3895                                let mut table = column_qualified_reference.clone();
3896                                table.0.truncate(2);
3897                                curr_references.contains(&table)
3898                            };
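                        // For example, a 3-part entry ["schema", "tbl", "col"]
                        // (hypothetical names) survives only if ["schema",
                        // "tbl"] is still a referenced table.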
3899                        text_columns.retain(column_referenced);
3900                        exclude_columns.retain(column_referenced);
3901
3902                        // Merge the current text / exclude columns into the new text / exclude columns.
3903                        new_text_columns.extend(text_columns);
3904                        new_exclude_columns.extend(exclude_columns);
3905
3906                        // If we have text columns, add them to the options.
3907                        if !new_text_columns.is_empty() {
3908                            new_text_columns.sort();
3909                            let new_text_columns = new_text_columns
3910                                .into_iter()
3911                                .map(WithOptionValue::UnresolvedItemName)
3912                                .collect();
3913
3914                            curr_options.push(MySqlConfigOption {
3915                                name: MySqlConfigOptionName::TextColumns,
3916                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3917                            });
3918                        }
3919                        // If we have exclude columns, add them to the options.
3920                        if !new_exclude_columns.is_empty() {
3921                            new_exclude_columns.sort();
3922                            let new_exclude_columns = new_exclude_columns
3923                                .into_iter()
3924                                .map(WithOptionValue::UnresolvedItemName)
3925                                .collect();
3926
3927                            curr_options.push(MySqlConfigOption {
3928                                name: MySqlConfigOptionName::ExcludeColumns,
3929                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3930                            });
3931                        }
3932                    }
3933                    CreateSourceConnection::SqlServer {
3934                        options: curr_options,
3935                        ..
3936                    } => {
3937                        let mz_sql::plan::SqlServerConfigOptionExtracted {
3938                            mut text_columns,
3939                            mut exclude_columns,
3940                            ..
3941                        } = curr_options.clone().try_into()?;
3942
3943                        // Drop both text and exclude columns; we will add them back in
3944                        // as appropriate below.
3945                        curr_options.retain(|o| {
3946                            !matches!(
3947                                o.name,
3948                                SqlServerConfigOptionName::TextColumns
3949                                    | SqlServerConfigOptionName::ExcludeColumns
3950                            )
3951                        });
3952
3953                        // Drop all text / exclude columns that are not currently referred to.
3954                        let column_referenced =
3955                            |column_qualified_reference: &UnresolvedItemName| {
3956                                mz_ore::soft_assert_eq_or_log!(
3957                                    column_qualified_reference.0.len(),
3958                                    3,
3959                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3960                                );
3961                                let mut table = column_qualified_reference.clone();
3962                                table.0.truncate(2);
3963                                curr_references.contains(&table)
3964                            };
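                        // As in the MySQL branch above: a 3-part entry
                        // ["schema", "tbl", "col"] (hypothetical names)
                        // survives only if ["schema", "tbl"] is still a
                        // referenced table.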
3965                        text_columns.retain(column_referenced);
3966                        exclude_columns.retain(column_referenced);
3967
3968                        // Merge the current text / exclude columns into the new text / exclude columns.
3969                        new_text_columns.extend(text_columns);
3970                        new_exclude_columns.extend(exclude_columns);
3971
3972                        // If we have text columns, add them to the options.
3973                        if !new_text_columns.is_empty() {
3974                            new_text_columns.sort();
3975                            let new_text_columns = new_text_columns
3976                                .into_iter()
3977                                .map(WithOptionValue::UnresolvedItemName)
3978                                .collect();
3979
3980                            curr_options.push(SqlServerConfigOption {
3981                                name: SqlServerConfigOptionName::TextColumns,
3982                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3983                            });
3984                        }
3985                        // If we have exclude columns, add them to the options.
3986                        if !new_exclude_columns.is_empty() {
3987                            new_exclude_columns.sort();
3988                            let new_exclude_columns = new_exclude_columns
3989                                .into_iter()
3990                                .map(WithOptionValue::UnresolvedItemName)
3991                                .collect();
3992
3993                            curr_options.push(SqlServerConfigOption {
3994                                name: SqlServerConfigOptionName::ExcludeColumns,
3995                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3996                            });
3997                        }
3998                    }
3999                    _ => return Err(purification_err()),
4000                };
4001
4002                let mut catalog = self.catalog().for_system_session();
4003                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4004
4005                // Re-define our source in terms of the amended statement
4006                let plan = match mz_sql::plan::plan(
4007                    None,
4008                    &catalog,
4009                    Statement::CreateSource(create_source_stmt),
4010                    &Params::empty(),
4011                    &resolved_ids,
4012                )
4013                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4014                {
4015                    Plan::CreateSource(plan) => plan,
4015                    Plan::CreateSource(plan) => plan,
4016                    _ => unreachable!("create source plan is the only valid response"),
4017                };
4018
4019                // Asserting that we've done the right thing with dependencies
4020                // here requires mocking out objects in the catalog, which is a
4021                // large task for an operation we have to cover in tests anyway.
4022                let source = Source::new(
4023                    plan,
4024                    cur_source.global_id,
4025                    resolved_ids,
4026                    cur_source.custom_logical_compaction_window,
4027                    cur_source.is_retained_metrics_object,
4028                );
4029
4030                // Get new ingestion description for storage.
4031                let desc = match &source.data_source {
4032                    DataSourceDesc::Ingestion { desc, .. }
4033                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4034                        desc.clone().into_inline_connection(self.catalog().state())
4035                    }
4036                    _ => unreachable!("already verified to be of type ingestion"),
4037                };
4038
4039                self.controller
4040                    .storage
4041                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
4042                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4043
4044                // Redefine source. This must be done before we create any new
4045                // subsources so that it has the right ingestion.
4046                let mut ops = vec![catalog::Op::UpdateItem {
4047                    id: item_id,
4048                    // Look this up again so we don't have to hold an immutable reference to the
4049                    // entry for so long.
4050                    name: self.catalog.get_entry(&item_id).name().clone(),
4051                    to_item: CatalogItem::Source(source),
4052                }];
4053
4054                let CreateSourceInner {
4055                    ops: new_ops,
4056                    sources: _,
4057                    if_not_exists_ids,
4058                } = self.create_source_inner(session, subsources).await?;
4059
4060                ops.extend(new_ops);
4061
4062                assert!(
4063                    if_not_exists_ids.is_empty(),
4064                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4065                );
4066
4067                self.catalog_transact(Some(session), ops).await?;
4068            }
4069            plan::AlterSourceAction::RefreshReferences { references } => {
4070                self.catalog_transact(
4071                    Some(session),
4072                    vec![catalog::Op::UpdateSourceReferences {
4073                        source_id: item_id,
4074                        references: references.into(),
4075                    }],
4076                )
4077                .await?;
4078            }
4079        }
4080
4081        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4082    }
4083
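    /// Handles `ALTER SYSTEM SET <name> = <value>`, e.g. (with an
    /// illustrative parameter name and value):
    /// `ALTER SYSTEM SET max_result_size = '1GB'`.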
4084    #[instrument]
4085    pub(super) async fn sequence_alter_system_set(
4086        &mut self,
4087        session: &Session,
4088        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4089    ) -> Result<ExecuteResponse, AdapterError> {
4090        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4091        // We want to ensure that the network policy we're switching to actually exists.
4092        if NETWORK_POLICY.name.to_string().to_lowercase() == name.to_lowercase() {
4093            self.validate_alter_system_network_policy(session, &value)?;
4094        }
4095
4096        let op = match value {
4097            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4098                name: name.clone(),
4099                value: OwnedVarInput::SqlSet(values),
4100            },
4101            plan::VariableValue::Default => {
4102                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4103            }
4104        };
4105        self.catalog_transact(Some(session), vec![op]).await?;
4106
4107        session.add_notice(AdapterNotice::VarDefaultUpdated {
4108            role: None,
4109            var_name: Some(name),
4110        });
4111        Ok(ExecuteResponse::AlteredSystemConfiguration)
4112    }
4113
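    /// Handles `ALTER SYSTEM RESET <name>`, which restores a system
    /// variable's default, e.g. `ALTER SYSTEM RESET max_result_size`
    /// (illustrative parameter name).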
4114    #[instrument]
4115    pub(super) async fn sequence_alter_system_reset(
4116        &mut self,
4117        session: &Session,
4118        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4119    ) -> Result<ExecuteResponse, AdapterError> {
4120        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4121        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4122        self.catalog_transact(Some(session), vec![op]).await?;
4123        session.add_notice(AdapterNotice::VarDefaultUpdated {
4124            role: None,
4125            var_name: Some(name),
4126        });
4127        Ok(ExecuteResponse::AlteredSystemConfiguration)
4128    }
4129
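    /// Handles `ALTER SYSTEM RESET ALL`, which restores the default value of
    /// every system variable.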
4130    #[instrument]
4131    pub(super) async fn sequence_alter_system_reset_all(
4132        &mut self,
4133        session: &Session,
4134        _: plan::AlterSystemResetAllPlan,
4135    ) -> Result<ExecuteResponse, AdapterError> {
4136        self.is_user_allowed_to_alter_system(session, None)?;
4137        let op = catalog::Op::ResetAllSystemConfiguration;
4138        self.catalog_transact(Some(session), vec![op]).await?;
4139        session.add_notice(AdapterNotice::VarDefaultUpdated {
4140            role: None,
4141            var_name: None,
4142        });
4143        Ok(ExecuteResponse::AlteredSystemConfiguration)
4144    }
4145
4146    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
4147    fn is_user_allowed_to_alter_system(
4148        &self,
4149        session: &Session,
4150        var_name: Option<&str>,
4151    ) -> Result<(), AdapterError> {
4152        match (session.user().kind(), var_name) {
4153            // Only internal superusers can reset all system variables.
4154            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4155            // Whether a variable can be modified depends on whether we're an internal superuser.
4156            (UserKind::Superuser, Some(name))
4157                if session.user().is_internal()
4158                    || self.catalog().system_config().user_modifiable(name) =>
4159            {
4160                // In lieu of plumbing the user to all system config functions, just check that
4161                // the var is visible.
4162                let var = self.catalog().system_config().get(name)?;
4163                var.visible(session.user(), self.catalog().system_config())?;
4164                Ok(())
4165            }
4166            // If we're not a superuser but the variable is user modifiable, return an error
4167            // indicating that superuser privileges are required.
4168            (UserKind::Regular, Some(name))
4169                if self.catalog().system_config().user_modifiable(name) =>
4170            {
4171                Err(AdapterError::Unauthorized(
4172                    rbac::UnauthorizedError::Superuser {
4173                        action: format!("toggle the '{name}' system configuration parameter"),
4174                    },
4175                ))
4176            }
4177            _ => Err(AdapterError::Unauthorized(
4178                rbac::UnauthorizedError::MzSystem {
4179                    action: "alter system".into(),
4180                },
4181            )),
4182        }
4183    }
4184
4185    fn validate_alter_system_network_policy(
4186        &self,
4187        session: &Session,
4188        policy_value: &plan::VariableValue,
4189    ) -> Result<(), AdapterError> {
4190        let policy_name = match &policy_value {
4191            // Make sure the compiled-in default still exists.
4192            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4193            plan::VariableValue::Values(values) if values.len() == 1 => {
4194                values.iter().next().cloned()
4195            }
4196            plan::VariableValue::Values(values) => {
4197                tracing::warn!(?values, "can't set multiple network policies at once");
4198                None
4199            }
4200        };
4201        let maybe_network_policy = policy_name
4202            .as_ref()
4203            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4204        let Some(network_policy) = maybe_network_policy else {
4205            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4206                VarError::InvalidParameterValue {
4207                    name: NETWORK_POLICY.name(),
4208                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4209                    reason: "no network policy with that name exists".to_string(),
4210                },
4211            )));
4212        };
4213        self.validate_alter_network_policy(session, &network_policy.rules)
4214    }
4215
4216    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4217    ///
4218    /// This helps prevent users from modifying network policies in a way that would lock out their
4219    /// current connection.
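    ///
    /// For example, a non-internal user connecting from 203.0.113.7 (an
    /// illustrative address) may not install a rule set under which that
    /// address would be denied.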
4220    fn validate_alter_network_policy(
4221        &self,
4222        session: &Session,
4223        policy_rules: &Vec<NetworkPolicyRule>,
4224    ) -> Result<(), AdapterError> {
4225        // If the user is not an internal user, attempt to protect them from
4226        // locking themselves out.
4227        if session.user().is_internal() {
4228            return Ok(());
4229        }
4230        if let Some(ip) = session.meta().client_ip() {
4231            validate_ip_with_policy_rules(ip, policy_rules)
4232                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4233        } else {
4234            // Sessions without IPs are only temporarily constructed for default values;
4235            // they should not be permitted here.
4236            return Err(AdapterError::NetworkPolicyDenied(
4237                NetworkPolicyError::MissingIp,
4238            ));
4239        }
4240        Ok(())
4241    }
4242
4243    // Returns the name of the portal to execute.
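    // EXECUTE binds a previously prepared statement to a new portal, e.g.
    // (hypothetical names): PREPARE q AS SELECT $1::text; EXECUTE q ('x');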
4244    #[instrument]
4245    pub(super) fn sequence_execute(
4246        &self,
4247        session: &mut Session,
4248        plan: plan::ExecutePlan,
4249    ) -> Result<String, AdapterError> {
4250        // Verify the stmt is still valid.
4251        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4252        let ps = session
4253            .get_prepared_statement_unverified(&plan.name)
4254            .expect("known to exist");
4255        let stmt = ps.stmt().cloned();
4256        let desc = ps.desc().clone();
4257        let state_revision = ps.state_revision;
4258        let logging = Arc::clone(ps.logging());
4259        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4260    }
4261
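    /// Handles `GRANT <privileges> ON <object> TO <roles>`, e.g.
    /// (hypothetical names): `GRANT SELECT, INSERT ON TABLE t TO r`.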
4262    #[instrument]
4263    pub(super) async fn sequence_grant_privileges(
4264        &mut self,
4265        session: &Session,
4266        plan::GrantPrivilegesPlan {
4267            update_privileges,
4268            grantees,
4269        }: plan::GrantPrivilegesPlan,
4270    ) -> Result<ExecuteResponse, AdapterError> {
4271        self.sequence_update_privileges(
4272            session,
4273            update_privileges,
4274            grantees,
4275            UpdatePrivilegeVariant::Grant,
4276        )
4277        .await
4278    }
4279
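    /// Handles `REVOKE <privileges> ON <object> FROM <roles>`, e.g.
    /// (hypothetical names): `REVOKE SELECT ON TABLE t FROM r`.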
4280    #[instrument]
4281    pub(super) async fn sequence_revoke_privileges(
4282        &mut self,
4283        session: &Session,
4284        plan::RevokePrivilegesPlan {
4285            update_privileges,
4286            revokees,
4287        }: plan::RevokePrivilegesPlan,
4288    ) -> Result<ExecuteResponse, AdapterError> {
4289        self.sequence_update_privileges(
4290            session,
4291            update_privileges,
4292            revokees,
4293            UpdatePrivilegeVariant::Revoke,
4294        )
4295        .await
4296    }
4297
4298    #[instrument]
4299    async fn sequence_update_privileges(
4300        &mut self,
4301        session: &Session,
4302        update_privileges: Vec<UpdatePrivilege>,
4303        grantees: Vec<RoleId>,
4304        variant: UpdatePrivilegeVariant,
4305    ) -> Result<ExecuteResponse, AdapterError> {
4306        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4307        let mut warnings = Vec::new();
4308        let catalog = self.catalog().for_session(session);
4309
4310        for UpdatePrivilege {
4311            acl_mode,
4312            target_id,
4313            grantor,
4314        } in update_privileges
4315        {
4316            let actual_object_type = catalog.get_system_object_type(&target_id);
4317            // For all relations we allow all applicable table privileges, but send a warning if the
4318            // privilege isn't actually applicable to the object type.
4319            if actual_object_type.is_relation() {
4320                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4321                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4322                if !non_applicable_privileges.is_empty() {
4323                    let object_description =
4324                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4325                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4326                        non_applicable_privileges,
4327                        object_description,
4328                    })
4329                }
4330            }
4331
4332            if let SystemObjectId::Object(object_id) = &target_id {
4333                self.catalog()
4334                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4335            }
4336
4337            let privileges = self
4338                .catalog()
4339                .get_privileges(&target_id, session.conn_id())
4340                // Should be unreachable since the parser will refuse to parse grant/revoke
4341                // statements on objects without privileges.
4342                .ok_or(AdapterError::Unsupported(
4343                    "GRANTs/REVOKEs on an object type with no privileges",
4344                ))?;
4345
4346            for grantee in &grantees {
4347                self.catalog().ensure_not_system_role(grantee)?;
4348                self.catalog().ensure_not_predefined_role(grantee)?;
4349                let existing_privilege = privileges
4350                    .get_acl_item(grantee, &grantor)
4351                    .map(Cow::Borrowed)
4352                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4353
4354                match variant {
4355                    UpdatePrivilegeVariant::Grant
4356                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4357                    {
4358                        ops.push(catalog::Op::UpdatePrivilege {
4359                            target_id: target_id.clone(),
4360                            privilege: MzAclItem {
4361                                grantee: *grantee,
4362                                grantor,
4363                                acl_mode,
4364                            },
4365                            variant,
4366                        });
4367                    }
4368                    UpdatePrivilegeVariant::Revoke
4369                        if !existing_privilege
4370                            .acl_mode
4371                            .intersection(acl_mode)
4372                            .is_empty() =>
4373                    {
4374                        ops.push(catalog::Op::UpdatePrivilege {
4375                            target_id: target_id.clone(),
4376                            privilege: MzAclItem {
4377                                grantee: *grantee,
4378                                grantor,
4379                                acl_mode,
4380                            },
4381                            variant,
4382                        });
4383                    }
4384                    // no-op
4385                    _ => {}
4386                }
4387            }
4388        }
4389
4390        if ops.is_empty() {
4391            session.add_notices(warnings);
4392            return Ok(variant.into());
4393        }
4394
4395        let res = self
4396            .catalog_transact(Some(session), ops)
4397            .await
4398            .map(|_| match variant {
4399                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4400                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4401            });
4402        if res.is_ok() {
4403            session.add_notices(warnings);
4404        }
4405        res
4406    }
4407
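    /// Handles `ALTER DEFAULT PRIVILEGES`, e.g. (hypothetical names):
    /// `ALTER DEFAULT PRIVILEGES FOR ROLE r IN SCHEMA s GRANT SELECT ON
    /// TABLES TO r2`.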
4408    #[instrument]
4409    pub(super) async fn sequence_alter_default_privileges(
4410        &mut self,
4411        session: &Session,
4412        plan::AlterDefaultPrivilegesPlan {
4413            privilege_objects,
4414            privilege_acl_items,
4415            is_grant,
4416        }: plan::AlterDefaultPrivilegesPlan,
4417    ) -> Result<ExecuteResponse, AdapterError> {
4418        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4419        let variant = if is_grant {
4420            UpdatePrivilegeVariant::Grant
4421        } else {
4422            UpdatePrivilegeVariant::Revoke
4423        };
4424        for privilege_object in &privilege_objects {
4425            self.catalog()
4426                .ensure_not_system_role(&privilege_object.role_id)?;
4427            self.catalog()
4428                .ensure_not_predefined_role(&privilege_object.role_id)?;
4429            if let Some(database_id) = privilege_object.database_id {
4430                self.catalog()
4431                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4432            }
4433            if let Some(schema_id) = privilege_object.schema_id {
4434                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4435                let schema_spec: SchemaSpecifier = schema_id.into();
4436
4437                self.catalog().ensure_not_reserved_object(
4438                    &(database_spec, schema_spec).into(),
4439                    session.conn_id(),
4440                )?;
4441            }
4442            for privilege_acl_item in &privilege_acl_items {
4443                self.catalog()
4444                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4445                self.catalog()
4446                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4447                ops.push(catalog::Op::UpdateDefaultPrivilege {
4448                    privilege_object: privilege_object.clone(),
4449                    privilege_acl_item: privilege_acl_item.clone(),
4450                    variant,
4451                })
4452            }
4453        }
4454
4455        self.catalog_transact(Some(session), ops).await?;
4456        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4457    }
4458
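    /// Handles `GRANT <role> TO <members>`, e.g. (hypothetical names):
    /// `GRANT data_reader TO alice`.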
4459    #[instrument]
4460    pub(super) async fn sequence_grant_role(
4461        &mut self,
4462        session: &Session,
4463        plan::GrantRolePlan {
4464            role_ids,
4465            member_ids,
4466            grantor_id,
4467        }: plan::GrantRolePlan,
4468    ) -> Result<ExecuteResponse, AdapterError> {
4469        let catalog = self.catalog();
4470        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4471        for role_id in role_ids {
4472            for member_id in &member_ids {
4473                let member_membership: BTreeSet<_> =
4474                    catalog.get_role(member_id).membership().keys().collect();
4475                if member_membership.contains(&role_id) {
4476                    let role_name = catalog.get_role(&role_id).name().to_string();
4477                    let member_name = catalog.get_role(member_id).name().to_string();
4478                    // We need this check so we don't accidentally return a success on a reserved role.
4479                    catalog.ensure_not_reserved_role(member_id)?;
4480                    catalog.ensure_grantable_role(&role_id)?;
4481                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4482                        role_name,
4483                        member_name,
4484                    });
4485                } else {
4486                    ops.push(catalog::Op::GrantRole {
4487                        role_id,
4488                        member_id: *member_id,
4489                        grantor_id,
4490                    });
4491                }
4492            }
4493        }
4494
4495        if ops.is_empty() {
4496            return Ok(ExecuteResponse::GrantedRole);
4497        }
4498
4499        self.catalog_transact(Some(session), ops)
4500            .await
4501            .map(|_| ExecuteResponse::GrantedRole)
4502    }
4503
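    /// Handles `REVOKE <role> FROM <members>`, e.g. (hypothetical names):
    /// `REVOKE data_reader FROM alice`.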
4504    #[instrument]
4505    pub(super) async fn sequence_revoke_role(
4506        &mut self,
4507        session: &Session,
4508        plan::RevokeRolePlan {
4509            role_ids,
4510            member_ids,
4511            grantor_id,
4512        }: plan::RevokeRolePlan,
4513    ) -> Result<ExecuteResponse, AdapterError> {
4514        let catalog = self.catalog();
4515        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4516        for role_id in role_ids {
4517            for member_id in &member_ids {
4518                let member_membership: BTreeSet<_> =
4519                    catalog.get_role(member_id).membership().keys().collect();
4520                if !member_membership.contains(&role_id) {
4521                    let role_name = catalog.get_role(&role_id).name().to_string();
4522                    let member_name = catalog.get_role(member_id).name().to_string();
4523                    // We need this check so we don't accidentally return a success on a reserved role.
4524                    catalog.ensure_not_reserved_role(member_id)?;
4525                    catalog.ensure_grantable_role(&role_id)?;
4526                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4527                        role_name,
4528                        member_name,
4529                    });
4530                } else {
4531                    ops.push(catalog::Op::RevokeRole {
4532                        role_id,
4533                        member_id: *member_id,
4534                        grantor_id,
4535                    });
4536                }
4537            }
4538        }
4539
4540        if ops.is_empty() {
4541            return Ok(ExecuteResponse::RevokedRole);
4542        }
4543
4544        self.catalog_transact(Some(session), ops)
4545            .await
4546            .map(|_| ExecuteResponse::RevokedRole)
4547    }
4548
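    /// Handles `ALTER <object> OWNER TO <role>`, e.g. (hypothetical names):
    /// `ALTER TABLE t OWNER TO r`. Ownership changes cascade to dependent
    /// indexes and progress collections (and, for clusters, to replicas).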
4549    #[instrument]
4550    pub(super) async fn sequence_alter_owner(
4551        &mut self,
4552        session: &Session,
4553        plan::AlterOwnerPlan {
4554            id,
4555            object_type,
4556            new_owner,
4557        }: plan::AlterOwnerPlan,
4558    ) -> Result<ExecuteResponse, AdapterError> {
4559        let mut ops = vec![catalog::Op::UpdateOwner {
4560            id: id.clone(),
4561            new_owner,
4562        }];
4563
4564        match &id {
4565            ObjectId::Item(global_id) => {
4566                let entry = self.catalog().get_entry(global_id);
4567
4568                // Cannot directly change the owner of an index.
4569                if entry.is_index() {
4570                    let name = self
4571                        .catalog()
4572                        .resolve_full_name(entry.name(), Some(session.conn_id()))
4573                        .to_string();
4574                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
4575                    return Ok(ExecuteResponse::AlteredObject(object_type));
4576                }
4577
4578                // Alter owner cascades down to dependent indexes.
4579                let dependent_index_ops = entry
4580                    .used_by()
4581                    .into_iter()
4582                    .filter(|id| self.catalog().get_entry(id).is_index())
4583                    .map(|id| catalog::Op::UpdateOwner {
4584                        id: ObjectId::Item(*id),
4585                        new_owner,
4586                    });
4587                ops.extend(dependent_index_ops);
4588
4589                // Alter owner cascades down to progress collections.
4590                let dependent_progress_collections =
4591                    entry
4592                        .progress_id()
4593                        .into_iter()
4594                        .map(|item_id| catalog::Op::UpdateOwner {
4595                            id: ObjectId::Item(item_id),
4596                            new_owner,
4597                        });
4598                ops.extend(dependent_progress_collections);
4599            }
4600            ObjectId::Cluster(cluster_id) => {
4601                let cluster = self.catalog().get_cluster(*cluster_id);
4602                // Alter owner cascades down to cluster replicas.
4603                let managed_cluster_replica_ops =
4604                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4605                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4606                        new_owner,
4607                    });
4608                ops.extend(managed_cluster_replica_ops);
4609            }
4610            _ => {}
4611        }
4612
4613        self.catalog_transact(Some(session), ops)
4614            .await
4615            .map(|_| ExecuteResponse::AlteredObject(object_type))
4616    }
4617
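    /// Handles `REASSIGN OWNED BY <old_roles> TO <new_role>`, e.g.
    /// (hypothetical names): `REASSIGN OWNED BY r1, r2 TO r3`.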
4618    #[instrument]
4619    pub(super) async fn sequence_reassign_owned(
4620        &mut self,
4621        session: &Session,
4622        plan::ReassignOwnedPlan {
4623            old_roles,
4624            new_role,
4625            reassign_ids,
4626        }: plan::ReassignOwnedPlan,
4627    ) -> Result<ExecuteResponse, AdapterError> {
4628        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4629            self.catalog().ensure_not_reserved_role(role_id)?;
4630        }
4631
4632        let ops = reassign_ids
4633            .into_iter()
4634            .map(|id| catalog::Op::UpdateOwner {
4635                id,
4636                new_owner: new_role,
4637            })
4638            .collect();
4639
4640        self.catalog_transact(Some(session), ops)
4641            .await
4642            .map(|_| ExecuteResponse::ReassignOwned)
4643    }
4644
4645    #[instrument]
4646    pub(crate) async fn handle_deferred_statement(&mut self) {
4647        // It is possible that Message::DeferredStatementReady was sent, but then a session
4648        // cancellation was processed, removing the single element from `serialized_ddl`, so it
4649        // is expected that this is sometimes empty.
4650        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4651            return;
4652        };
4653        match ps {
4654            crate::coord::PlanStatement::Statement { stmt, params } => {
4655                self.handle_execute_inner(stmt, params, ctx).await;
4656            }
4657            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4658                self.sequence_plan(ctx, plan, resolved_ids).await;
4659            }
4660        }
4661    }
4662
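    /// Handles `ALTER TABLE ... ADD COLUMN`, e.g. (hypothetical names):
    /// `ALTER TABLE t ADD COLUMN c text`.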
4663    #[instrument]
4664    // TODO(parkmycar): Remove this once we have an actual implementation.
4665    #[allow(clippy::unused_async)]
4666    pub(super) async fn sequence_alter_table(
4667        &mut self,
4668        ctx: &mut ExecuteContext,
4669        plan: plan::AlterTablePlan,
4670    ) -> Result<ExecuteResponse, AdapterError> {
4671        let plan::AlterTablePlan {
4672            relation_id,
4673            column_name,
4674            column_type,
4675            raw_sql_type,
4676        } = plan;
4677
4678        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4679        let id_ts = self.get_catalog_write_ts().await;
4680        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4681        let ops = vec![catalog::Op::AlterAddColumn {
4682            id: relation_id,
4683            new_global_id,
4684            name: column_name,
4685            typ: column_type,
4686            sql: raw_sql_type,
4687        }];
4688
4689        self.catalog_transact_with_context(None, Some(ctx), ops)
4690            .await?;
4691
4692        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4693    }
4694
4695    #[instrument]
4696    pub(super) async fn sequence_alter_materialized_view_apply_replacement(
4697        &mut self,
4698        ctx: &ExecuteContext,
4699        plan: AlterMaterializedViewApplyReplacementPlan,
4700    ) -> Result<ExecuteResponse, AdapterError> {
4701        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
4702
4703        // TODO(alter-mv): Wait until there is overlap between the old MV's write frontier and the
4704        // new MV's as-of, to ensure no times are skipped.
4705
4706        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4707        self.catalog_transact(Some(ctx.session()), ops).await?;
4708
4709        Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
4710    }
4711
4712    pub(super) async fn statistics_oracle(
4713        &self,
4714        session: &Session,
4715        source_ids: &BTreeSet<GlobalId>,
4716        query_as_of: &Antichain<Timestamp>,
4717        is_oneshot: bool,
4718    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4719        super::statistics_oracle(
4720            session,
4721            source_ids,
4722            query_as_of,
4723            is_oneshot,
4724            self.catalog().system_config(),
4725            self.controller.storage_collections.as_ref(),
4726        )
4727        .await
4728    }
4729}
4730
4731impl Coordinator {
4732    /// Process the metainfo from a newly created non-transient dataflow.
4733    async fn process_dataflow_metainfo(
4734        &mut self,
4735        df_meta: DataflowMetainfo,
4736        export_id: GlobalId,
4737        ctx: Option<&mut ExecuteContext>,
4738        notice_ids: Vec<GlobalId>,
4739    ) -> Option<BuiltinTableAppendNotify> {
4740        // Emit raw notices to the user.
4741        if let Some(ctx) = ctx {
4742            emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4743        }
4744
4745        // Create a metainfo with rendered notices.
4746        let df_meta = self
4747            .catalog()
4748            .render_notices(df_meta, notice_ids, Some(export_id));
4749
4750        // Update the optimizer notice builtin tables and save the metainfo in the catalog's
4751        // in-memory state.
4752        if self.catalog().state().system_config().enable_mz_notices()
4753            && !df_meta.optimizer_notices.is_empty()
4754        {
4755            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4756            self.catalog().state().pack_optimizer_notices(
4757                &mut builtin_table_updates,
4758                df_meta.optimizer_notices.iter(),
4759                Diff::ONE,
4760            );
4761
4762            // Save the metainfo.
4763            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4764
4765            Some(
4766                self.builtin_table_update()
4767                    .execute(builtin_table_updates)
4768                    .await
4769                    .0,
4770            )
4771        } else {
4772            // Save the metainfo.
4773            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4774
4775            None
4776        }
4777    }
4778}