mz_adapter/coord/sequencer/inner.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::objects::{
    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
    RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
    SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
    StatementContext,
};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::GenericSourceConnection;
// Import only select elements from the `plan` module to avoid merge conflicts on use statements.
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::ExportDescription;
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_transform::dataflow::DataflowMetainfo;
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::sequencer::emit_optimizer_notices;
use crate::coord::{
    AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
    CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
    Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
    PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
    validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;

/// Attempts to evaluate an expression. If an error is returned, the error is sent
/// to the client and the enclosing function returns early.
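///
/// A minimal usage sketch (the surrounding context is illustrative; `ctx` is any
/// value with a `retire` method, such as an `ExecuteContext`):
///
/// ```ignore
/// // On `Err`, retires `ctx` with the error and returns from the enclosing function.
/// let plan = return_if_err!(self.plan_statement(session, stmt, &params, &resolved_ids), ctx);
/// ```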
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;

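/// Bundle of catalog ops and session-facing flags produced by
/// [Coordinator::sequence_drop_common].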
struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}

/// A bundle of values returned from [Coordinator::create_source_inner].
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
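    ///
    /// Stages are driven in a loop: [StageResult::Immediate] continues synchronously on the
    /// coordinator thread, while the handle-producing variants are passed to `handle_spawn`,
    /// which either re-enqueues the next stage via the internal command channel or retires the
    /// context directly with the final response.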
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // Cancellation isn't allowed here, so remove any previous channel to keep
                    // handle_spawn from observing a stale value left by an earlier stage where
                    // cancel_enabled was true.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false;
            }
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }

    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // Wait for the flag to become true or for the sender to be dropped.
                let _ = rx.wait_for(|v| *v).await;
            })
        } else {
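            // No cancellation channel was registered; this future never resolves, so
            // only the join-handle arm of the `select!` below can complete.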
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = return_if_err!(res, ctx);
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }

    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref desc)
                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                            .get(self.catalog().system_config().dyncfgs());

                        if !enable_multi_replica_sources && cluster.replica_ids().len() > 1 {
                            return Err(AdapterError::Unsupported(
                                "webhook sources in clusters with >1 replicas",
                            ));
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

            // Attempt to reduce the `CHECK` expression; we time out if this takes too long.
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            // If this source contained a set of available source references, update the
            // source references catalog table.
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            // These operations must be executed after the source is added to the catalog.
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }

    /// Subsources are planned differently from other statements because they
    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
    /// Because of this, we have usually "missed" the opportunity to plan them
    /// through the normal statement execution life cycle (the exception being
    /// during bootstrapping).
    ///
    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the subsource.
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let plan = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }

    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip_eq(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource, if any.
        if let Some(progress_stmt) = progress_stmt {
            // The primary source depends on this subsource because the primary
            // source needs to know its shard ID, and the easiest way of
            // guaranteeing that the shard ID is discoverable is to create this
            // collection first.
            assert_none!(progress_stmt.of_source);
            let id_ts = self.get_catalog_write_ts().await;
            let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("plan must be CreateSourcePlan but got {:?}", p),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned());
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources.
        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        // Check if any sources are webhook sources and report them as created.
        for (item_id, source) in &sources {
            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
                    ctx.session()
                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
                }
            }
        }

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    match plan.connection.details {
                        ConnectionDetails::AwsPrivatelink(ref privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            let cloud_resource_controller =
                                match coord.cloud_resource_controller.as_ref().cloned() {
                                    Some(controller) => controller,
                                    None => {
                                        tracing::warn!("AWS PrivateLink connections unsupported");
                                        return;
                                    }
                                };
                            if let Err(err) = cloud_resource_controller
                                .ensure_vpc_endpoint(connection_id, spec)
                                .await
                            {
                                tracing::warn!(?err, "failed to ensure vpc endpoint!");
                            }
                        }
                        _ => {}
                    }
                })
            })
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &mut Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &mut Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

    /// Validates the role attributes for a `CREATE ROLE` statement.
    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some()
                || attributes.password.is_some()
                || attributes.login.is_some()
            {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes)?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_with_context(conn_id, None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO(network_policy): Consider role-based network policies here.
        let current_network_policy_name =
            self.catalog().system_config().default_network_policy_name();
        // Check if the way we're altering the policy is still valid for the current connection.
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let id_ts = self.get_catalog_write_ts().await;
        let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
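        // A new table starts with a single collection at the root relation version.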
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                o => {
                    unreachable!("CREATE TABLE data source got {:?}", o)
                }
            },
        };

        let is_webhook = matches!(
            &data_source,
            TableDataSource::DataSource {
                desc: DataSourceDesc::Webhook { .. },
                ..
            }
        );

        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let catalog_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        if is_webhook {
            // `try_get_webhook_url` will make up a URL for things that are not
            // webhooks, so we guard against that here.
            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
                ctx.session()
                    .add_notice(AdapterNotice::WebhookSourceCreated { url });
            }
        }

        match catalog_result {
            Ok(()) => Ok(ExecuteResponse::CreatedTable),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session_mut()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "table",
                    });
                Ok(ExecuteResponse::CreatedTable)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        // First try to allocate an item ID and a global ID. If either fails, we're done.
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) =
            return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

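        // Apply the default compaction window to the new sink's storage collections.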
        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }

    /// Validates that a view definition does not contain any expressions that may lead to
    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
    ///
    /// We prevent these expressions so that we can add columns to system tables without
    /// changing the definition of the view.
    ///
    /// Here is a bit of a hand-wavy proof as to why we only need to check the
    /// immediate view definition for system objects and ambiguous column
    /// references, and not the entire dependency tree:
    ///
    ///   - A view with no object references cannot have any ambiguous column
    ///   references to a system object, because it has no system objects.
    ///   - A view with a direct reference to a system object and a * or
    ///   NATURAL JOIN will be rejected due to ambiguous column references.
    ///   - A view with system objects but no * or NATURAL JOINs cannot have
    ///   any ambiguous column references to a system object, because all column
    ///   references are explicitly named.
    ///   - A view with * or NATURAL JOINs that doesn't directly reference a
    ///   system object cannot have any ambiguous column references to a system
    ///   object, because there are no system objects in the top level view and
    ///   all sub-views are guaranteed to have no ambiguous column references to
    ///   system objects.
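    ///
    /// For example, a view defined as `SELECT * FROM mz_tables` is rejected, while one
    /// that names its columns explicitly (e.g. `SELECT id, name FROM mz_tables`) is
    /// allowed.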
    pub(super) fn validate_system_column_references(
        &self,
        uses_ambiguous_columns: bool,
        depends_on: &BTreeSet<GlobalId>,
    ) -> Result<(), AdapterError> {
        if uses_ambiguous_columns
            && depends_on
                .iter()
                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
        {
            Err(AdapterError::AmbiguousSystemColumnReference)
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_type(
        &mut self,
        session: &Session,
        plan: plan::CreateTypePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        // Validate the type definition (e.g., composite columns) before storing.
        plan.typ
            .inner
            .desc(&self.catalog().for_session(session))
            .map_err(AdapterError::from)?;
        let typ = Type {
            create_sql: Some(plan.typ.create_sql),
            global_id,
            details: CatalogTypeDetails {
                array_id: None,
                typ: plan.typ.inner,
                pg_metadata: None,
            },
            resolved_ids,
        };
        let op = catalog::Op::CreateItem {
            id: item_id,
            name: plan.name,
            item: CatalogItem::Type(typ),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::CreatedType),
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_comment_on(
        &mut self,
        session: &Session,
        plan: plan::CommentPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::Comment {
            object_id: plan.object_id,
            sub_component: plan.sub_component,
            comment: plan.comment,
        };
        self.catalog_transact(Some(session), vec![op]).await?;
        Ok(ExecuteResponse::Comment)
    }

    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
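        // Objects dropped via CASCADE (i.e., in `drop_ids` but not directly named in the
        // statement's `referenced_ids`) are collected so the user can be notified.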
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        fail::fail_point!("after_sequencer_drop_replica");

        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }

    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
1421                        .or_default()
1422                        .push(format!("owner of {object_description}"));
1423                }
1424            }
1425        }
1426        privilege_check(
1427            self.catalog().system_privileges(),
1428            dropped_roles,
1429            &mut dependent_objects,
1430            &SystemObjectId::System,
1431            &catalog,
1432        );
1433        for (default_privilege_object, default_privilege_acl_items) in
1434            self.catalog.default_privileges()
1435        {
1436            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1437                dependent_objects
1438                    .entry(role_name.to_string())
1439                    .or_default()
1440                    .push(format!(
1441                        "default privileges on {}S created by {}",
1442                        default_privilege_object.object_type, role_name
1443                    ));
1444            }
1445            for default_privilege_acl_item in default_privilege_acl_items {
1446                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1447                    dependent_objects
1448                        .entry(role_name.to_string())
1449                        .or_default()
1450                        .push(format!(
1451                            "default privileges on {}S granted to {}",
1452                            default_privilege_object.object_type, role_name
1453                        ));
1454                }
1455            }
1456        }
1457
1458        if !dependent_objects.is_empty() {
1459            Err(AdapterError::DependentObject(dependent_objects))
1460        } else {
1461            Ok(())
1462        }
1463    }
1464
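    /// Sequences a `DROP OWNED BY` plan. A minimal sketch of a statement that
    /// reaches this path (the role name is hypothetical):
    ///
    /// ```sql
    /// DROP OWNED BY some_role;
    /// ```
    ///
    /// This revokes the privileges granted to or by the role (the plan's
    /// `privilege_revokes` and `default_privilege_revokes`) and drops the
    /// objects it owns (`drop_ids`).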
1465    #[instrument]
1466    pub(super) async fn sequence_drop_owned(
1467        &mut self,
1468        session: &Session,
1469        plan: plan::DropOwnedPlan,
1470    ) -> Result<ExecuteResponse, AdapterError> {
1471        for role_id in &plan.role_ids {
1472            self.catalog().ensure_not_reserved_role(role_id)?;
1473        }
1474
1475        let mut privilege_revokes = plan.privilege_revokes;
1476
1477        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1478        let session_catalog = self.catalog().for_session(session);
1479        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1480            && !session.is_superuser()
1481        {
1482            // Obtain all roles that the current session is a member of.
1483            let role_membership =
1484                session_catalog.collect_role_membership(session.current_role_id());
1485            let invalid_revokes: BTreeSet<_> = privilege_revokes
1486                .extract_if(.., |(_, privilege)| {
1487                    !role_membership.contains(&privilege.grantor)
1488                })
1489                .map(|(object_id, _)| object_id)
1490                .collect();
1491            for invalid_revoke in invalid_revokes {
1492                let object_description =
1493                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1494                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1495            }
1496        }
1497
1498        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1499            catalog::Op::UpdatePrivilege {
1500                target_id: object_id,
1501                privilege,
1502                variant: UpdatePrivilegeVariant::Revoke,
1503            }
1504        });
1505        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1506            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1507                privilege_object,
1508                privilege_acl_item,
1509                variant: UpdatePrivilegeVariant::Revoke,
1510            },
1511        );
1512        let DropOps {
1513            ops: drop_ops,
1514            dropped_active_db,
1515            dropped_active_cluster,
1516            dropped_in_use_indexes,
1517        } = self.sequence_drop_common(session, plan.drop_ids)?;
1518
1519        let ops = privilege_revoke_ops
1520            .chain(default_privilege_revoke_ops)
1521            .chain(drop_ops.into_iter())
1522            .collect();
1523
1524        self.catalog_transact(Some(session), ops).await?;
1525
1526        if dropped_active_db {
1527            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1528                name: session.vars().database().to_string(),
1529            });
1530        }
1531        if dropped_active_cluster {
1532            session.add_notice(AdapterNotice::DroppedActiveCluster {
1533                name: session.vars().cluster().to_string(),
1534            });
1535        }
1536        for dropped_in_use_index in dropped_in_use_indexes {
1537            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1538        }
1539        Ok(ExecuteResponse::DroppedOwned)
1540    }
1541
1542    fn sequence_drop_common(
1543        &self,
1544        session: &Session,
1545        ids: Vec<ObjectId>,
1546    ) -> Result<DropOps, AdapterError> {
1547        let mut dropped_active_db = false;
1548        let mut dropped_active_cluster = false;
1549        let mut dropped_in_use_indexes = Vec::new();
1550        let mut dropped_roles = BTreeMap::new();
1551        let mut dropped_databases = BTreeSet::new();
1552        let mut dropped_schemas = BTreeSet::new();
1553        // Dropping either the group role or the member role of a role membership will trigger a
1554        // role revoke. We use a set for the revokes to avoid attempting to revoke the same role
1555        // membership twice.
1556        let mut role_revokes = BTreeSet::new();
1557        // Dropping a database or a schema will revoke all default privileges associated with
1558        // that database or schema.
1559        let mut default_privilege_revokes = BTreeSet::new();
1560
1561        // Clusters we're dropping
1562        let mut clusters_to_drop = BTreeSet::new();
1563
1564        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1565        for id in &ids {
1566            match id {
1567                ObjectId::Database(id) => {
1568                    let name = self.catalog().get_database(id).name();
1569                    if name == session.vars().database() {
1570                        dropped_active_db = true;
1571                    }
1572                    dropped_databases.insert(id);
1573                }
1574                ObjectId::Schema((_, spec)) => {
1575                    if let SchemaSpecifier::Id(id) = spec {
1576                        dropped_schemas.insert(id);
1577                    }
1578                }
1579                ObjectId::Cluster(id) => {
1580                    clusters_to_drop.insert(*id);
1581                    if let Some(active_id) = self
1582                        .catalog()
1583                        .active_cluster(session)
1584                        .ok()
1585                        .map(|cluster| cluster.id())
1586                    {
1587                        if id == &active_id {
1588                            dropped_active_cluster = true;
1589                        }
1590                    }
1591                }
1592                ObjectId::Role(id) => {
1593                    let role = self.catalog().get_role(id);
1594                    let name = role.name();
1595                    dropped_roles.insert(*id, name);
1596                    // We must revoke all role memberships that the dropped role belongs to.
1597                    for (group_id, grantor_id) in &role.membership.map {
1598                        role_revokes.insert((*group_id, *id, *grantor_id));
1599                    }
1600                }
1601                ObjectId::Item(id) => {
1602                    if let Some(index) = self.catalog().get_entry(id).index() {
1603                        let humanizer = self.catalog().for_session(session);
1604                        let dependants = self
1605                            .controller
1606                            .compute
1607                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1608                            .ok()
1609                            .into_iter()
1610                            .flatten()
1611                            .filter(|dependant_id| {
1612                                // Transient IDs belong to peeks. For now, we are not interested
1613                                // in peeks that depend on a dropped index.
1614                                // TODO: show a different notice in this case. Something like
1615                                // "There is an in-progress ad hoc SELECT that uses the dropped
1616                                // index. The resources used by the index will be freed when all
1617                                // such SELECTs complete."
1618                                if dependant_id.is_transient() {
1619                                    return false;
1620                                }
1621                                // The item should exist, but don't panic if it doesn't.
1622                                let Some(dependent_id) = humanizer
1623                                    .try_get_item_by_global_id(dependant_id)
1624                                    .map(|item| item.id())
1625                                else {
1626                                    return false;
1627                                };
1628                                // If the dependent object is also being dropped, then there is no
1629                                // problem, so we don't want a notice.
1630                                !ids_set.contains(&ObjectId::Item(dependent_id))
1631                            })
1632                            .flat_map(|dependant_id| {
1633                                // If we are not able to find a name for this ID it probably means
1634                                // we have already dropped the compute collection, in which case we
1635                                // can ignore it.
1636                                humanizer.humanize_id(dependant_id)
1637                            })
1638                            .collect_vec();
1639                        if !dependants.is_empty() {
1640                            dropped_in_use_indexes.push(DroppedInUseIndex {
1641                                index_name: humanizer
1642                                    .humanize_id(index.global_id())
1643                                    .unwrap_or_else(|| id.to_string()),
1644                                dependant_objects: dependants,
1645                            });
1646                        }
1647                    }
1648                }
1649                _ => {}
1650            }
1651        }
1652
1653        for id in &ids {
1654            match id {
1655                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1656                // unless they are internal replicas, which exist outside the scope
1657                // of managed clusters.
1658                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1659                    if !clusters_to_drop.contains(cluster_id) {
1660                        let cluster = self.catalog.get_cluster(*cluster_id);
1661                        if cluster.is_managed() {
1662                            let replica =
1663                                cluster.replica(*replica_id).expect("Catalog out of sync");
1664                            if !replica.config.location.internal() {
1665                                coord_bail!("cannot drop replica of managed cluster");
1666                            }
1667                        }
1668                    }
1669                }
1670                _ => {}
1671            }
1672        }
1673
1674        for role_id in dropped_roles.keys() {
1675            self.catalog().ensure_not_reserved_role(role_id)?;
1676        }
1677        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1678        // If any role is a member of a dropped role, then we must revoke that membership.
1679        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1680        for role in self.catalog().user_roles() {
1681            for dropped_role_id in
1682                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1683            {
1684                role_revokes.insert((
1685                    **dropped_role_id,
1686                    role.id(),
1687                    *role
1688                        .membership
1689                        .map
1690                        .get(*dropped_role_id)
1691                        .expect("included in keys above"),
1692                ));
1693            }
1694        }
1695
1696        for (default_privilege_object, default_privilege_acls) in
1697            self.catalog().default_privileges()
1698        {
1699            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
1700                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
1701            {
1702                for default_privilege_acl in default_privilege_acls {
1703                    default_privilege_revokes.insert((
1704                        default_privilege_object.clone(),
1705                        default_privilege_acl.clone(),
1706                    ));
1707                }
1708            }
1709        }
1710
1711        let ops = role_revokes
1712            .into_iter()
1713            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1714                role_id,
1715                member_id,
1716                grantor_id,
1717            })
1718            .chain(default_privilege_revokes.into_iter().map(
1719                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1720                    privilege_object,
1721                    privilege_acl_item,
1722                    variant: UpdatePrivilegeVariant::Revoke,
1723                },
1724            ))
1725            .chain(iter::once(catalog::Op::DropObjects(
1726                ids.into_iter()
1727                    .map(DropObjectInfo::manual_drop_from_object_id)
1728                    .collect(),
1729            )))
1730            .collect();
1731
1732        Ok(DropOps {
1733            ops,
1734            dropped_active_db,
1735            dropped_active_cluster,
1736            dropped_in_use_indexes,
1737        })
1738    }
1739
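    /// Sequences an `EXPLAIN ... SCHEMA` request by pretty-printing the
    /// already-purified JSON schema carried in the plan. A hedged sketch of
    /// the statement shape, assuming a Kafka/Avro sink (all object names are
    /// hypothetical):
    ///
    /// ```sql
    /// EXPLAIN VALUE SCHEMA AS JSON FOR
    ///     CREATE SINK snk FROM mv
    ///     INTO KAFKA CONNECTION kafka_conn (TOPIC 't')
    ///     FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
    ///     ENVELOPE DEBEZIUM;
    /// ```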
1740    pub(super) fn sequence_explain_schema(
1741        &self,
1742        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1743    ) -> Result<ExecuteResponse, AdapterError> {
1744        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1745            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1746        })?;
1747
1748        let json_string = json_string(&json_value);
1749        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1750        Ok(Self::send_immediate_rows(row))
1751    }
1752
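    /// Sequences `SHOW ALL`, returning one `(name, setting, description)` row
    /// per variable visible to the session:
    ///
    /// ```sql
    /// SHOW ALL;
    /// ```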
1753    pub(super) fn sequence_show_all_variables(
1754        &self,
1755        session: &Session,
1756    ) -> Result<ExecuteResponse, AdapterError> {
1757        let mut rows = viewable_variables(self.catalog().state(), session)
1758            .map(|v| (v.name(), v.value(), v.description()))
1759            .collect::<Vec<_>>();
1760        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1761
1762        // TODO(parkmycar): Pack all of these into a single RowCollection.
1763        let rows: Vec<_> = rows
1764            .into_iter()
1765            .map(|(name, val, desc)| {
1766                Row::pack_slice(&[
1767                    Datum::String(name),
1768                    Datum::String(&val),
1769                    Datum::String(desc),
1770                ])
1771            })
1772            .collect();
1773        Ok(Self::send_immediate_rows(rows))
1774    }
1775
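    /// Sequences `SHOW <variable>`. For example (variables as handled below):
    ///
    /// ```sql
    /// SHOW cluster; -- plain session variable lookup
    /// SHOW schema;  -- special-cased: the first resolvable search_path entry
    /// ```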
1776    pub(super) fn sequence_show_variable(
1777        &self,
1778        session: &Session,
1779        plan: plan::ShowVariablePlan,
1780    ) -> Result<ExecuteResponse, AdapterError> {
1781        if &plan.name == SCHEMA_ALIAS {
1782            let schemas = self.catalog.resolve_search_path(session);
1783            let schema = schemas.first();
1784            return match schema {
1785                Some((database_spec, schema_spec)) => {
1786                    let schema_name = &self
1787                        .catalog
1788                        .get_schema(database_spec, schema_spec, session.conn_id())
1789                        .name()
1790                        .schema;
1791                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
1792                    Ok(Self::send_immediate_rows(row))
1793                }
1794                None => {
1795                    if session.vars().current_object_missing_warnings() {
1796                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1797                            search_path: session
1798                                .vars()
1799                                .search_path()
1800                                .into_iter()
1801                                .map(|schema| schema.to_string())
1802                                .collect(),
1803                        });
1804                    }
1805                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1806                }
1807            };
1808        }
1809
1810        let variable = session
1811            .vars()
1812            .get(self.catalog().system_config(), &plan.name)
1813            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1814
1815        // In lieu of plumbing the user to all system config functions, just check that the var is
1816        // visible.
1817        variable.visible(session.user(), self.catalog().system_config())?;
1818
1819        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1820        if variable.name() == vars::DATABASE.name()
1821            && matches!(
1822                self.catalog().resolve_database(&variable.value()),
1823                Err(CatalogError::UnknownDatabase(_))
1824            )
1825            && session.vars().current_object_missing_warnings()
1826        {
1827            let name = variable.value();
1828            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1829        } else if variable.name() == vars::CLUSTER.name()
1830            && matches!(
1831                self.catalog().resolve_cluster(&variable.value()),
1832                Err(CatalogError::UnknownCluster(_))
1833            )
1834            && session.vars().current_object_missing_warnings()
1835        {
1836            let name = variable.value();
1837            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1838        }
1839        Ok(Self::send_immediate_rows(row))
1840    }
1841
1842    #[instrument]
1843    pub(super) async fn sequence_inspect_shard(
1844        &self,
1845        session: &Session,
1846        plan: plan::InspectShardPlan,
1847    ) -> Result<ExecuteResponse, AdapterError> {
1848        // TODO: Not thrilled about this rbac special case here, but probably
1849        // sufficient for now.
1850        if !session.user().is_internal() {
1851            return Err(AdapterError::Unauthorized(
1852                rbac::UnauthorizedError::MzSystem {
1853                    action: "inspect".into(),
1854                },
1855            ));
1856        }
1857        let state = self
1858            .controller
1859            .storage
1860            .inspect_persist_state(plan.id)
1861            .await?;
1862        let jsonb = Jsonb::from_serde_json(state)?;
1863        Ok(Self::send_immediate_rows(jsonb.into_row()))
1864    }
1865
1866    #[instrument]
1867    pub(super) fn sequence_set_variable(
1868        &self,
1869        session: &mut Session,
1870        plan: plan::SetVariablePlan,
1871    ) -> Result<ExecuteResponse, AdapterError> {
1872        let (name, local) = (plan.name, plan.local);
1873        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1874            self.validate_set_isolation_level(session)?;
1875        }
1876        if &name == vars::CLUSTER.name() {
1877            self.validate_set_cluster(session)?;
1878        }
1879
1880        let vars = session.vars_mut();
1881        let values = match plan.value {
1882            plan::VariableValue::Default => None,
1883            plan::VariableValue::Values(values) => Some(values),
1884        };
1885
1886        match values {
1887            Some(values) => {
1888                vars.set(
1889                    self.catalog().system_config(),
1890                    &name,
1891                    VarInput::SqlSet(&values),
1892                    local,
1893                )?;
1894
1895                let vars = session.vars();
1896
1897                // Emit a warning when deprecated variables are used.
1898                // TODO(database-issues#8069) remove this after sufficient time has passed
1899                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1900                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1901                } else if name == vars::CLUSTER.name()
1902                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1903                {
1904                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1905                }
1906
1907                // Database or cluster value does not correspond to a catalog item.
1908                if name.as_str() == vars::DATABASE.name()
1909                    && matches!(
1910                        self.catalog().resolve_database(vars.database()),
1911                        Err(CatalogError::UnknownDatabase(_))
1912                    )
1913                    && session.vars().current_object_missing_warnings()
1914                {
1915                    let name = vars.database().to_string();
1916                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1917                } else if name.as_str() == vars::CLUSTER.name()
1918                    && matches!(
1919                        self.catalog().resolve_cluster(vars.cluster()),
1920                        Err(CatalogError::UnknownCluster(_))
1921                    )
1922                    && session.vars().current_object_missing_warnings()
1923                {
1924                    let name = vars.cluster().to_string();
1925                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1926                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1927                    let v = values.into_first().to_lowercase();
1928                    if v == IsolationLevel::ReadUncommitted.as_str()
1929                        || v == IsolationLevel::ReadCommitted.as_str()
1930                        || v == IsolationLevel::RepeatableRead.as_str()
1931                    {
1932                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1933                            isolation_level: v,
1934                        });
1935                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1936                        session.add_notice(AdapterNotice::StrongSessionSerializable);
1937                    }
1938                }
1939            }
1940            None => vars.reset(self.catalog().system_config(), &name, local)?,
1941        }
1942
1943        Ok(ExecuteResponse::SetVariable { name, reset: false })
1944    }
1945
1946    pub(super) fn sequence_reset_variable(
1947        &self,
1948        session: &mut Session,
1949        plan: plan::ResetVariablePlan,
1950    ) -> Result<ExecuteResponse, AdapterError> {
1951        let name = plan.name;
1952        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1953            self.validate_set_isolation_level(session)?;
1954        }
1955        if &name == vars::CLUSTER.name() {
1956            self.validate_set_cluster(session)?;
1957        }
1958        session
1959            .vars_mut()
1960            .reset(self.catalog().system_config(), &name, false)?;
1961        Ok(ExecuteResponse::SetVariable { name, reset: true })
1962    }
1963
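    /// Sequences `SET TRANSACTION`. Only isolation-level modes are supported;
    /// access modes are rejected below. For example:
    ///
    /// ```sql
    /// SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
    /// ```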
1964    pub(super) fn sequence_set_transaction(
1965        &self,
1966        session: &mut Session,
1967        plan: plan::SetTransactionPlan,
1968    ) -> Result<ExecuteResponse, AdapterError> {
1969        // TODO(jkosh44) Only supports isolation levels for now.
1970        for mode in plan.modes {
1971            match mode {
1972                TransactionMode::AccessMode(_) => {
1973                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1974                }
1975                TransactionMode::IsolationLevel(isolation_level) => {
1976                    self.validate_set_isolation_level(session)?;
1977
1978                    session.vars_mut().set(
1979                        self.catalog().system_config(),
1980                        TRANSACTION_ISOLATION_VAR_NAME,
1981                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
1982                        plan.local,
1983                    )?
1984                }
1985            }
1986        }
1987        Ok(ExecuteResponse::SetVariable {
1988            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1989            reset: false,
1990        })
1991    }
1992
1993    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1994        if session.transaction().contains_ops() {
1995            Err(AdapterError::InvalidSetIsolationLevel)
1996        } else {
1997            Ok(())
1998        }
1999    }
2000
2001    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2002        if session.transaction().contains_ops() {
2003            Err(AdapterError::InvalidSetCluster)
2004        } else {
2005            Ok(())
2006        }
2007    }
2008
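    /// Sequences the end of a transaction (`COMMIT` or `ROLLBACK`). A failed
    /// transaction can only roll back. For example (hypothetical table):
    ///
    /// ```sql
    /// BEGIN;
    /// INSERT INTO t VALUES (1);
    /// COMMIT; -- the accumulated write ops are submitted as a pending write
    /// ```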
2009    #[instrument]
2010    pub(super) async fn sequence_end_transaction(
2011        &mut self,
2012        mut ctx: ExecuteContext,
2013        mut action: EndTransactionAction,
2014    ) {
2015        // If the transaction has failed, we can only rollback.
2016        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2017            (&action, ctx.session().transaction())
2018        {
2019            action = EndTransactionAction::Rollback;
2020        }
2021        let response = match action {
2022            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2023                params: BTreeMap::new(),
2024            }),
2025            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2026                params: BTreeMap::new(),
2027            }),
2028        };
2029
2030        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2031
2032        let (response, action) = match result {
2033            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2034                (response, action)
2035            }
2036            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2037                // Make sure we have the correct set of write locks for this transaction.
2038                // We aggressively drop partial sets of locks to prevent separate
2039                // transactions from deadlocking each other.
2040                let validated_locks = match write_lock_guards {
2041                    None => None,
2042                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2043                        Ok(locks) => Some(locks),
2044                        Err(missing) => {
2045                            tracing::error!(?missing, "programming error, missing write locks");
2046                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2047                        }
2048                    },
2049                };
2050
2051                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2052                for WriteOp { id, rows } in writes {
2053                    let total_rows = collected_writes.entry(id).or_default();
2054                    total_rows.push(rows);
2055                }
2056
2057                self.submit_write(PendingWriteTxn::User {
2058                    span: Span::current(),
2059                    writes: collected_writes,
2060                    write_locks: validated_locks,
2061                    pending_txn: PendingTxn {
2062                        ctx,
2063                        response,
2064                        action,
2065                    },
2066                });
2067                return;
2068            }
2069            Ok((
2070                Some(TransactionOps::Peeks {
2071                    determination,
2072                    requires_linearization: RequireLinearization::Required,
2073                    ..
2074                }),
2075                _,
2076            )) if ctx.session().vars().transaction_isolation()
2077                == &IsolationLevel::StrictSerializable =>
2078            {
2079                let conn_id = ctx.session().conn_id().clone();
2080                let pending_read_txn = PendingReadTxn {
2081                    txn: PendingRead::Read {
2082                        txn: PendingTxn {
2083                            ctx,
2084                            response,
2085                            action,
2086                        },
2087                    },
2088                    timestamp_context: determination.timestamp_context,
2089                    created: Instant::now(),
2090                    num_requeues: 0,
2091                    otel_ctx: OpenTelemetryContext::obtain(),
2092                };
2093                self.strict_serializable_reads_tx
2094                    .send((conn_id, pending_read_txn))
2095                    .expect("sending to strict_serializable_reads_tx cannot fail");
2096                return;
2097            }
2098            Ok((
2099                Some(TransactionOps::Peeks {
2100                    determination,
2101                    requires_linearization: RequireLinearization::Required,
2102                    ..
2103                }),
2104                _,
2105            )) if ctx.session().vars().transaction_isolation()
2106                == &IsolationLevel::StrongSessionSerializable =>
2107            {
2108                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2109                    ctx.session_mut()
2110                        .ensure_timestamp_oracle(timeline.clone())
2111                        .apply_write(*ts);
2112                }
2113                (response, action)
2114            }
2115            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2116                self.internal_cmd_tx
2117                    .send(Message::ExecuteSingleStatementTransaction {
2118                        ctx,
2119                        otel_ctx: OpenTelemetryContext::obtain(),
2120                        stmt,
2121                        params,
2122                    })
2123                    .expect("must send");
2124                return;
2125            }
2126            Ok((_, _)) => (response, action),
2127            Err(err) => (Err(err), EndTransactionAction::Rollback),
2128        };
2129        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2130        // Append any parameters that changed to the response.
2131        let response = response.map(|mut r| {
2132            r.extend_params(changed);
2133            ExecuteResponse::from(r)
2134        });
2135
2136        ctx.retire(response);
2137    }
2138
2139    #[instrument]
2140    async fn sequence_end_transaction_inner(
2141        &mut self,
2142        ctx: &mut ExecuteContext,
2143        action: EndTransactionAction,
2144    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2145        let txn = self.clear_transaction(ctx.session_mut()).await;
2146
2147        if let EndTransactionAction::Commit = action {
2148            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2149                match &mut ops {
2150                    TransactionOps::Writes(writes) => {
2151                        for WriteOp { id, .. } in &mut writes.iter() {
2152                            // Re-verify this id exists.
2153                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2154                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2155                                    kind: mz_catalog::memory::error::ErrorKind::Sql(
2156                                        CatalogError::UnknownItem(id.to_string()),
2157                                    ),
2158                                })
2159                            })?;
2160                        }
2161
2162                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2163                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2164                    }
2165                    TransactionOps::DDL {
2166                        ops,
2167                        state: _,
2168                        side_effects,
2169                        revision,
2170                    } => {
2171                        // Make sure our catalog hasn't changed.
2172                        if *revision != self.catalog().transient_revision() {
2173                            return Err(AdapterError::DDLTransactionRace);
2174                        }
2175                        // Commit all of our queued ops.
2176                        let ops = std::mem::take(ops);
2177                        let side_effects = std::mem::take(side_effects);
2178                        self.catalog_transact_with_side_effects(
2179                            Some(ctx),
2180                            ops,
2181                            move |a, mut ctx| {
2182                                Box::pin(async move {
2183                                    for side_effect in side_effects {
2184                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2185                                    }
2186                                })
2187                            },
2188                        )
2189                        .await?;
2190                    }
2191                    _ => (),
2192                }
2193                return Ok((Some(ops), write_lock_guards));
2194            }
2195        }
2196
2197        Ok((None, None))
2198    }
2199
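    /// Sequences a side-effecting function call such as `pg_cancel_backend`.
    /// A minimal example (the connection id is hypothetical):
    ///
    /// ```sql
    /// SELECT pg_cancel_backend(12345);
    /// ```
    ///
    /// The result is `true` if the target connection existed and a cancel was
    /// issued, and `false` otherwise, matching the `Datum::True` /
    /// `Datum::False` branches below.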
2200    pub(super) async fn sequence_side_effecting_func(
2201        &mut self,
2202        ctx: ExecuteContext,
2203        plan: SideEffectingFunc,
2204    ) {
2205        match plan {
2206            SideEffectingFunc::PgCancelBackend { connection_id } => {
2207                if ctx.session().conn_id().unhandled() == connection_id {
2208                    // As a special case, if we're canceling ourselves, we send
2209                    // back a canceled response to the client issuing the query,
2210                    // and so we need to do no further processing of the cancel.
2211                    ctx.retire(Err(AdapterError::Canceled));
2212                    return;
2213                }
2214
2215                let res = if let Some((id_handle, _conn_meta)) =
2216                    self.active_conns.get_key_value(&connection_id)
2217                {
2218                    // check_plan already verified role membership.
2219                    self.handle_privileged_cancel(id_handle.clone()).await;
2220                    Datum::True
2221                } else {
2222                    Datum::False
2223                };
2224                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2225            }
2226        }
2227    }
2228
2229    /// Inner method that performs the actual real-time recency timestamp determination.
2230    /// This is called both by the old peek sequencing code (via `determine_real_time_recent_timestamp_if_needed`)
2231    /// and by the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
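    ///
    /// As a usage sketch, real-time recency makes a query under strict
    /// serializable isolation wait until its sources have ingested everything
    /// currently visible upstream (the source name is hypothetical):
    ///
    /// ```sql
    /// SET real_time_recency = true;
    /// SET transaction_isolation = 'strict serializable';
    /// SELECT count(*) FROM kafka_source;
    /// ```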
2232    pub(crate) async fn determine_real_time_recent_timestamp(
2233        &self,
2234        source_ids: impl Iterator<Item = GlobalId>,
2235        real_time_recency_timeout: Duration,
2236    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2237    {
2238        let item_ids = source_ids
2239            .map(|gid| {
2240                self.catalog
2241                    .try_resolve_item_id(&gid)
2242                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2243            })
2244            .collect::<Result<Vec<_>, _>>()?;
2245
2246        // Find all dependencies transitively, because we need to ensure that
2247        // RTR queries determine the timestamp from the remap data of the
2248        // sources, i.e. the storage objects that ingest data from external
2249        // systems. We "cheat" a little bit and filter out any IDs that aren't
2250        // user objects because we know they are not RTR sources.
2251        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2252        // If none of the sources are user objects, we don't need to provide
2253        // a RTR timestamp.
2254        if to_visit.is_empty() {
2255            return Ok(None);
2256        }
2257
2258        let mut timestamp_objects = BTreeSet::new();
2259
2260        while let Some(id) = to_visit.pop_front() {
2261            timestamp_objects.insert(id);
2262            to_visit.extend(
2263                self.catalog()
2264                    .get_entry(&id)
2265                    .uses()
2266                    .into_iter()
2267                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2268            );
2269        }
2270        let timestamp_objects = timestamp_objects
2271            .into_iter()
2272            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2273            .collect();
2274
2275        let r = self
2276            .controller
2277            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2278            .await?;
2279
2280        Ok(Some(r))
2281    }
2282
2283    /// Checks to see if the session needs a real time recency timestamp and if so returns
2284    /// a future that will return the timestamp.
2285    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2286        &self,
2287        session: &Session,
2288        source_ids: impl Iterator<Item = GlobalId>,
2289    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2290    {
2291        let vars = session.vars();
2292
2293        if vars.real_time_recency()
2294            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2295            && !session.contains_read_timestamp()
2296        {
2297            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2298                .await
2299        } else {
2300            Ok(None)
2301        }
2302    }
2303
2304    #[instrument]
2305    pub(super) async fn sequence_explain_plan(
2306        &mut self,
2307        ctx: ExecuteContext,
2308        plan: plan::ExplainPlanPlan,
2309        target_cluster: TargetCluster,
2310    ) {
2311        match &plan.explainee {
2312            plan::Explainee::Statement(stmt) => match stmt {
2313                plan::ExplaineeStatement::CreateView { .. } => {
2314                    self.explain_create_view(ctx, plan).await;
2315                }
2316                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2317                    self.explain_create_materialized_view(ctx, plan).await;
2318                }
2319                plan::ExplaineeStatement::CreateIndex { .. } => {
2320                    self.explain_create_index(ctx, plan).await;
2321                }
2322                plan::ExplaineeStatement::Select { .. } => {
2323                    self.explain_peek(ctx, plan, target_cluster).await;
2324                }
2325            },
2326            plan::Explainee::View(_) => {
2327                let result = self.explain_view(&ctx, plan);
2328                ctx.retire(result);
2329            }
2330            plan::Explainee::MaterializedView(_) => {
2331                let result = self.explain_materialized_view(&ctx, plan);
2332                ctx.retire(result);
2333            }
2334            plan::Explainee::Index(_) => {
2335                let result = self.explain_index(&ctx, plan);
2336                ctx.retire(result);
2337            }
2338            plan::Explainee::ReplanView(_) => {
2339                self.explain_replan_view(ctx, plan).await;
2340            }
2341            plan::Explainee::ReplanMaterializedView(_) => {
2342                self.explain_replan_materialized_view(ctx, plan).await;
2343            }
2344            plan::Explainee::ReplanIndex(_) => {
2345                self.explain_replan_index(ctx, plan).await;
2346            }
2347        };
2348    }
2349
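    /// Sequences `EXPLAIN FILTER PUSHDOWN`, which reports how much of each
    /// input's data the query's filters let Materialize skip. A sketch of the
    /// supported explainees (object names are hypothetical):
    ///
    /// ```sql
    /// EXPLAIN FILTER PUSHDOWN FOR
    ///     SELECT * FROM t WHERE ts + INTERVAL '1 hour' > mz_now();
    /// EXPLAIN FILTER PUSHDOWN FOR MATERIALIZED VIEW mv;
    /// ```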
2350    pub(super) async fn sequence_explain_pushdown(
2351        &mut self,
2352        ctx: ExecuteContext,
2353        plan: plan::ExplainPushdownPlan,
2354        target_cluster: TargetCluster,
2355    ) {
2356        match plan.explainee {
2357            Explainee::Statement(ExplaineeStatement::Select {
2358                broken: false,
2359                plan,
2360                desc: _,
2361            }) => {
2362                let stage = return_if_err!(
2363                    self.peek_validate(
2364                        ctx.session(),
2365                        plan,
2366                        target_cluster,
2367                        None,
2368                        ExplainContext::Pushdown,
2369                        Some(ctx.session().vars().max_query_result_size()),
2370                    ),
2371                    ctx
2372                );
2373                self.sequence_staged(ctx, Span::current(), stage).await;
2374            }
2375            Explainee::MaterializedView(item_id) => {
2376                self.explain_pushdown_materialized_view(ctx, item_id).await;
2377            }
2378            _ => {
2379                ctx.retire(Err(AdapterError::Unsupported(
2380                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2381                )));
2382            }
2383        };
2384    }
2385
2386    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2387    async fn execute_explain_pushdown_with_read_holds(
2388        &self,
2389        ctx: ExecuteContext,
2390        as_of: Antichain<Timestamp>,
2391        mz_now: ResultSpec<'static>,
2392        read_holds: Option<ReadHolds<Timestamp>>,
2393        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2394    ) {
2395        let fut = self
2396            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2397            .await;
2398        task::spawn(|| "render explain pushdown", async move {
2399            // Transfer the necessary read holds over to the background task
2400            let _read_holds = read_holds;
2401            let res = fut.await;
2402            ctx.retire(res);
2403        });
2404    }
2405
2406    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
2407    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2408        &self,
2409        session: &Session,
2410        as_of: Antichain<Timestamp>,
2411        mz_now: ResultSpec<'static>,
2412        imports: I,
2413    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2414        // Gather the needed Coordinator state and call the freestanding, shared helper.
2415        super::explain_pushdown_future_inner(
2416            session,
2417            &self.catalog,
2418            &self.controller.storage_collections,
2419            as_of,
2420            mz_now,
2421            imports,
2422        )
2423        .await
2424    }
2425
2426    #[instrument]
2427    pub(super) async fn sequence_insert(
2428        &mut self,
2429        mut ctx: ExecuteContext,
2430        plan: plan::InsertPlan,
2431    ) {
2432        // Normally, this would get checked when trying to add "write ops" to
2433        // the transaction but we go down diverging paths below, based on
2434        // whether the INSERT is only constant values or not.
2435        //
2436        // For the non-constant case we sequence an implicit read-then-write,
2437        // which messes with the transaction ops and would allow an implicit
2438        // read-then-write to sneak into a read-only transaction.
2439        if !ctx.session_mut().transaction().allows_writes() {
2440            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2441            return;
2442        }
2443
2444        // The structure of this code originates from a time when
2445        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2446        // optimized `MirRelationExpr`.
2447        //
2448        // Ideally, we would like to make the `selection.as_const().is_some()`
2449        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2450        // are planned as a Wrap($n, $vals) call, so until we can reduce
2451        // HirRelationExpr this will always return false.
2452        //
2453        // Unfortunately, hitting the default path of the match below also
2454        // causes a lot of tests to fail, so we opted to go with the extra
2455        // `plan.values.clone()` statements when producing the `optimized_mir`
2456        // and re-optimize the values in the `sequence_read_then_write` call.
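        //
        // For example (hypothetical statements): `INSERT INTO t VALUES (1), (3)`
        // reduces to a constant and takes the fast path in the match below,
        // while `INSERT INTO t SELECT a FROM s` does not and is sequenced as a
        // read-then-write.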
2457        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2458            // We don't perform any optimizations on an expression that is already
2459            // a constant for writes, as we want to maximize bulk-insert throughput.
2460            let expr = return_if_err!(
2461                plan.values
2462                    .clone()
2463                    .lower(self.catalog().system_config(), None),
2464                ctx
2465            );
2466            OptimizedMirRelationExpr(expr)
2467        } else {
2468            // Collect optimizer parameters.
2469            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2470
2471            // (`optimize::view::Optimizer` has a special case for constant queries.)
2472            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2473
2474            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2475            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2476        };
2477
2478        match optimized_mir.into_inner() {
2479            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2480                let catalog = self.owned_catalog();
2481                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2482                    let result =
2483                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2484                    ctx.retire(result);
2485                });
2486            }
2487            // All non-constant values must be planned as read-then-writes.
2488            _ => {
2489                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2490                    Some(table) => {
2491                        let fullname = self
2492                            .catalog()
2493                            .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2494                        // Inserts always occur at the latest version of the table.
2495                        table
2496                            .desc_latest(&fullname)
2497                            .expect("desc called on table")
2498                            .arity()
2499                    }
2500                    None => {
2501                        ctx.retire(Err(AdapterError::Catalog(
2502                            mz_catalog::memory::error::Error {
2503                                kind: mz_catalog::memory::error::ErrorKind::Sql(
2504                                    CatalogError::UnknownItem(plan.id.to_string()),
2505                                ),
2506                            },
2507                        )));
2508                        return;
2509                    }
2510                };
2511
2512                let finishing = RowSetFinishing {
2513                    order_by: vec![],
2514                    limit: None,
2515                    offset: 0,
2516                    project: (0..desc_arity).collect(),
2517                };
2518
2519                let read_then_write_plan = plan::ReadThenWritePlan {
2520                    id: plan.id,
2521                    selection: plan.values,
2522                    finishing,
2523                    assignments: BTreeMap::new(),
2524                    kind: MutationKind::Insert,
2525                    returning: plan.returning,
2526                };
2527
2528                self.sequence_read_then_write(ctx, read_then_write_plan)
2529                    .await;
2530            }
2531        }
2532    }
2533
2534    /// ReadThenWrite is a plan whose writes depend on the results of a
2535    /// read. This works by doing a Peek and then queuing a SendDiffs. No writes
2536    /// or read-then-writes can occur between the Peek and the SendDiffs; otherwise
2537    /// a serializability violation could occur.
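    ///
    /// A minimal sketch of a statement that takes this path (table and column
    /// names are hypothetical):
    ///
    /// ```sql
    /// UPDATE t SET a = a + 1 WHERE b > 10;
    /// ```
    ///
    /// The selection implied by the `WHERE` clause is evaluated as a peek, and
    /// the resulting diffs are queued as writes against the same table.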
2538    #[instrument]
2539    pub(super) async fn sequence_read_then_write(
2540        &mut self,
2541        mut ctx: ExecuteContext,
2542        plan: plan::ReadThenWritePlan,
2543    ) {
2544        let mut source_ids: BTreeSet<_> = plan
2545            .selection
2546            .depends_on()
2547            .into_iter()
2548            .map(|gid| self.catalog().resolve_item_id(&gid))
2549            .collect();
2550        source_ids.insert(plan.id);
2551
2552        // If the transaction doesn't already have write locks, acquire them.
2553        if ctx.session().transaction().write_locks().is_none() {
2554            // Pre-define all of the locks we need.
2555            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2556
2557            // Try acquiring all of our locks.
2558            for id in &source_ids {
2559                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2560                    write_locks.insert_lock(*id, lock);
2561                }
2562            }
2563
2564            // See if we acquired all of the necessary locks.
2565            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2566                Ok(locks) => locks,
2567                Err(missing) => {
2568                    // Defer our write if we couldn't acquire all of the locks.
2569                    let role_metadata = ctx.session().role_metadata().clone();
2570                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2571                    let plan = DeferredPlan {
2572                        ctx,
2573                        plan: Plan::ReadThenWrite(plan),
2574                        validity: PlanValidity::new(
2575                            self.catalog.transient_revision(),
2576                            source_ids.clone(),
2577                            None,
2578                            None,
2579                            role_metadata,
2580                        ),
2581                        requires_locks: source_ids,
2582                    };
2583                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2584                }
2585            };
2586
2587            ctx.session_mut()
2588                .try_grant_write_locks(write_locks)
2589                .expect("session has already been granted write locks");
2590        }
2591
2592        let plan::ReadThenWritePlan {
2593            id,
2594            kind,
2595            selection,
2596            mut assignments,
2597            finishing,
2598            mut returning,
2599        } = plan;
2600
2601        // Read-then-writes can be queued, so re-verify that the id exists.
2602        let desc = match self.catalog().try_get_entry(&id) {
2603            Some(table) => {
2604                let full_name = self
2605                    .catalog()
2606                    .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2607                // Writes always occur at the latest version of the table.
2608                table
2609                    .desc_latest(&full_name)
2610                    .expect("desc called on table")
2611                    .into_owned()
2612            }
2613            None => {
2614                ctx.retire(Err(AdapterError::Catalog(
2615                    mz_catalog::memory::error::Error {
2616                        kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2617                            id.to_string(),
2618                        )),
2619                    },
2620                )));
2621                return;
2622            }
2623        };
2624
2625        // Disallow mz_now in any position because read time and write time differ.
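        // For example, `UPDATE t SET ts = mz_now()` would evaluate `mz_now()` at the read
        // timestamp but apply the write at a (potentially) later timestamp, producing a row
        // that was never correct at any single time.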
2626        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2627            || assignments.values().any(|e| e.contains_temporal())
2628            || returning.iter().any(|e| e.contains_temporal());
2629        if contains_temporal {
2630            ctx.retire(Err(AdapterError::Unsupported(
2631                "calls to mz_now in write statements",
2632            )));
2633            return;
2634        }
2635
2636        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
2637        //
2638        // - They do not refer to any objects whose notion of time moves differently than that
2639        //   of user tables. This limitation is meant to ensure no writes occur between this
2640        //   read and the subsequent write.
2641        // - They do not use mz_now(), whose time produced during read will differ from the write
2642        //   timestamp.
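        //
        // For example, a mutation whose selection reads from a source or a system table is
        // rejected: those collections' timestamps advance independently of user-table writes,
        // so updates could slip in between our read and the subsequent write.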
2643        fn validate_read_dependencies(
2644            catalog: &Catalog,
2645            id: &CatalogItemId,
2646        ) -> Result<(), AdapterError> {
2647            use CatalogItemType::*;
2648            use mz_catalog::memory::objects;
2649            let mut ids_to_check = Vec::new();
2650            let valid = match catalog.try_get_entry(id) {
2651                Some(entry) => {
2652                    if let CatalogItem::View(objects::View { optimized_expr, .. })
2653                    | CatalogItem::MaterializedView(objects::MaterializedView {
2654                        optimized_expr,
2655                        ..
2656                    }) = entry.item()
2657                    {
2658                        if optimized_expr.contains_temporal() {
2659                            return Err(AdapterError::Unsupported(
2660                                "calls to mz_now in write statements",
2661                            ));
2662                        }
2663                    }
2664                    match entry.item().typ() {
2665                        typ @ (Func | View | MaterializedView | ContinualTask) => {
2666                            ids_to_check.extend(entry.uses());
2667                            id.is_user() || matches!(typ, Func)
2669                        }
2670                        Source | Secret | Connection => false,
2671                        // Cannot select from sinks or indexes.
2672                        Sink | Index => unreachable!(),
2673                        Table => {
2674                            if !id.is_user() {
2675                                // We can't read from non-user tables
2676                                false
2677                            } else {
2678                                // We can't read from tables that are source-exports
2679                                entry.source_export_details().is_none()
2680                            }
2681                        }
2682                        Type => true,
2683                    }
2684                }
2685                None => false,
2686            };
2687            if !valid {
2688                return Err(AdapterError::InvalidTableMutationSelection);
2689            }
2690            for id in ids_to_check {
2691                validate_read_dependencies(catalog, &id)?;
2692            }
2693            Ok(())
2694        }
2695
2696        for gid in selection.depends_on() {
2697            let item_id = self.catalog().resolve_item_id(&gid);
2698            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2699                ctx.retire(Err(err));
2700                return;
2701            }
2702        }
2703
2704        let (peek_tx, peek_rx) = oneshot::channel();
2705        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2706        let (tx, _, session, extra) = ctx.into_parts();
2707        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2708        // execution context, because this peek does not directly correspond to an execute,
2709        // and so we don't need to take any action on its retirement.
2710        // TODO[btv]: we might consider extending statement logging to log the inner
2711        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2712        // and mint a new "real" execution context here. We'd also have to add some logic to
2713        // make sure such "sub-statements" are always sampled when the top-level statement is.
2714        //
2715        // It's debatable whether this makes sense conceptually,
2716        // because the inner fragment here is not actually a
2717        // "statement" in its own right.
2718        let peek_ctx = ExecuteContext::from_parts(
2719            peek_client_tx,
2720            self.internal_cmd_tx.clone(),
2721            session,
2722            Default::default(),
2723        );
2724
2725        self.sequence_peek(
2726            peek_ctx,
2727            plan::SelectPlan {
2728                select: None,
2729                source: selection,
2730                when: QueryWhen::FreshestTableWrite,
2731                finishing,
2732                copy_to: None,
2733            },
2734            TargetCluster::Active,
2735            None,
2736        )
2737        .await;
2738
2739        let internal_cmd_tx = self.internal_cmd_tx.clone();
2740        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2741        let catalog = self.owned_catalog();
2742        let max_result_size = self.catalog().system_config().max_result_size();
2743
2744        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2745            let (peek_response, session) = match peek_rx.await {
2746                Ok(Response {
2747                    result: Ok(resp),
2748                    session,
2749                    otel_ctx,
2750                }) => {
2751                    otel_ctx.attach_as_parent();
2752                    (resp, session)
2753                }
2754                Ok(Response {
2755                    result: Err(e),
2756                    session,
2757                    otel_ctx,
2758                }) => {
2759                    let ctx =
2760                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2761                    otel_ctx.attach_as_parent();
2762                    ctx.retire(Err(e));
2763                    return;
2764                }
2765                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2766                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2767            };
2768            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2769            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2770
2771            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2772            if timeout_dur == Duration::ZERO {
2773                timeout_dur = Duration::MAX;
2774            }
2775
2776            let style = ExprPrepStyle::OneShot {
2777                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2778                session: ctx.session(),
2779                catalog_state: catalog.state(),
2780            };
2781            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2782                return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
2783            }
2784
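            // Turn the peeked rows into update diffs: updates retract the old row and insert
            // the re-evaluated one, deletes retract, and plain inserts insert. Also returns
            // the total byte size of the diffs so the caller can enforce `max_result_size`.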
2785            let make_diffs =
2786                move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2787                    let arena = RowArena::new();
2788                    let mut diffs = Vec::new();
2789                    let mut datum_vec = mz_repr::DatumVec::new();
2790
2791                    while let Some(row) = rows.next() {
2792                        if !assignments.is_empty() {
2793                            assert!(
2794                                matches!(kind, MutationKind::Update),
2795                                "only updates support assignments"
2796                            );
2797                            let mut datums = datum_vec.borrow_with(row);
2798                            let mut updates = vec![];
2799                            for (idx, expr) in &assignments {
2800                                let updated = match expr.eval(&datums, &arena) {
2801                                    Ok(updated) => updated,
2802                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2803                                };
2804                                updates.push((*idx, updated));
2805                            }
2806                            for (idx, new_value) in updates {
2807                                datums[idx] = new_value;
2808                            }
2809                            let updated = Row::pack_slice(&datums);
2810                            diffs.push((updated, Diff::ONE));
2811                        }
2812                        match kind {
2813                            // Updates and deletes always remove the
2814                            // current row. Updates will also add an
2815                            // updated value.
2816                            MutationKind::Update | MutationKind::Delete => {
2817                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2818                            }
2819                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2820                        }
2821                    }
2822
2823                    // Sum of all the rows' byte sizes, for checking whether we
2824                    // exceed the max_result_size threshold.
2825                    let mut byte_size: u64 = 0;
2826                    for (row, diff) in &diffs {
2827                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2828                        if diff.is_positive() {
2829                            for (idx, datum) in row.iter().enumerate() {
2830                                desc.constraints_met(idx, &datum)?;
2831                            }
2832                        }
2833                    }
2834                    Ok((diffs, byte_size))
2835                };
2836
2837            let diffs = match peek_response {
2838                ExecuteResponse::SendingRowsStreaming {
2839                    rows: mut rows_stream,
2840                    ..
2841                } => {
2842                    let mut byte_size: u64 = 0;
2843                    let mut diffs = Vec::new();
2844                    let result = loop {
2845                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2846                            Ok(Some(res)) => match res {
2847                                PeekResponseUnary::Rows(new_rows) => {
2848                                    match make_diffs(new_rows) {
2849                                        Ok((mut new_diffs, new_byte_size)) => {
2850                                            byte_size = byte_size.saturating_add(new_byte_size);
2851                                            if byte_size > max_result_size {
2852                                                break Err(AdapterError::ResultSize(format!(
2853                                                    "result exceeds max size of {max_result_size}"
2854                                                )));
2855                                            }
2856                                            diffs.append(&mut new_diffs)
2857                                        }
2858                                        Err(e) => break Err(e),
2859                                    };
2860                                }
2861                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2862                                PeekResponseUnary::Error(e) => {
2863                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2864                                }
2865                            },
2866                            Ok(None) => break Ok(diffs),
2867                            Err(_) => {
2868                                // We timed out, so remove the pending peek. This is
2869                                // best-effort and doesn't guarantee we won't
2870                                // receive a response.
2871                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2872                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2873                                    conn_id: ctx.session().conn_id().clone(),
2874                                });
2875                                if let Err(e) = result {
2876                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2877                                }
2878                                break Err(AdapterError::StatementTimeout);
2879                            }
2880                        }
2881                    };
2882
2883                    result
2884                }
2885                ExecuteResponse::SendingRowsImmediate { rows } => {
2886                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2887                }
2888                resp => Err(AdapterError::Unstructured(anyhow!(
2889                    "unexpected peek response: {resp:?}"
2890                ))),
2891            };
2892
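            // Evaluate any RETURNING expressions against the rows being written (positive
            // diffs only; retractions do not produce RETURNING output).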
2893            let mut returning_rows = Vec::new();
2894            let mut diff_err: Option<AdapterError> = None;
2895            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2896                let arena = RowArena::new();
2897                for (row, diff) in diffs {
2898                    if !diff.is_positive() {
2899                        continue;
2900                    }
2901                    let mut returning_row = Row::with_capacity(returning.len());
2902                    let mut packer = returning_row.packer();
2903                    let datums: Vec<_> = row.iter().collect();
2904                    for expr in &returning {
2905                        match expr.eval(&datums, &arena) {
2906                            Ok(datum) => {
2907                                packer.push(datum);
2908                            }
2909                            Err(err) => {
2910                                diff_err = Some(err.into());
2911                                break;
2912                            }
2913                        }
2914                    }
                    // Don't emit a partially-packed row if evaluating a RETURNING
                    // expression failed above.
                    if diff_err.is_some() {
                        break;
                    }
2915                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2916                    let diff = match NonZeroUsize::try_from(diff) {
2917                        Ok(diff) => diff,
2918                        Err(err) => {
2919                            diff_err = Some(err.into());
2920                            break;
2921                        }
2922                    };
2923                    returning_rows.push((returning_row, diff));
2927                }
2928            }
2929            let diffs = if let Some(err) = diff_err {
2930                Err(err)
2931            } else {
2932                diffs
2933            };
2934
2935            // We need to clear out the timestamp context so the write doesn't fail due to a
2936            // read-only transaction.
2937            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2938            // No matter what isolation level the client is using, we must linearize this
2939            // read. The write will be performed right after this, as part of a single
2940            // transaction, so the write must have a timestamp greater than or equal to the
2941            // read.
2942            //
2943            // Note: It's only OK for the write to have a greater timestamp than the read
2944            // because the write lock prevents any other writes from happening in between
2945            // the read and write.
2946            if let Some(timestamp_context) = timestamp_context {
2947                let (tx, rx) = tokio::sync::oneshot::channel();
2948                let conn_id = ctx.session().conn_id().clone();
2949                let pending_read_txn = PendingReadTxn {
2950                    txn: PendingRead::ReadThenWrite { ctx, tx },
2951                    timestamp_context,
2952                    created: Instant::now(),
2953                    num_requeues: 0,
2954                    otel_ctx: OpenTelemetryContext::obtain(),
2955                };
2956                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2957                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
2958                if let Err(e) = result {
2959                    warn!(
2960                        "strict_serializable_reads_tx dropped before we could send: {:?}",
2961                        e
2962                    );
2963                    return;
2964                }
2965                let result = rx.await;
2966                // It is not an error for these results to be ready after `tx` has been dropped.
2967                ctx = match result {
2968                    Ok(Some(ctx)) => ctx,
2969                    Ok(None) => {
2970                        // Coordinator took our context and will handle responding to the client.
2971                        // This usually indicates that our transaction was aborted.
2972                        return;
2973                    }
2974                    Err(e) => {
2975                        warn!(
2976                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
2977                            e
2978                        );
2979                        return;
2980                    }
2981                };
2982            }
2983
2984            match diffs {
2985                Ok(diffs) => {
2986                    let result = Self::send_diffs(
2987                        ctx.session_mut(),
2988                        plan::SendDiffsPlan {
2989                            id,
2990                            updates: diffs,
2991                            kind,
2992                            returning: returning_rows,
2993                            max_result_size,
2994                        },
2995                    );
2996                    ctx.retire(result);
2997                }
2998                Err(e) => {
2999                    ctx.retire(Err(e));
3000                }
3001            }
3002        });
3003    }
3004
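    /// Sequences an `ALTER ... RENAME TO ...` by applying a single
    /// `Op::RenameItem` to the catalog inside a DDL transaction.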
3005    #[instrument]
3006    pub(super) async fn sequence_alter_item_rename(
3007        &mut self,
3008        ctx: &mut ExecuteContext,
3009        plan: plan::AlterItemRenamePlan,
3010    ) -> Result<ExecuteResponse, AdapterError> {
3011        let op = catalog::Op::RenameItem {
3012            id: plan.id,
3013            current_full_name: plan.current_full_name,
3014            to_name: plan.to_name,
3015        };
3016        match self
3017            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3018            .await
3019        {
3020            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3021            Err(err) => Err(err),
3022        }
3023    }
3024
3025    #[instrument]
3026    pub(super) async fn sequence_alter_retain_history(
3027        &mut self,
3028        ctx: &mut ExecuteContext,
3029        plan: plan::AlterRetainHistoryPlan,
3030    ) -> Result<ExecuteResponse, AdapterError> {
3031        let ops = vec![catalog::Op::AlterRetainHistory {
3032            id: plan.id,
3033            value: plan.value,
3034            window: plan.window,
3035        }];
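        // As a side effect, update the read policy for the collection: indexes live on a
        // compute cluster, while tables, sources, materialized views, and continual tasks
        // are storage collections, as the dispatch below illustrates.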
3036        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3037            Box::pin(async move {
3038                let catalog_item = coord.catalog().get_entry(&plan.id).item();
3039                let cluster = match catalog_item {
3040                    CatalogItem::Table(_)
3041                    | CatalogItem::MaterializedView(_)
3042                    | CatalogItem::Source(_)
3043                    | CatalogItem::ContinualTask(_) => None,
3044                    CatalogItem::Index(index) => Some(index.cluster_id),
3045                    CatalogItem::Log(_)
3046                    | CatalogItem::View(_)
3047                    | CatalogItem::Sink(_)
3048                    | CatalogItem::Type(_)
3049                    | CatalogItem::Func(_)
3050                    | CatalogItem::Secret(_)
3051                    | CatalogItem::Connection(_) => unreachable!(),
3052                };
3053                match cluster {
3054                    Some(cluster) => {
3055                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3056                    }
3057                    None => {
3058                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3059                    }
3060                }
3061            })
3062        })
3063        .await?;
3064        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3065    }
3066
3067    #[instrument]
3068    pub(super) async fn sequence_alter_schema_rename(
3069        &mut self,
3070        ctx: &mut ExecuteContext,
3071        plan: plan::AlterSchemaRenamePlan,
3072    ) -> Result<ExecuteResponse, AdapterError> {
3073        let (database_spec, schema_spec) = plan.cur_schema_spec;
3074        let op = catalog::Op::RenameSchema {
3075            database_spec,
3076            schema_spec,
3077            new_name: plan.new_schema_name,
3078            check_reserved_names: true,
3079        };
3080        match self
3081            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3082            .await
3083        {
3084            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3085            Err(err) => Err(err),
3086        }
3087    }
3088
3089    #[instrument]
3090    pub(super) async fn sequence_alter_schema_swap(
3091        &mut self,
3092        ctx: &mut ExecuteContext,
3093        plan: plan::AlterSchemaSwapPlan,
3094    ) -> Result<ExecuteResponse, AdapterError> {
3095        let plan::AlterSchemaSwapPlan {
3096            schema_a_spec: (schema_a_db, schema_a),
3097            schema_a_name,
3098            schema_b_spec: (schema_b_db, schema_b),
3099            schema_b_name,
3100            name_temp,
3101        } = plan;
3102
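        // Swap the two schema names by way of a temporary name, so that neither rename
        // collides with an existing schema:
        //   1. rename schema A to `name_temp`,
        //   2. rename schema B to A's original name,
        //   3. rename schema A (now `name_temp`) to B's original name.
        // All three renames are applied atomically in a single catalog transaction below.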
3103        let op_a = catalog::Op::RenameSchema {
3104            database_spec: schema_a_db,
3105            schema_spec: schema_a,
3106            new_name: name_temp,
3107            check_reserved_names: false,
3108        };
3109        let op_b = catalog::Op::RenameSchema {
3110            database_spec: schema_b_db,
3111            schema_spec: schema_b,
3112            new_name: schema_a_name,
3113            check_reserved_names: false,
3114        };
3115        let op_c = catalog::Op::RenameSchema {
3116            database_spec: schema_a_db,
3117            schema_spec: schema_a,
3118            new_name: schema_b_name,
3119            check_reserved_names: false,
3120        };
3121
3122        match self
3123            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3124                Box::pin(async {})
3125            })
3126            .await
3127        {
3128            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3129            Err(err) => Err(err),
3130        }
3131    }
3132
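    /// Sequences an `ALTER ROLE`, which either updates the role's attributes
    /// (e.g. `ALTER ROLE r SUPERUSER`) or its per-role session variable
    /// defaults (e.g. `ALTER ROLE r SET cluster = 'quickstart'`), emitting any
    /// applicable notices on success.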
3133    #[instrument]
3134    pub(super) async fn sequence_alter_role(
3135        &mut self,
3136        session: &Session,
3137        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3138    ) -> Result<ExecuteResponse, AdapterError> {
3139        let catalog = self.catalog().for_session(session);
3140        let role = catalog.get_role(&id);
3141
3142        // We'll send these notices to the user, if the operation is successful.
3143        let mut notices = vec![];
3144
3145        // Get the attributes and variables from the role, as they currently are.
3146        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3147        let mut vars = role.vars().clone();
3148
3149        // Whether to set the password to NULL. This is a special case since the existing
3150        // password is not stored in the role attributes.
3151        let mut nopassword = false;
3152
3153        // Apply our updates.
3154        match option {
3155            PlannedAlterRoleOption::Attributes(attrs) => {
3156                self.validate_role_attributes(&attrs.clone().into())?;
3157
3158                if let Some(inherit) = attrs.inherit {
3159                    attributes.inherit = inherit;
3160                }
3161
3162                if let Some(password) = attrs.password {
3163                    attributes.password = Some(password);
3164                    attributes.scram_iterations =
3165                        Some(self.catalog().system_config().scram_iterations());
3166                }
3167
3168                if let Some(superuser) = attrs.superuser {
3169                    attributes.superuser = Some(superuser);
3170                }
3171
3172                if let Some(login) = attrs.login {
3173                    attributes.login = Some(login);
3174                }
3175
3176                if attrs.nopassword.unwrap_or(false) {
3177                    nopassword = true;
3178                }
3179
3180                if let Some(notice) = self.should_emit_rbac_notice(session) {
3181                    notices.push(notice);
3182                }
3183            }
3184            PlannedAlterRoleOption::Variable(variable) => {
3185                // Get the variable to make sure it's valid and visible.
3186                let session_var = session.vars().inspect(variable.name())?;
3187                // Return early if it's not visible.
3188                session_var.visible(session.user(), catalog.system_vars())?;
3189
3190                // Emit a warning when deprecated variables are used.
3191                // TODO(database-issues#8069) remove this after sufficient time has passed
3192                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3193                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3194                } else if let PlannedRoleVariable::Set {
3195                    name,
3196                    value: VariableValue::Values(vals),
3197                } = &variable
3198                {
3199                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3200                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3201                    }
3202                }
3203
3204                let var_name = match variable {
3205                    PlannedRoleVariable::Set { name, value } => {
3206                        // Update our persisted set.
3207                        match &value {
3208                            VariableValue::Default => {
3209                                vars.remove(&name);
3210                            }
3211                            VariableValue::Values(vals) => {
3212                                let var = match &vals[..] {
3213                                    [val] => OwnedVarInput::Flat(val.clone()),
3214                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3215                                };
3216                                // Make sure the input is valid.
3217                                session_var.check(var.borrow())?;
3218
3219                                vars.insert(name.clone(), var);
3220                            }
3221                        };
3222                        name
3223                    }
3224                    PlannedRoleVariable::Reset { name } => {
3225                        // Remove it from our persisted values.
3226                        vars.remove(&name);
3227                        name
3228                    }
3229                };
3230
3231                // Emit a notice that they need to reconnect to see the change take effect.
3232                notices.push(AdapterNotice::VarDefaultUpdated {
3233                    role: Some(name.clone()),
3234                    var_name: Some(var_name),
3235                });
3236            }
3237        }
3238
3239        let op = catalog::Op::AlterRole {
3240            id,
3241            name,
3242            attributes,
3243            nopassword,
3244            vars: RoleVars { map: vars },
3245        };
3246        let response = self
3247            .catalog_transact(Some(session), vec![op])
3248            .await
3249            .map(|_| ExecuteResponse::AlteredRole)?;
3250
3251        // Send all of our queued notices.
3252        session.add_notices(notices);
3253
3254        Ok(response)
3255    }
3256
3257    #[instrument]
3258    pub(super) async fn sequence_alter_sink_prepare(
3259        &mut self,
3260        ctx: ExecuteContext,
3261        plan: plan::AlterSinkPlan,
3262    ) {
3263        // Put a read hold on the new relation
3264        let id_bundle = crate::CollectionIdBundle {
3265            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3266            compute_ids: BTreeMap::new(),
3267        };
3268        let read_hold = self.acquire_read_holds(&id_bundle);
3269
3270        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3271            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3272            return;
3273        };
3274
3275        let otel_ctx = OpenTelemetryContext::obtain();
3276        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3277
3278        let plan_validity = PlanValidity::new(
3279            self.catalog().transient_revision(),
3280            BTreeSet::from_iter([plan.item_id, from_item_id]),
3281            Some(plan.in_cluster),
3282            None,
3283            ctx.session().role_metadata().clone(),
3284        );
3285
3286        info!(
3287            "preparing alter sink for {}: frontiers={:?} export={:?}",
3288            plan.global_id,
3289            self.controller
3290                .storage_collections
3291                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3292            self.controller.storage.export(plan.global_id)
3293        );
3294
3295        // Now we must wait for the sink to make enough progress such that there is overlap between
3296        // the new `from` collection's read hold and the sink's write frontier.
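        // For example, if the new `from` collection is only readable at timestamps >= 100
        // while the sink has only written up to 90, we wait until the sink's write frontier
        // passes 100 before cutting over.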
3297        //
3298        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3299        // the watch set never completes and neither does the `ALTER SINK` command.
3300        self.install_storage_watch_set(
3301            ctx.session().conn_id().clone(),
3302            BTreeSet::from_iter([plan.global_id]),
3303            read_ts,
3304            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3305                ctx: Some(ctx),
3306                otel_ctx,
3307                plan,
3308                plan_validity,
3309                read_hold,
3310            }),
3311        );
3312    }
3313
3314    #[instrument]
3315    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3316        ctx.otel_ctx.attach_as_parent();
3317
3318        let plan::AlterSinkPlan {
3319            item_id,
3320            global_id,
3321            sink: sink_plan,
3322            with_snapshot,
3323            in_cluster,
3324        } = ctx.plan.clone();
3325
3326        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3327        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3328        // arbitrarily changed since we performed planning, and we must re-assert that it still
3329        // matches our requirements.
3330        //
3331        // The `PlanValidity` check ensures that both the sink and the new source relation still
3332        // exist. Apart from that we have to ensure that nobody else altered the sink in the
3333        // meantime, which we do by comparing the catalog sink version to the one in the plan.
3334        match ctx.plan_validity.check(self.catalog()) {
3335            Ok(()) => {}
3336            Err(err) => {
3337                ctx.retire(Err(err));
3338                return;
3339            }
3340        }
3341
3342        let entry = self.catalog().get_entry(&item_id);
3343        let CatalogItem::Sink(old_sink) = entry.item() else {
3344            panic!("invalid item kind for `AlterSinkPlan`");
3345        };
3346
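        // The new plan must be exactly one version ahead of the sink in the catalog;
        // anything else means a concurrent `ALTER SINK` won the race.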
3347        if sink_plan.version != old_sink.version + 1 {
3348            ctx.retire(Err(AdapterError::ChangedPlan(
3349                "sink was altered concurrently".into(),
3350            )));
3351            return;
3352        }
3353
3354        info!(
3355            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3356            self.controller
3357                .storage_collections
3358                .collections_frontiers(vec![global_id, sink_plan.from]),
3359            self.controller.storage.export(global_id),
3360        );
3361
3362        // Assert that we can still recover the updates that happened at the timestamps of the
3363        // write frontier, i.e. that the as-of has not advanced past it. This must hold here.
3364        let write_frontier = &self
3365            .controller
3366            .storage
3367            .export(global_id)
3368            .expect("sink known to exist")
3369            .write_frontier;
3370        let as_of = ctx.read_hold.least_valid_read();
3371        assert!(
3372            write_frontier.iter().all(|t| as_of.less_than(t)),
3373            "{:?} should be strictly less than {:?}",
3374            &*as_of,
3375            &**write_frontier
3376        );
3377
3378        // Parse the `create_sql` so we can update it to the new sink definition.
3379        //
3380        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3381        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3382        // names in the `create_sql` may have changed, for example due to a schema swap.
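        //
        // Illustratively, `CREATE SINK ... FROM old_from ... WITH (VERSION = 1)` is
        // rewritten below to `CREATE SINK ... FROM new_from ... WITH (VERSION = 2)`.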
3383        let create_sql = &old_sink.create_sql;
3384        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3385        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3386            unreachable!("invalid statement kind for sink");
3387        };
3388
3389        // Update the sink version.
3390        stmt.with_options
3391            .retain(|o| o.name != CreateSinkOptionName::Version);
3392        stmt.with_options.push(CreateSinkOption {
3393            name: CreateSinkOptionName::Version,
3394            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3395                sink_plan.version.to_string(),
3396            ))),
3397        });
3398
3399        let conn_catalog = self.catalog().for_system_session();
3400        let (mut stmt, resolved_ids) =
3401            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3402
3403        // Update the `from` relation.
3404        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3405        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3406        stmt.from = ResolvedItemName::Item {
3407            id: from_entry.id(),
3408            qualifiers: from_entry.name.qualifiers.clone(),
3409            full_name,
3410            print_id: true,
3411            version: from_entry.version,
3412        };
3413
3414        let new_sink = Sink {
3415            create_sql: stmt.to_ast_string_stable(),
3416            global_id,
3417            from: sink_plan.from,
3418            connection: sink_plan.connection.clone(),
3419            envelope: sink_plan.envelope,
3420            version: sink_plan.version,
3421            with_snapshot,
3422            resolved_ids: resolved_ids.clone(),
3423            cluster_id: in_cluster,
3424        };
3425
3426        let ops = vec![catalog::Op::UpdateItem {
3427            id: item_id,
3428            name: entry.name().clone(),
3429            to_item: CatalogItem::Sink(new_sink),
3430        }];
3431
3432        match self
3433            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3434            .await
3435        {
3436            Ok(()) => {}
3437            Err(err) => {
3438                ctx.retire(Err(err));
3439                return;
3440            }
3441        }
3442
3443        let storage_sink_desc = StorageSinkDesc {
3444            from: sink_plan.from,
3445            from_desc: from_entry
3446                .desc_opt()
3447                .expect("sinks can only be built on items with descs")
3448                .into_owned(),
3449            connection: sink_plan
3450                .connection
3451                .clone()
3452                .into_inline_connection(self.catalog().state()),
3453            envelope: sink_plan.envelope,
3454            as_of,
3455            with_snapshot,
3456            version: sink_plan.version,
3457            from_storage_metadata: (),
3458            to_storage_metadata: (),
3459        };
3460
3461        self.controller
3462            .storage
3463            .alter_export(
3464                global_id,
3465                ExportDescription {
3466                    sink: storage_sink_desc,
3467                    instance_id: in_cluster,
3468                },
3469            )
3470            .await
3471            .unwrap_or_terminate("cannot fail to alter sink export");
3472
3473        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3474    }
3475
3476    #[instrument]
3477    pub(super) async fn sequence_alter_connection(
3478        &mut self,
3479        ctx: ExecuteContext,
3480        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3481    ) {
3482        match action {
3483            AlterConnectionAction::RotateKeys => {
3484                self.sequence_rotate_keys(ctx, id).await;
3485            }
3486            AlterConnectionAction::AlterOptions {
3487                set_options,
3488                drop_options,
3489                validate,
3490            } => {
3491                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3492                    .await
3493            }
3494        }
3495    }
3496
3497    #[instrument]
3498    async fn sequence_alter_connection_options(
3499        &mut self,
3500        mut ctx: ExecuteContext,
3501        id: CatalogItemId,
3502        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3503        drop_options: BTreeSet<ConnectionOptionName>,
3504        validate: bool,
3505    ) {
3506        let cur_entry = self.catalog().get_entry(&id);
3507        let cur_conn = cur_entry.connection().expect("known to be connection");
3508        let connection_gid = cur_conn.global_id();
3509
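        // Altering a connection round-trips through SQL: parse the existing `create_sql`,
        // splice in the SET / DROP options, re-plan the amended statement against a catalog
        // in which this connection is unresolvable (so the new plan cannot depend on
        // itself), and finally re-resolve the planned `create_sql` to compute the new
        // dependency set.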
3510        let inner = || -> Result<Connection, AdapterError> {
3511            // Parse statement.
3512            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3513                .expect("invalid create sql persisted to catalog")
3514                .into_element()
3515                .ast
3516            {
3517                Statement::CreateConnection(stmt) => stmt,
3518                _ => unreachable!("proved type is connection"),
3519            };
3520
3521            let catalog = self.catalog().for_system_session();
3522
3523            // Resolve items in statement
3524            let (mut create_conn_stmt, resolved_ids) =
3525                mz_sql::names::resolve(&catalog, create_conn_stmt)
3526                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3527
3528            // Retain options that are neither set nor dropped.
3529            create_conn_stmt
3530                .values
3531                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3532
3533            // Set new values
3534            create_conn_stmt.values.extend(
3535                set_options
3536                    .into_iter()
3537                    .map(|(name, value)| ConnectionOption { name, value }),
3538            );
3539
3540            // Open a new catalog, which we will use to re-plan our
3541            // statement with the desired config.
3542            let mut catalog = self.catalog().for_system_session();
3543            catalog.mark_id_unresolvable_for_replanning(id);
3544
3545            // Re-define our connection in terms of the amended statement.
3546            let plan = match mz_sql::plan::plan(
3547                None,
3548                &catalog,
3549                Statement::CreateConnection(create_conn_stmt),
3550                &Params::empty(),
3551                &resolved_ids,
3552            )
3553            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3554            {
3555                Plan::CreateConnection(plan) => plan,
3556                _ => unreachable!("create connection plan is only valid response"),
3557            };
3558
3559            // Parse statement.
3560            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3561                .expect("invalid create sql persisted to catalog")
3562                .into_element()
3563                .ast
3564            {
3565                Statement::CreateConnection(stmt) => stmt,
3566                _ => unreachable!("proved type is connection"),
3567            };
3568
3569            let catalog = self.catalog().for_system_session();
3570
3571            // Resolve items in statement
3572            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3573                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3574
3575            Ok(Connection {
3576                create_sql: plan.connection.create_sql,
3577                global_id: cur_conn.global_id,
3578                details: plan.connection.details,
3579                resolved_ids: new_deps,
3580            })
3581        };
3582
3583        let conn = match inner() {
3584            Ok(conn) => conn,
3585            Err(e) => {
3586                return ctx.retire(Err(e));
3587            }
3588        };
3589
3590        if validate {
3591            let connection = conn
3592                .details
3593                .to_connection()
3594                .into_inline_connection(self.catalog().state());
3595
3596            let internal_cmd_tx = self.internal_cmd_tx.clone();
3597            let transient_revision = self.catalog().transient_revision();
3598            let conn_id = ctx.session().conn_id().clone();
3599            let otel_ctx = OpenTelemetryContext::obtain();
3600            let role_metadata = ctx.session().role_metadata().clone();
3601            let current_storage_parameters = self.controller.storage.config().clone();
3602
3603            task::spawn(
3604                || format!("validate_alter_connection:{conn_id}"),
3605                async move {
3606                    let resolved_ids = conn.resolved_ids.clone();
3607                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3608                    let result = match connection.validate(id, &current_storage_parameters).await {
3609                        Ok(()) => Ok(conn),
3610                        Err(err) => Err(err.into()),
3611                    };
3612
3613                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3614                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3615                        AlterConnectionValidationReady {
3616                            ctx,
3617                            result,
3618                            connection_id: id,
3619                            connection_gid,
3620                            plan_validity: PlanValidity::new(
3621                                transient_revision,
3622                                dependency_ids.clone(),
3623                                None,
3624                                None,
3625                                role_metadata,
3626                            ),
3627                            otel_ctx,
3628                            resolved_ids,
3629                        },
3630                    ));
3631                    if let Err(e) = result {
3632                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3633                    }
3634                },
3635            );
3636        } else {
3637            let result = self
3638                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3639                .await;
3640            ctx.retire(result);
3641        }
3642    }
3643
3644    #[instrument]
3645    pub(crate) async fn sequence_alter_connection_stage_finish(
3646        &mut self,
3647        session: &mut Session,
3648        id: CatalogItemId,
3649        connection: Connection,
3650    ) -> Result<ExecuteResponse, AdapterError> {
3651        match self.catalog.get_entry(&id).item() {
3652            CatalogItem::Connection(curr_conn) => {
3653                curr_conn
3654                    .details
3655                    .to_connection()
3656                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3657                    .map_err(StorageError::from)?;
3658            }
3659            _ => unreachable!("known to be a connection"),
3660        };
3661
3662        let ops = vec![catalog::Op::UpdateItem {
3663            id,
3664            name: self.catalog.get_entry(&id).name().clone(),
3665            to_item: CatalogItem::Connection(connection.clone()),
3666        }];
3667
3668        self.catalog_transact(Some(session), ops).await?;
3669
3670        match connection.details {
3671            ConnectionDetails::AwsPrivatelink(ref privatelink) => {
3672                let spec = VpcEndpointConfig {
3673                    aws_service_name: privatelink.service_name.to_owned(),
3674                    availability_zone_ids: privatelink.availability_zones.to_owned(),
3675                };
3676                self.cloud_resource_controller
3677                    .as_ref()
3678                    .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
3679                    .ensure_vpc_endpoint(id, spec)
3680                    .await?;
3681            }
3682            _ => {}
3683        };
3684
3685        let entry = self.catalog().get_entry(&id);
3686
3687        let mut connections = VecDeque::new();
3688        connections.push_front(entry.id());
3689
3690        let mut source_connections = BTreeMap::new();
3691        let mut sink_connections = BTreeMap::new();
3692        let mut source_export_data_configs = BTreeMap::new();
3693
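        // Walk the dependents breadth-first: anything that uses this connection, directly
        // or through another connection, may need its running dataflows reconfigured with
        // the new connection details.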
3694        while let Some(id) = connections.pop_front() {
3695            for id in self.catalog.get_entry(&id).used_by() {
3696                let entry = self.catalog.get_entry(id);
3697                match entry.item() {
3698                    CatalogItem::Connection(_) => connections.push_back(*id),
3699                    CatalogItem::Source(source) => {
3700                        let desc = match &entry.source().expect("known to be source").data_source {
3701                            DataSourceDesc::Ingestion { desc, .. }
3702                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3703                                desc.clone().into_inline_connection(self.catalog().state())
3704                            }
3705                            _ => unreachable!("only ingestions reference connections"),
3706                        };
3707
3708                        source_connections.insert(source.global_id, desc.connection);
3709                    }
3710                    CatalogItem::Sink(sink) => {
3711                        let export = entry.sink().expect("known to be sink");
3712                        sink_connections.insert(
3713                            sink.global_id,
3714                            export
3715                                .connection
3716                                .clone()
3717                                .into_inline_connection(self.catalog().state()),
3718                        );
3719                    }
3720                    CatalogItem::Table(table) => {
3721                        // This is a source-fed table that references a schema registry
3722                        // connection as part of its encoding / data config.
3723                        if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
3724                            let data_config = export_data_config.clone();
3725                            source_export_data_configs.insert(
3726                                table.global_id_writes(),
3727                                data_config.into_inline_connection(self.catalog().state()),
3728                            );
3729                        }
3730                    }
3731                    t => unreachable!("connection dependency not expected on {:?}", t),
3732                }
3733            }
3734        }
3735
3736        if !source_connections.is_empty() {
3737            self.controller
3738                .storage
3739                .alter_ingestion_connections(source_connections)
3740                .await
3741                .unwrap_or_terminate("cannot fail to alter ingestion connection");
3742        }
3743
3744        if !sink_connections.is_empty() {
3745            self.controller
3746                .storage
3747                .alter_export_connections(sink_connections)
3748                .await
3749                .unwrap_or_terminate("altering exports after txn must succeed");
3750        }
3751
3752        if !source_export_data_configs.is_empty() {
3753            self.controller
3754                .storage
3755                .alter_ingestion_export_data_configs(source_export_data_configs)
3756                .await
3757                .unwrap_or_terminate("altering source export data configs after txn must succeed");
3758        }
3759
3760        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3761    }
3762
3763    #[instrument]
3764    pub(super) async fn sequence_alter_source(
3765        &mut self,
3766        session: &Session,
3767        plan::AlterSourcePlan {
3768            item_id,
3769            ingestion_id,
3770            action,
3771        }: plan::AlterSourcePlan,
3772    ) -> Result<ExecuteResponse, AdapterError> {
3773        let cur_entry = self.catalog().get_entry(&item_id);
3774        let cur_source = cur_entry.source().expect("known to be source");
3775
3776        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3777            // Parse statement.
3778            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3779                .expect("invalid create sql persisted to catalog")
3780                .into_element()
3781                .ast
3782            {
3783                Statement::CreateSource(stmt) => stmt,
3784                _ => unreachable!("proved type is source"),
3785            };
3786
3787            let catalog = coord.catalog().for_system_session();
3788
3789            // Resolve items in statement
3790            mz_sql::names::resolve(&catalog, create_source_stmt)
3791                .map_err(|e| AdapterError::internal(err_cx, e))
3792        };
3793
3794        match action {
3795            plan::AlterSourceAction::AddSubsourceExports {
3796                subsources,
3797                options,
3798            } => {
3799                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3800
3801                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3802                    text_columns: mut new_text_columns,
3803                    exclude_columns: mut new_exclude_columns,
3804                    ..
3805                } = options.try_into()?;
3806
3807                // Resolve items in statement
3808                let (mut create_source_stmt, resolved_ids) =
3809                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3810
3811                // Get all currently referred-to items
3812                let catalog = self.catalog();
3813                let curr_references: BTreeSet<_> = catalog
3814                    .get_entry(&item_id)
3815                    .used_by()
3816                    .into_iter()
3817                    .filter_map(|subsource| {
3818                        catalog
3819                            .get_entry(subsource)
3820                            .subsource_details()
3821                            .map(|(_id, reference, _details)| reference)
3822                    })
3823                    .collect();
3824
3825                // We are doing a lot of unwrapping, so just make an error to reference; all of
3826                // these invariants are guaranteed to be true because of how we plan subsources.
3827                let purification_err =
3828                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3829
3830                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
3831                // we remove support for implicitly created subsources from a `CREATE SOURCE`
3832                // statement.
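                // For both Postgres and MySQL the merge works the same way: strip the
                // TEXT COLUMNS (and, for MySQL, EXCLUDE COLUMNS) options from the
                // statement, drop entries whose table is no longer referenced by any
                // subsource, union in the newly requested columns, and write the
                // combined set back.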
3833                match &mut create_source_stmt.connection {
3834                    CreateSourceConnection::Postgres {
3835                        options: curr_options,
3836                        ..
3837                    } => {
3838                        let mz_sql::plan::PgConfigOptionExtracted {
3839                            mut text_columns, ..
3840                        } = curr_options.clone().try_into()?;
3841
3842                        // Drop text columns; we will add them back in
3843                        // as appropriate below.
3844                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3845
3846                        // Drop all text columns that are not currently referred to.
3847                        text_columns.retain(|column_qualified_reference| {
3848                            mz_ore::soft_assert_eq_or_log!(
3849                                column_qualified_reference.0.len(),
3850                                4,
3851                                "all TEXT COLUMNS values must be column-qualified references"
3852                            );
3853                            let mut table = column_qualified_reference.clone();
3854                            table.0.truncate(3);
3855                            curr_references.contains(&table)
3856                        });
3857
3858                        // Merge the current text columns into the new text columns.
3859                        new_text_columns.extend(text_columns);
3860
3861                        // If we have text columns, add them to the options.
3862                        if !new_text_columns.is_empty() {
3863                            new_text_columns.sort();
3864                            let new_text_columns = new_text_columns
3865                                .into_iter()
3866                                .map(WithOptionValue::UnresolvedItemName)
3867                                .collect();
3868
3869                            curr_options.push(PgConfigOption {
3870                                name: PgConfigOptionName::TextColumns,
3871                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3872                            });
3873                        }
3874                    }
3875                    CreateSourceConnection::MySql {
3876                        options: curr_options,
3877                        ..
3878                    } => {
3879                        let mz_sql::plan::MySqlConfigOptionExtracted {
3880                            mut text_columns,
3881                            mut exclude_columns,
3882                            ..
3883                        } = curr_options.clone().try_into()?;
3884
3885                        // Drop both text and exclude columns; we will add them back in
3886                        // as appropriate below.
3887                        curr_options.retain(|o| {
3888                            !matches!(
3889                                o.name,
3890                                MySqlConfigOptionName::TextColumns
3891                                    | MySqlConfigOptionName::ExcludeColumns
3892                            )
3893                        });
3894
3895                        // Drop all text / exclude columns that are not currently referred to.
3896                        let column_referenced =
3897                            |column_qualified_reference: &UnresolvedItemName| {
3898                                mz_ore::soft_assert_eq_or_log!(
3899                                    column_qualified_reference.0.len(),
3900                                    3,
3901                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3902                                );
3903                                let mut table = column_qualified_reference.clone();
3904                                table.0.truncate(2);
3905                                curr_references.contains(&table)
3906                            };
3907                        text_columns.retain(column_referenced);
3908                        exclude_columns.retain(column_referenced);
3909
3910                        // Merge the current text / exclude columns into the new text / exclude columns.
3911                        new_text_columns.extend(text_columns);
3912                        new_exclude_columns.extend(exclude_columns);
3913
3914                        // If we have text columns, add them to the options.
3915                        if !new_text_columns.is_empty() {
3916                            new_text_columns.sort();
3917                            let new_text_columns = new_text_columns
3918                                .into_iter()
3919                                .map(WithOptionValue::UnresolvedItemName)
3920                                .collect();
3921
3922                            curr_options.push(MySqlConfigOption {
3923                                name: MySqlConfigOptionName::TextColumns,
3924                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3925                            });
3926                        }
3927                        // If we have exclude columns, add them to the options.
3928                        if !new_exclude_columns.is_empty() {
3929                            new_exclude_columns.sort();
3930                            let new_exclude_columns = new_exclude_columns
3931                                .into_iter()
3932                                .map(WithOptionValue::UnresolvedItemName)
3933                                .collect();
3934
3935                            curr_options.push(MySqlConfigOption {
3936                                name: MySqlConfigOptionName::ExcludeColumns,
3937                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3938                            });
3939                        }
3940                    }
3941                    CreateSourceConnection::SqlServer {
3942                        options: curr_options,
3943                        ..
3944                    } => {
3945                        let mz_sql::plan::SqlServerConfigOptionExtracted {
3946                            mut text_columns,
3947                            mut exclude_columns,
3948                            ..
3949                        } = curr_options.clone().try_into()?;
3950
3951                        // Drop both text and exclude columns; we will add them back in
3952                        // as appropriate below.
3953                        curr_options.retain(|o| {
3954                            !matches!(
3955                                o.name,
3956                                SqlServerConfigOptionName::TextColumns
3957                                    | SqlServerConfigOptionName::ExcludeColumns
3958                            )
3959                        });
3960
3961                        // Drop all text / exclude columns that are not currently referred to.
3962                        let column_referenced =
3963                            |column_qualified_reference: &UnresolvedItemName| {
3964                                mz_ore::soft_assert_eq_or_log!(
3965                                    column_qualified_reference.0.len(),
3966                                    3,
3967                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3968                                );
3969                                let mut table = column_qualified_reference.clone();
3970                                table.0.truncate(2);
3971                                curr_references.contains(&table)
3972                            };
3973                        text_columns.retain(column_referenced);
3974                        exclude_columns.retain(column_referenced);
3975
3976                        // Merge the current text / exclude columns into the new text / exclude columns.
3977                        new_text_columns.extend(text_columns);
3978                        new_exclude_columns.extend(exclude_columns);
3979
3980                        // If we have text columns, add them to the options.
3981                        if !new_text_columns.is_empty() {
3982                            new_text_columns.sort();
3983                            let new_text_columns = new_text_columns
3984                                .into_iter()
3985                                .map(WithOptionValue::UnresolvedItemName)
3986                                .collect();
3987
3988                            curr_options.push(SqlServerConfigOption {
3989                                name: SqlServerConfigOptionName::TextColumns,
3990                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3991                            });
3992                        }
3993                        // If we have exclude columns, add them to the options.
3994                        if !new_exclude_columns.is_empty() {
3995                            new_exclude_columns.sort();
3996                            let new_exclude_columns = new_exclude_columns
3997                                .into_iter()
3998                                .map(WithOptionValue::UnresolvedItemName)
3999                                .collect();
4000
4001                            curr_options.push(SqlServerConfigOption {
4002                                name: SqlServerConfigOptionName::ExcludeColumns,
4003                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4004                            });
4005                        }
4006                    }
4007                    _ => return Err(purification_err()),
4008                };
4009
4010                let mut catalog = self.catalog().for_system_session();
4011                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4012
4013                // Re-define our source in terms of the amended statement
4014                let plan = match mz_sql::plan::plan(
4015                    None,
4016                    &catalog,
4017                    Statement::CreateSource(create_source_stmt),
4018                    &Params::empty(),
4019                    &resolved_ids,
4020                )
4021                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4022                {
4023                    Plan::CreateSource(plan) => plan,
4024                    _ => unreachable!("a CreateSource plan is the only valid response"),
4025                };
4026
4027                // Asserting that we've done the right thing with dependencies
4028                // here requires mocking out objects in the catalog, which is a
4029                // large task for an operation we have to cover in tests anyway.
4030                let source = Source::new(
4031                    plan,
4032                    cur_source.global_id,
4033                    resolved_ids,
4034                    cur_source.custom_logical_compaction_window,
4035                    cur_source.is_retained_metrics_object,
4036                );
4037
4038                // Get new ingestion description for storage.
4039                let desc = match &source.data_source {
4040                    DataSourceDesc::Ingestion { desc, .. }
4041                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4042                        desc.clone().into_inline_connection(self.catalog().state())
4043                    }
4044                    _ => unreachable!("already verified to be an ingestion"),
4045                };
4046
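                // Validate the new ingestion description with the storage controller
                // before durably committing the catalog change below.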
4047                self.controller
4048                    .storage
4049                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
4050                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4051
4052                // Redefine source. This must be done before we create any new
4053                // subsources so that it has the right ingestion.
4054                let mut ops = vec![catalog::Op::UpdateItem {
4055                    id: item_id,
4056                    // Look this up again so we don't have to hold an immutable reference to the
4057                    // entry for so long.
4058                    name: self.catalog.get_entry(&item_id).name().clone(),
4059                    to_item: CatalogItem::Source(source),
4060                }];
4061
4062                let CreateSourceInner {
4063                    ops: new_ops,
4064                    sources: _,
4065                    if_not_exists_ids,
4066                } = self.create_source_inner(session, subsources).await?;
4067
4068                ops.extend(new_ops.into_iter());
4069
4070                assert!(
4071                    if_not_exists_ids.is_empty(),
4072                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4073                );
4074
4075                self.catalog_transact(Some(session), ops).await?;
4076            }
4077            plan::AlterSourceAction::RefreshReferences { references } => {
4078                self.catalog_transact(
4079                    Some(session),
4080                    vec![catalog::Op::UpdateSourceReferences {
4081                        source_id: item_id,
4082                        references: references.into(),
4083                    }],
4084                )
4085                .await?;
4086            }
4087        }
4088
4089        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4090    }
4091
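    /// Sequences an `ALTER SYSTEM SET`, persisting the new value (or restoring the
    /// default) for a system configuration parameter.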
4092    #[instrument]
4093    pub(super) async fn sequence_alter_system_set(
4094        &mut self,
4095        session: &Session,
4096        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4097    ) -> Result<ExecuteResponse, AdapterError> {
4098        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4099        // We want to ensure that the network policy we're switching to actually exists.
4100        if NETWORK_POLICY.name.to_string().to_lowercase() == name.to_lowercase() {
4101            self.validate_alter_system_network_policy(session, &value)?;
4102        }
4103
4104        let op = match value {
4105            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4106                name: name.clone(),
4107                value: OwnedVarInput::SqlSet(values),
4108            },
4109            plan::VariableValue::Default => {
4110                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4111            }
4112        };
4113        self.catalog_transact(Some(session), vec![op]).await?;
4114
4115        session.add_notice(AdapterNotice::VarDefaultUpdated {
4116            role: None,
4117            var_name: Some(name),
4118        });
4119        Ok(ExecuteResponse::AlteredSystemConfiguration)
4120    }
4121
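    /// Sequences an `ALTER SYSTEM RESET`, restoring a single system configuration
    /// parameter to its default value.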
4122    #[instrument]
4123    pub(super) async fn sequence_alter_system_reset(
4124        &mut self,
4125        session: &Session,
4126        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4127    ) -> Result<ExecuteResponse, AdapterError> {
4128        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4129        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4130        self.catalog_transact(Some(session), vec![op]).await?;
4131        session.add_notice(AdapterNotice::VarDefaultUpdated {
4132            role: None,
4133            var_name: Some(name),
4134        });
4135        Ok(ExecuteResponse::AlteredSystemConfiguration)
4136    }
4137
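    /// Sequences an `ALTER SYSTEM RESET ALL`, restoring every system configuration
    /// parameter to its default value.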
4138    #[instrument]
4139    pub(super) async fn sequence_alter_system_reset_all(
4140        &mut self,
4141        session: &Session,
4142        _: plan::AlterSystemResetAllPlan,
4143    ) -> Result<ExecuteResponse, AdapterError> {
4144        self.is_user_allowed_to_alter_system(session, None)?;
4145        let op = catalog::Op::ResetAllSystemConfiguration;
4146        self.catalog_transact(Some(session), vec![op]).await?;
4147        session.add_notice(AdapterNotice::VarDefaultUpdated {
4148            role: None,
4149            var_name: None,
4150        });
4151        Ok(ExecuteResponse::AlteredSystemConfiguration)
4152    }
4153
4154    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
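    /// Returns `Ok(())` if the session's user may alter the named system variable
    /// (or all variables, when `var_name` is `None`), and an unauthorized error otherwise.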
4155    fn is_user_allowed_to_alter_system(
4156        &self,
4157        session: &Session,
4158        var_name: Option<&str>,
4159    ) -> Result<(), AdapterError> {
4160        match (session.user().kind(), var_name) {
4161            // Only internal superusers can reset all system variables.
4162            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4163            // Whether a variable can be modified depends on whether we're an internal superuser.
4164            (UserKind::Superuser, Some(name))
4165                if session.user().is_internal()
4166                    || self.catalog().system_config().user_modifiable(name) =>
4167            {
4168                // In lieu of plumbing the user to all system config functions, just check that
4169                // the var is visible.
4170                let var = self.catalog().system_config().get(name)?;
4171                var.visible(session.user(), self.catalog().system_config())?;
4172                Ok(())
4173            }
4174            // If we're not a superuser but the variable is user-modifiable, indicate
4175            // that superuser privileges are required to modify it.
4176            (UserKind::Regular, Some(name))
4177                if self.catalog().system_config().user_modifiable(name) =>
4178            {
4179                Err(AdapterError::Unauthorized(
4180                    rbac::UnauthorizedError::Superuser {
4181                        action: format!("toggle the '{name}' system configuration parameter"),
4182                    },
4183                ))
4184            }
4185            _ => Err(AdapterError::Unauthorized(
4186                rbac::UnauthorizedError::MzSystem {
4187                    action: "alter system".into(),
4188                },
4189            )),
4190        }
4191    }
4192
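    /// Validates that the network policy named by an `ALTER SYSTEM` value exists and
    /// would not lock the current session out.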
4193    fn validate_alter_system_network_policy(
4194        &self,
4195        session: &Session,
4196        policy_value: &plan::VariableValue,
4197    ) -> Result<(), AdapterError> {
4198        let policy_name = match &policy_value {
4199            // Make sure the compiled-in default still exists.
4200            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4201            plan::VariableValue::Values(values) if values.len() == 1 => {
4202                values.iter().next().cloned()
4203            }
4204            plan::VariableValue::Values(values) => {
4205                tracing::warn!(?values, "can't set multiple network policies at once");
4206                None
4207            }
4208        };
4209        let maybe_network_policy = policy_name
4210            .as_ref()
4211            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4212        let Some(network_policy) = maybe_network_policy else {
4213            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4214                VarError::InvalidParameterValue {
4215                    name: NETWORK_POLICY.name(),
4216                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4217                    reason: "no network policy with that name exists".to_string(),
4218                },
4219            )));
4220        };
4221        self.validate_alter_network_policy(session, &network_policy.rules)
4222    }
4223
4224    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4225    ///
4226    /// This helps prevent users from modifying network policies in a way that would lock out their
4227    /// current connection.
4228    fn validate_alter_network_policy(
4229        &self,
4230        session: &Session,
4231        policy_rules: &Vec<NetworkPolicyRule>,
4232    ) -> Result<(), AdapterError> {
4233        // If the user is not an internal user, attempt to protect them from
4234        // blocking themselves.
4235        if session.user().is_internal() {
4236            return Ok(());
4237        }
4238        if let Some(ip) = session.meta().client_ip() {
4239            validate_ip_with_policy_rules(ip, policy_rules)
4240                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4241        } else {
4242            // Sessions without IPs are only temporarily constructed for default values;
4243            // they should not be permitted here.
4244            return Err(AdapterError::NetworkPolicyDenied(
4245                NetworkPolicyError::MissingIp,
4246            ));
4247        }
4248        Ok(())
4249    }
4250
4251    /// Returns the name of the portal to execute.
4252    #[instrument]
4253    pub(super) fn sequence_execute(
4254        &mut self,
4255        session: &mut Session,
4256        plan: plan::ExecutePlan,
4257    ) -> Result<String, AdapterError> {
4258        // Verify the stmt is still valid.
4259        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4260        let ps = session
4261            .get_prepared_statement_unverified(&plan.name)
4262            .expect("known to exist");
4263        let stmt = ps.stmt().cloned();
4264        let desc = ps.desc().clone();
4265        let state_revision = ps.state_revision;
4266        let logging = Arc::clone(ps.logging());
4267        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4268    }
4269
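    /// Sequences a `GRANT` of privileges; the shared logic lives in
    /// `sequence_update_privileges`.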
4270    #[instrument]
4271    pub(super) async fn sequence_grant_privileges(
4272        &mut self,
4273        session: &Session,
4274        plan::GrantPrivilegesPlan {
4275            update_privileges,
4276            grantees,
4277        }: plan::GrantPrivilegesPlan,
4278    ) -> Result<ExecuteResponse, AdapterError> {
4279        self.sequence_update_privileges(
4280            session,
4281            update_privileges,
4282            grantees,
4283            UpdatePrivilegeVariant::Grant,
4284        )
4285        .await
4286    }
4287
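    /// Sequences a `REVOKE` of privileges; the shared logic lives in
    /// `sequence_update_privileges`.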
4288    #[instrument]
4289    pub(super) async fn sequence_revoke_privileges(
4290        &mut self,
4291        session: &Session,
4292        plan::RevokePrivilegesPlan {
4293            update_privileges,
4294            revokees,
4295        }: plan::RevokePrivilegesPlan,
4296    ) -> Result<ExecuteResponse, AdapterError> {
4297        self.sequence_update_privileges(
4298            session,
4299            update_privileges,
4300            revokees,
4301            UpdatePrivilegeVariant::Revoke,
4302        )
4303        .await
4304    }
4305
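    /// Shared implementation of GRANT and REVOKE: builds one catalog op per
    /// (object, grantee) pair, warning on privileges that don't apply to the object type.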
4306    #[instrument]
4307    async fn sequence_update_privileges(
4308        &mut self,
4309        session: &Session,
4310        update_privileges: Vec<UpdatePrivilege>,
4311        grantees: Vec<RoleId>,
4312        variant: UpdatePrivilegeVariant,
4313    ) -> Result<ExecuteResponse, AdapterError> {
4314        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4315        let mut warnings = Vec::new();
4316        let catalog = self.catalog().for_session(session);
4317
4318        for UpdatePrivilege {
4319            acl_mode,
4320            target_id,
4321            grantor,
4322        } in update_privileges
4323        {
4324            let actual_object_type = catalog.get_system_object_type(&target_id);
4325            // For all relations we allow all applicable table privileges, but send a warning if the
4326            // privilege isn't actually applicable to the object type.
4327            if actual_object_type.is_relation() {
4328                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4329                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4330                if !non_applicable_privileges.is_empty() {
4331                    let object_description =
4332                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4333                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4334                        non_applicable_privileges,
4335                        object_description,
4336                    })
4337                }
4338            }
4339
4340            if let SystemObjectId::Object(object_id) = &target_id {
4341                self.catalog()
4342                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4343            }
4344
4345            let privileges = self
4346                .catalog()
4347                .get_privileges(&target_id, session.conn_id())
4348                // Should be unreachable since the parser will refuse to parse grant/revoke
4349                // statements on objects without privileges.
4350                .ok_or(AdapterError::Unsupported(
4351                    "GRANTs/REVOKEs on an object type with no privileges",
4352                ))?;
4353
4354            for grantee in &grantees {
4355                self.catalog().ensure_not_system_role(grantee)?;
4356                self.catalog().ensure_not_predefined_role(grantee)?;
4357                let existing_privilege = privileges
4358                    .get_acl_item(grantee, &grantor)
4359                    .map(Cow::Borrowed)
4360                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4361
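                // Only emit an op when it changes something: skip grants whose modes
                // are all already held, and revokes whose modes are all absent.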
4362                match variant {
4363                    UpdatePrivilegeVariant::Grant
4364                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4365                    {
4366                        ops.push(catalog::Op::UpdatePrivilege {
4367                            target_id: target_id.clone(),
4368                            privilege: MzAclItem {
4369                                grantee: *grantee,
4370                                grantor,
4371                                acl_mode,
4372                            },
4373                            variant,
4374                        });
4375                    }
4376                    UpdatePrivilegeVariant::Revoke
4377                        if !existing_privilege
4378                            .acl_mode
4379                            .intersection(acl_mode)
4380                            .is_empty() =>
4381                    {
4382                        ops.push(catalog::Op::UpdatePrivilege {
4383                            target_id: target_id.clone(),
4384                            privilege: MzAclItem {
4385                                grantee: *grantee,
4386                                grantor,
4387                                acl_mode,
4388                            },
4389                            variant,
4390                        });
4391                    }
4392                    // no-op
4393                    _ => {}
4394                }
4395            }
4396        }
4397
4398        if ops.is_empty() {
4399            session.add_notices(warnings);
4400            return Ok(variant.into());
4401        }
4402
4403        let res = self
4404            .catalog_transact(Some(session), ops)
4405            .await
4406            .map(|_| match variant {
4407                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4408                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4409            });
4410        if res.is_ok() {
4411            session.add_notices(warnings);
4412        }
4413        res
4414    }
4415
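    /// Sequences an `ALTER DEFAULT PRIVILEGES`, recording a default privilege grant or
    /// revoke for each (object, grantee) pair.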
4416    #[instrument]
4417    pub(super) async fn sequence_alter_default_privileges(
4418        &mut self,
4419        session: &Session,
4420        plan::AlterDefaultPrivilegesPlan {
4421            privilege_objects,
4422            privilege_acl_items,
4423            is_grant,
4424        }: plan::AlterDefaultPrivilegesPlan,
4425    ) -> Result<ExecuteResponse, AdapterError> {
4426        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4427        let variant = if is_grant {
4428            UpdatePrivilegeVariant::Grant
4429        } else {
4430            UpdatePrivilegeVariant::Revoke
4431        };
4432        for privilege_object in &privilege_objects {
4433            self.catalog()
4434                .ensure_not_system_role(&privilege_object.role_id)?;
4435            self.catalog()
4436                .ensure_not_predefined_role(&privilege_object.role_id)?;
4437            if let Some(database_id) = privilege_object.database_id {
4438                self.catalog()
4439                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4440            }
4441            if let Some(schema_id) = privilege_object.schema_id {
4442                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4443                let schema_spec: SchemaSpecifier = schema_id.into();
4444
4445                self.catalog().ensure_not_reserved_object(
4446                    &(database_spec, schema_spec).into(),
4447                    session.conn_id(),
4448                )?;
4449            }
4450            for privilege_acl_item in &privilege_acl_items {
4451                self.catalog()
4452                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4453                self.catalog()
4454                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4455                ops.push(catalog::Op::UpdateDefaultPrivilege {
4456                    privilege_object: privilege_object.clone(),
4457                    privilege_acl_item: privilege_acl_item.clone(),
4458                    variant,
4459                })
4460            }
4461        }
4462
4463        self.catalog_transact(Some(session), ops).await?;
4464        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4465    }
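    /// Sequences a `GRANT <role>`, adding each member to each role and emitting a
    /// notice for memberships that already exist.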
4466
4467    #[instrument]
4468    pub(super) async fn sequence_grant_role(
4469        &mut self,
4470        session: &Session,
4471        plan::GrantRolePlan {
4472            role_ids,
4473            member_ids,
4474            grantor_id,
4475        }: plan::GrantRolePlan,
4476    ) -> Result<ExecuteResponse, AdapterError> {
4477        let catalog = self.catalog();
4478        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4479        for role_id in role_ids {
4480            for member_id in &member_ids {
4481                let member_membership: BTreeSet<_> =
4482                    catalog.get_role(member_id).membership().keys().collect();
4483                if member_membership.contains(&role_id) {
4484                    let role_name = catalog.get_role(&role_id).name().to_string();
4485                    let member_name = catalog.get_role(member_id).name().to_string();
4486                    // We need this check so we don't accidentally return a success on a reserved role.
4487                    catalog.ensure_not_reserved_role(member_id)?;
4488                    catalog.ensure_grantable_role(&role_id)?;
4489                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4490                        role_name,
4491                        member_name,
4492                    });
4493                } else {
4494                    ops.push(catalog::Op::GrantRole {
4495                        role_id,
4496                        member_id: *member_id,
4497                        grantor_id,
4498                    });
4499                }
4500            }
4501        }
4502
4503        if ops.is_empty() {
4504            return Ok(ExecuteResponse::GrantedRole);
4505        }
4506
4507        self.catalog_transact(Some(session), ops)
4508            .await
4509            .map(|_| ExecuteResponse::GrantedRole)
4510    }
4511
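    /// Sequences a `REVOKE <role>`, removing each member from each role and emitting a
    /// notice for memberships that do not exist.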
4512    #[instrument]
4513    pub(super) async fn sequence_revoke_role(
4514        &mut self,
4515        session: &Session,
4516        plan::RevokeRolePlan {
4517            role_ids,
4518            member_ids,
4519            grantor_id,
4520        }: plan::RevokeRolePlan,
4521    ) -> Result<ExecuteResponse, AdapterError> {
4522        let catalog = self.catalog();
4523        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4524        for role_id in role_ids {
4525            for member_id in &member_ids {
4526                let member_membership: BTreeSet<_> =
4527                    catalog.get_role(member_id).membership().keys().collect();
4528                if !member_membership.contains(&role_id) {
4529                    let role_name = catalog.get_role(&role_id).name().to_string();
4530                    let member_name = catalog.get_role(member_id).name().to_string();
4531                    // We need this check so we don't accidentally return a success on a reserved role.
4532                    catalog.ensure_not_reserved_role(member_id)?;
4533                    catalog.ensure_grantable_role(&role_id)?;
4534                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4535                        role_name,
4536                        member_name,
4537                    });
4538                } else {
4539                    ops.push(catalog::Op::RevokeRole {
4540                        role_id,
4541                        member_id: *member_id,
4542                        grantor_id,
4543                    });
4544                }
4545            }
4546        }
4547
4548        if ops.is_empty() {
4549            return Ok(ExecuteResponse::RevokedRole);
4550        }
4551
4552        self.catalog_transact(Some(session), ops)
4553            .await
4554            .map(|_| ExecuteResponse::RevokedRole)
4555    }
4556
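    /// Sequences an `ALTER ... OWNER TO`. Ownership changes cascade to dependent
    /// indexes, progress collections, and managed cluster replicas.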
4557    #[instrument]
4558    pub(super) async fn sequence_alter_owner(
4559        &mut self,
4560        session: &Session,
4561        plan::AlterOwnerPlan {
4562            id,
4563            object_type,
4564            new_owner,
4565        }: plan::AlterOwnerPlan,
4566    ) -> Result<ExecuteResponse, AdapterError> {
4567        let mut ops = vec![catalog::Op::UpdateOwner {
4568            id: id.clone(),
4569            new_owner,
4570        }];
4571
4572        match &id {
4573            ObjectId::Item(item_id) => {
4574                let entry = self.catalog().get_entry(item_id);
4575
4576                // Cannot directly change the owner of an index.
4577                if entry.is_index() {
4578                    let name = self
4579                        .catalog()
4580                        .resolve_full_name(entry.name(), Some(session.conn_id()))
4581                        .to_string();
4582                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
4583                    return Ok(ExecuteResponse::AlteredObject(object_type));
4584                }
4585
4586                // Alter owner cascades down to dependent indexes.
4587                let dependent_index_ops = entry
4588                    .used_by()
4589                    .into_iter()
4590                    .filter(|id| self.catalog().get_entry(id).is_index())
4591                    .map(|id| catalog::Op::UpdateOwner {
4592                        id: ObjectId::Item(*id),
4593                        new_owner,
4594                    });
4595                ops.extend(dependent_index_ops);
4596
4597                // Alter owner cascades down to progress collections.
4598                let dependent_subsources =
4599                    entry
4600                        .progress_id()
4601                        .into_iter()
4602                        .map(|item_id| catalog::Op::UpdateOwner {
4603                            id: ObjectId::Item(item_id),
4604                            new_owner,
4605                        });
4606                ops.extend(dependent_subsources);
4607            }
4608            ObjectId::Cluster(cluster_id) => {
4609                let cluster = self.catalog().get_cluster(*cluster_id);
4610                // Alter owner cascades down to cluster replicas.
4611                let managed_cluster_replica_ops =
4612                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4613                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4614                        new_owner,
4615                    });
4616                ops.extend(managed_cluster_replica_ops);
4617            }
4618            _ => {}
4619        }
4620
4621        self.catalog_transact(Some(session), ops)
4622            .await
4623            .map(|_| ExecuteResponse::AlteredObject(object_type))
4624    }
4625
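    /// Sequences a `REASSIGN OWNED`, transferring ownership of the planned objects
    /// from the old roles to the new role.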
4626    #[instrument]
4627    pub(super) async fn sequence_reassign_owned(
4628        &mut self,
4629        session: &Session,
4630        plan::ReassignOwnedPlan {
4631            old_roles,
4632            new_role,
4633            reassign_ids,
4634        }: plan::ReassignOwnedPlan,
4635    ) -> Result<ExecuteResponse, AdapterError> {
4636        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4637            self.catalog().ensure_not_reserved_role(role_id)?;
4638        }
4639
4640        let ops = reassign_ids
4641            .into_iter()
4642            .map(|id| catalog::Op::UpdateOwner {
4643                id,
4644                new_owner: new_role,
4645            })
4646            .collect();
4647
4648        self.catalog_transact(Some(session), ops)
4649            .await
4650            .map(|_| ExecuteResponse::ReassignOwned)
4651    }
4652
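    /// Pops the next deferred DDL statement, if any, and resumes its execution.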
4653    #[instrument]
4654    pub(crate) async fn handle_deferred_statement(&mut self) {
4655        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
4656        // was processed, removing the single element from deferred_statements, so it is expected
4657        // that this is sometimes empty.
4658        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4659            return;
4660        };
4661        match ps {
4662            crate::coord::PlanStatement::Statement { stmt, params } => {
4663                self.handle_execute_inner(stmt, params, ctx).await;
4664            }
4665            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4666                self.sequence_plan(ctx, plan, resolved_ids).await;
4667            }
4668        }
4669    }
4670
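    /// Sequences an `ALTER TABLE ... ADD COLUMN`, allocating a new global ID for the
    /// altered relation and recording the new column in the catalog.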
4671    #[instrument]
4672    // TODO(parkmycar): Remove this once we have an actual implementation.
4673    #[allow(clippy::unused_async)]
4674    pub(super) async fn sequence_alter_table(
4675        &mut self,
4676        ctx: &mut ExecuteContext,
4677        plan: plan::AlterTablePlan,
4678    ) -> Result<ExecuteResponse, AdapterError> {
4679        let plan::AlterTablePlan {
4680            relation_id,
4681            column_name,
4682            column_type,
4683            raw_sql_type,
4684        } = plan;
4685
4686        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4687        let id_ts = self.get_catalog_write_ts().await;
4688        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4689        let ops = vec![catalog::Op::AlterAddColumn {
4690            id: relation_id,
4691            new_global_id,
4692            name: column_name,
4693            typ: column_type,
4694            sql: raw_sql_type,
4695        }];
4696
4697        self.catalog_transact_with_context(None, Some(ctx), ops)
4698            .await?;
4699
4700        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4701    }
4702
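    /// Sequences the application of a previously created materialized view
    /// replacement, swapping it in via a catalog transaction.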
4703    #[instrument]
4704    pub(super) async fn sequence_alter_materialized_view_apply_replacement(
4705        &mut self,
4706        ctx: &mut ExecuteContext,
4707        plan: AlterMaterializedViewApplyReplacementPlan,
4708    ) -> Result<ExecuteResponse, AdapterError> {
4709        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
4710
4711        // TODO(alter-mv): Wait until there is overlap between the old MV's write frontier and the
4712        // new MV's as-of, to ensure no times are skipped.
4713
4714        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4715        self.catalog_transact(Some(ctx.session()), ops).await?;
4716
4717        Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
4718    }
4719
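    /// Returns a statistics oracle over the given source collections at `query_as_of`,
    /// used to supply cardinality estimates during query optimization.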
4720    pub(super) async fn statistics_oracle(
4721        &self,
4722        session: &Session,
4723        source_ids: &BTreeSet<GlobalId>,
4724        query_as_of: &Antichain<Timestamp>,
4725        is_oneshot: bool,
4726    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4727        super::statistics_oracle(
4728            session,
4729            source_ids,
4730            query_as_of,
4731            is_oneshot,
4732            self.catalog().system_config(),
4733            self.controller.storage_collections.as_ref(),
4734        )
4735        .await
4736    }
4737}
4738
4739impl Coordinator {
4740    /// Process the metainfo from a newly created non-transient dataflow.
4741    async fn process_dataflow_metainfo(
4742        &mut self,
4743        df_meta: DataflowMetainfo,
4744        export_id: GlobalId,
4745        ctx: Option<&mut ExecuteContext>,
4746        notice_ids: Vec<GlobalId>,
4747    ) -> Option<BuiltinTableAppendNotify> {
4748        // Emit raw notices to the user.
4749        if let Some(ctx) = ctx {
4750            emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4751        }
4752
4753        // Create a metainfo with rendered notices.
4754        let df_meta = self
4755            .catalog()
4756            .render_notices(df_meta, notice_ids, Some(export_id));
4757
4758        // Write the rendered optimizer notices to the builtin tables (when enabled) and
4759        // save the metainfo in the catalog's in-memory state.
4760        if self.catalog().state().system_config().enable_mz_notices()
4761            && !df_meta.optimizer_notices.is_empty()
4762        {
4763            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4764            self.catalog().state().pack_optimizer_notices(
4765                &mut builtin_table_updates,
4766                df_meta.optimizer_notices.iter(),
4767                Diff::ONE,
4768            );
4769
4770            // Save the metainfo.
4771            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4772
4773            Some(
4774                self.builtin_table_update()
4775                    .execute(builtin_table_updates)
4776                    .await
4777                    .0,
4778            )
4779        } else {
4780            // Save the metainfo.
4781            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4782
4783            None
4784        }
4785    }
4786}