Skip to main content

mz_adapter/coord/sequencer/
inner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::task::{self, JoinHandle, spawn};
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{assert_none, instrument};
36use mz_repr::adt::jsonb::Jsonb;
37use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
38use mz_repr::explain::ExprHumanizer;
39use mz_repr::explain::json::json_string;
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
43    RowIterator, Timestamp,
44};
45use mz_sql::ast::{
46    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
47    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
48    SqlServerConfigOptionName,
49};
50use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
51use mz_sql::catalog::{
52    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
53    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
54    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
55};
56use mz_sql::names::{
57    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
58    SchemaSpecifier, SystemObjectId,
59};
60use mz_sql::plan::{
61    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
62    StatementContext,
63};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
68use mz_sql::plan::{
69    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84    WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use smallvec::SmallVec;
93use timely::progress::Antichain;
94use tokio::sync::{oneshot, watch};
95use tracing::{Instrument, Span, info, warn};
96
97use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
98use crate::command::{ExecuteResponse, Response};
99use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
100use crate::coord::sequencer::emit_optimizer_notices;
101use crate::coord::{
102    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
103    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
104    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
105    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
106    WatchSetResponse, validate_ip_with_policy_rules,
107};
108use crate::error::AdapterError;
109use crate::notice::{AdapterNotice, DroppedInUseIndex};
110use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
111use crate::optimize::{self, Optimize};
112use crate::session::{
113    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
114    WriteLocks, WriteOp,
115};
116use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
117use crate::{PeekResponseUnary, ReadHolds};
118
/// A boxed, `'static` future that resolves to a real-time recency timestamp, or to a
/// [`StorageError`] on failure.
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>;
121
122mod cluster;
123mod copy_from;
124mod create_continual_task;
125mod create_index;
126mod create_materialized_view;
127mod create_view;
128mod explain_timestamp;
129mod peek;
130mod secret;
131mod subscribe;
132
/// Evaluates `$expr`, which must produce a `Result`, and yields its `Ok` value.
///
/// On `Err`, the error is converted with `.into()` and handed to `$ctx.retire(..)`, and
/// the *enclosing function* returns the value of that `retire` call. The macro is
/// therefore only usable inside functions whose return type matches what `retire`
/// returns, and `$ctx` is consumed or borrowed exactly as `retire` requires.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            // Report the failure through the context and leave the calling function.
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}
143
144pub(super) use return_if_err;
145
/// A bundle of catalog operations together with flags and notices describing what a
/// drop affected.
///
/// NOTE(review): this struct is not constructed in the visible part of this file; the
/// field descriptions below are inferred from names and types only -- confirm against
/// the code that builds it.
struct DropOps {
    /// Catalog operations to apply.
    ops: Vec<catalog::Op>,
    /// Presumably set when the session's active database is among the dropped objects.
    dropped_active_db: bool,
    /// Presumably set when the session's active cluster is among the dropped objects.
    dropped_active_cluster: bool,
    /// Presumably the dropped indexes that were still in use, to be surfaced as notices.
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
152
/// A bundle of values returned from `create_source_inner`.
struct CreateSourceInner {
    /// Catalog operations that create the sources: one `CreateItem` per source, each
    /// optionally followed by an `UpdateSourceReferences` op for that source.
    ops: Vec<catalog::Op>,
    /// The sources that `ops` will create, paired with their catalog item IDs, in the
    /// order the plans were supplied.
    sources: Vec<(CatalogItemId, Source)>,
    /// Item ID and name of every plan that was created with `if_not_exists` set.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
159
160impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
    ///
    /// The plan's validity is checked once before the first stage runs. After that, each
    /// loop iteration executes one stage and acts on its [StageResult]:
    ///
    /// * `Immediate` -- continue the loop with the returned stage.
    /// * `Response` -- retire `ctx` with the response and return.
    /// * `Handle` -- hand the join handle to `handle_spawn`; once it completes, the next
    ///   stage is sent back as a message on `internal_cmd_tx`. This function returns.
    /// * `HandleRetire` -- hand the join handle to `handle_spawn`; once it completes,
    ///   `ctx` is retired with its result. This function returns.
    ///
    /// Any error from the validity check or from a stage retires `ctx` with that error.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
                    // when cancel_enabled may have been true on an earlier stage.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                // Without a session there is no connection ID to key a cancellation channel on.
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        // A failed send is deliberately ignored.
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }
226
227    fn handle_spawn<C, T, F>(
228        &self,
229        ctx: C,
230        handle: JoinHandle<Result<T, AdapterError>>,
231        cancel_enabled: bool,
232        f: F,
233    ) where
234        C: StagedContext + Send + 'static,
235        T: Send + 'static,
236        F: FnOnce(C, T) + Send + 'static,
237    {
238        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
239            .session()
240            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
241        {
242            let mut rx = rx.clone();
243            Box::pin(async move {
244                // Wait for true or dropped sender.
245                let _ = rx.wait_for(|v| *v).await;
246                ()
247            })
248        } else {
249            Box::pin(future::pending())
250        };
251        spawn(|| "sequence_staged", async move {
252            tokio::select! {
253                res = handle => {
254                    let next = return_if_err!(res, ctx);
255                    f(ctx, next);
256                }
257                _ = rx, if cancel_enabled => {
258                    ctx.retire(Err(AdapterError::Canceled));
259                }
260            }
261        });
262    }
263
264    async fn create_source_inner(
265        &self,
266        session: &Session,
267        plans: Vec<plan::CreateSourcePlanBundle>,
268    ) -> Result<CreateSourceInner, AdapterError> {
269        let mut ops = vec![];
270        let mut sources = vec![];
271
272        let if_not_exists_ids = plans
273            .iter()
274            .filter_map(
275                |plan::CreateSourcePlanBundle {
276                     item_id,
277                     global_id: _,
278                     plan,
279                     resolved_ids: _,
280                     available_source_references: _,
281                 }| {
282                    if plan.if_not_exists {
283                        Some((*item_id, plan.name.clone()))
284                    } else {
285                        None
286                    }
287                },
288            )
289            .collect::<BTreeMap<_, _>>();
290
291        for plan::CreateSourcePlanBundle {
292            item_id,
293            global_id,
294            mut plan,
295            resolved_ids,
296            available_source_references,
297        } in plans
298        {
299            let name = plan.name.clone();
300
301            match plan.source.data_source {
302                plan::DataSourceDesc::Ingestion(ref desc)
303                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
304                    let cluster_id = plan
305                        .in_cluster
306                        .expect("ingestion plans must specify cluster");
307                    match desc.connection {
308                        GenericSourceConnection::Postgres(_)
309                        | GenericSourceConnection::MySql(_)
310                        | GenericSourceConnection::SqlServer(_)
311                        | GenericSourceConnection::Kafka(_)
312                        | GenericSourceConnection::LoadGenerator(_) => {
313                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
314                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
315                                    .get(self.catalog().system_config().dyncfgs());
316
317                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
318                                {
319                                    return Err(AdapterError::Unsupported(
320                                        "sources in clusters with >1 replicas",
321                                    ));
322                                }
323                            }
324                        }
325                    }
326                }
327                plan::DataSourceDesc::Webhook { .. } => {
328                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
329                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
330                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
331                            .get(self.catalog().system_config().dyncfgs());
332
333                        if !enable_multi_replica_sources {
334                            if cluster.replica_ids().len() > 1 {
335                                return Err(AdapterError::Unsupported(
336                                    "webhook sources in clusters with >1 replicas",
337                                ));
338                            }
339                        }
340                    }
341                }
342                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
343            }
344
345            // Attempt to reduce the `CHECK` expression, we timeout if this takes too long.
346            if let mz_sql::plan::DataSourceDesc::Webhook {
347                validate_using: Some(validate),
348                ..
349            } = &mut plan.source.data_source
350            {
351                if let Err(reason) = validate.reduce_expression().await {
352                    self.metrics
353                        .webhook_validation_reduce_failures
354                        .with_label_values(&[reason])
355                        .inc();
356                    return Err(AdapterError::Internal(format!(
357                        "failed to reduce check expression, {reason}"
358                    )));
359                }
360            }
361
362            // If this source contained a set of available source references, update the
363            // source references catalog table.
364            let mut reference_ops = vec![];
365            if let Some(references) = &available_source_references {
366                reference_ops.push(catalog::Op::UpdateSourceReferences {
367                    source_id: item_id,
368                    references: references.clone().into(),
369                });
370            }
371
372            let source = Source::new(plan, global_id, resolved_ids, None, false);
373            ops.push(catalog::Op::CreateItem {
374                id: item_id,
375                name,
376                item: CatalogItem::Source(source.clone()),
377                owner_id: *session.current_role_id(),
378            });
379            sources.push((item_id, source));
380            // These operations must be executed after the source is added to the catalog.
381            ops.extend(reference_ops);
382        }
383
384        Ok(CreateSourceInner {
385            ops,
386            sources,
387            if_not_exists_ids,
388        })
389    }
390
391    /// Subsources are planned differently from other statements because they
392    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
393    /// Because of this, we have usually "missed" the opportunity to plan them
394    /// through the normal statement execution life cycle (the exception being
395    /// during bootstrapping).
396    ///
397    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
398    pub(crate) fn plan_subsource(
399        &self,
400        session: &Session,
401        params: &mz_sql::plan::Params,
402        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
403        item_id: CatalogItemId,
404        global_id: GlobalId,
405    ) -> Result<CreateSourcePlanBundle, AdapterError> {
406        let catalog = self.catalog().for_session(session);
407        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
408
409        let plan = self.plan_statement(
410            session,
411            Statement::CreateSubsource(subsource_stmt),
412            params,
413            &resolved_ids,
414        )?;
415        let plan = match plan {
416            Plan::CreateSource(plan) => plan,
417            _ => unreachable!(),
418        };
419        Ok(CreateSourcePlanBundle {
420            item_id,
421            global_id,
422            plan,
423            resolved_ids,
424            available_source_references: None,
425        })
426    }
427
428    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
429    pub(crate) async fn plan_purified_alter_source_add_subsource(
430        &mut self,
431        session: &Session,
432        params: Params,
433        source_name: ResolvedItemName,
434        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
435        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
436    ) -> Result<(Plan, ResolvedIds), AdapterError> {
437        let mut subsource_plans = Vec::with_capacity(subsources.len());
438
439        // Generate subsource statements
440        let conn_catalog = self.catalog().for_system_session();
441        let pcx = plan::PlanContext::zero();
442        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
443
444        let entry = self.catalog().get_entry(source_name.item_id());
445        let source = entry.source().ok_or_else(|| {
446            AdapterError::internal(
447                "plan alter source",
448                format!("expected Source found {entry:?}"),
449            )
450        })?;
451
452        let item_id = entry.id();
453        let ingestion_id = source.global_id();
454        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
455
456        let ids = self
457            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
458            .await?;
459        for (subsource_stmt, (item_id, global_id)) in
460            subsource_stmts.into_iter().zip_eq(ids.into_iter())
461        {
462            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
463            subsource_plans.push(s);
464        }
465
466        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
467            subsources: subsource_plans,
468            options,
469        };
470
471        Ok((
472            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
473                item_id,
474                ingestion_id,
475                action,
476            }),
477            ResolvedIds::empty(),
478        ))
479    }
480
481    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
482    pub(crate) fn plan_purified_alter_source_refresh_references(
483        &self,
484        _session: &Session,
485        _params: Params,
486        source_name: ResolvedItemName,
487        available_source_references: plan::SourceReferences,
488    ) -> Result<(Plan, ResolvedIds), AdapterError> {
489        let entry = self.catalog().get_entry(source_name.item_id());
490        let source = entry.source().ok_or_else(|| {
491            AdapterError::internal(
492                "plan alter source",
493                format!("expected Source found {entry:?}"),
494            )
495        })?;
496        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
497            references: available_source_references,
498        };
499
500        Ok((
501            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
502                item_id: entry.id(),
503                ingestion_id: source.global_id(),
504                action,
505            }),
506            ResolvedIds::empty(),
507        ))
508    }
509
510    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
511    /// the primary source, and any ingestion export subsources (e.g. PG
512    /// tables).
513    pub(crate) async fn plan_purified_create_source(
514        &mut self,
515        ctx: &ExecuteContext,
516        params: Params,
517        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
518        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
519        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
520        available_source_references: plan::SourceReferences,
521    ) -> Result<(Plan, ResolvedIds), AdapterError> {
522        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
523
524        // 1. First plan the progress subsource, if any.
525        if let Some(progress_stmt) = progress_stmt {
526            // The primary source depends on this subsource because the primary
527            // source needs to know its shard ID, and the easiest way of
528            // guaranteeing that the shard ID is discoverable is to create this
529            // collection first.
530            assert_none!(progress_stmt.of_source);
531            let (item_id, global_id) = self.allocate_user_id().await?;
532            let progress_plan =
533                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
534            let progress_full_name = self
535                .catalog()
536                .resolve_full_name(&progress_plan.plan.name, None);
537            let progress_subsource = ResolvedItemName::Item {
538                id: progress_plan.item_id,
539                qualifiers: progress_plan.plan.name.qualifiers.clone(),
540                full_name: progress_full_name,
541                print_id: true,
542                version: RelationVersionSelector::Latest,
543            };
544
545            create_source_plans.push(progress_plan);
546
547            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
548        }
549
550        let catalog = self.catalog().for_session(ctx.session());
551        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
552
553        let propagated_with_options: Vec<_> = source_stmt
554            .with_options
555            .iter()
556            .filter_map(|opt| match opt.name {
557                CreateSourceOptionName::TimestampInterval => None,
558                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
559                    name: CreateSubsourceOptionName::RetainHistory,
560                    value: opt.value.clone(),
561                }),
562            })
563            .collect();
564
565        // 2. Then plan the main source.
566        let source_plan = match self.plan_statement(
567            ctx.session(),
568            Statement::CreateSource(source_stmt),
569            &params,
570            &resolved_ids,
571        )? {
572            Plan::CreateSource(plan) => plan,
573            p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
574        };
575
576        let (item_id, global_id) = self.allocate_user_id().await?;
577
578        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
579        let of_source = ResolvedItemName::Item {
580            id: item_id,
581            qualifiers: source_plan.name.qualifiers.clone(),
582            full_name: source_full_name,
583            print_id: true,
584            version: RelationVersionSelector::Latest,
585        };
586
587        // Generate subsource statements
588        let conn_catalog = self.catalog().for_system_session();
589        let pcx = plan::PlanContext::zero();
590        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
591
592        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
593
594        for subsource_stmt in subsource_stmts.iter_mut() {
595            subsource_stmt
596                .with_options
597                .extend(propagated_with_options.iter().cloned())
598        }
599
600        create_source_plans.push(CreateSourcePlanBundle {
601            item_id,
602            global_id,
603            plan: source_plan,
604            resolved_ids: resolved_ids.clone(),
605            available_source_references: Some(available_source_references),
606        });
607
608        // 3. Finally, plan all the subsources
609        let ids = self
610            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
611            .await?;
612        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
613            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
614            create_source_plans.push(plan);
615        }
616
617        Ok((
618            Plan::CreateSources(create_source_plans),
619            ResolvedIds::empty(),
620        ))
621    }
622
623    #[instrument]
624    pub(super) async fn sequence_create_source(
625        &mut self,
626        ctx: &mut ExecuteContext,
627        plans: Vec<plan::CreateSourcePlanBundle>,
628    ) -> Result<ExecuteResponse, AdapterError> {
629        let CreateSourceInner {
630            ops,
631            sources,
632            if_not_exists_ids,
633        } = self.create_source_inner(ctx.session(), plans).await?;
634
635        let transact_result = self
636            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
637            .await;
638
639        // Check if any sources are webhook sources and report them as created.
640        for (item_id, source) in &sources {
641            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
642                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
643                    ctx.session()
644                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
645                }
646            }
647        }
648
649        match transact_result {
650            Ok(()) => Ok(ExecuteResponse::CreatedSource),
651            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
652                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
653            })) if if_not_exists_ids.contains_key(&id) => {
654                ctx.session()
655                    .add_notice(AdapterNotice::ObjectAlreadyExists {
656                        name: if_not_exists_ids[&id].item.clone(),
657                        ty: "source",
658                    });
659                Ok(ExecuteResponse::CreatedSource)
660            }
661            Err(err) => Err(err),
662        }
663    }
664
665    #[instrument]
666    pub(super) async fn sequence_create_connection(
667        &mut self,
668        mut ctx: ExecuteContext,
669        plan: plan::CreateConnectionPlan,
670        resolved_ids: ResolvedIds,
671    ) {
672        let (connection_id, connection_gid) = match self.allocate_user_id().await {
673            Ok(item_id) => item_id,
674            Err(err) => return ctx.retire(Err(err)),
675        };
676
677        match &plan.connection.details {
678            ConnectionDetails::Ssh { key_1, key_2, .. } => {
679                let key_1 = match key_1.as_key_pair() {
680                    Some(key_1) => key_1.clone(),
681                    None => {
682                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
683                            "the PUBLIC KEY 1 option cannot be explicitly specified"
684                        ))));
685                    }
686                };
687
688                let key_2 = match key_2.as_key_pair() {
689                    Some(key_2) => key_2.clone(),
690                    None => {
691                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
692                            "the PUBLIC KEY 2 option cannot be explicitly specified"
693                        ))));
694                    }
695                };
696
697                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
698                let secret = key_set.to_bytes();
699                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
700                    return ctx.retire(Err(err.into()));
701                }
702            }
703            _ => (),
704        };
705
706        if plan.validate {
707            let internal_cmd_tx = self.internal_cmd_tx.clone();
708            let transient_revision = self.catalog().transient_revision();
709            let conn_id = ctx.session().conn_id().clone();
710            let otel_ctx = OpenTelemetryContext::obtain();
711            let role_metadata = ctx.session().role_metadata().clone();
712
713            let connection = plan
714                .connection
715                .details
716                .to_connection()
717                .into_inline_connection(self.catalog().state());
718
719            let current_storage_parameters = self.controller.storage.config().clone();
720            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
721                let result = match connection
722                    .validate(connection_id, &current_storage_parameters)
723                    .await
724                {
725                    Ok(()) => Ok(plan),
726                    Err(err) => Err(err.into()),
727                };
728
729                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
730                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
731                    CreateConnectionValidationReady {
732                        ctx,
733                        result,
734                        connection_id,
735                        connection_gid,
736                        plan_validity: PlanValidity::new(
737                            transient_revision,
738                            resolved_ids.items().copied().collect(),
739                            None,
740                            None,
741                            role_metadata,
742                        ),
743                        otel_ctx,
744                        resolved_ids: resolved_ids.clone(),
745                    },
746                ));
747                if let Err(e) = result {
748                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
749                }
750            });
751        } else {
752            let result = self
753                .sequence_create_connection_stage_finish(
754                    &mut ctx,
755                    connection_id,
756                    connection_gid,
757                    plan,
758                    resolved_ids,
759                )
760                .await;
761            ctx.retire(result);
762        }
763    }
764
765    #[instrument]
766    pub(crate) async fn sequence_create_connection_stage_finish(
767        &mut self,
768        ctx: &mut ExecuteContext,
769        connection_id: CatalogItemId,
770        connection_gid: GlobalId,
771        plan: plan::CreateConnectionPlan,
772        resolved_ids: ResolvedIds,
773    ) -> Result<ExecuteResponse, AdapterError> {
774        let ops = vec![catalog::Op::CreateItem {
775            id: connection_id,
776            name: plan.name.clone(),
777            item: CatalogItem::Connection(Connection {
778                create_sql: plan.connection.create_sql,
779                global_id: connection_gid,
780                details: plan.connection.details.clone(),
781                resolved_ids,
782            }),
783            owner_id: *ctx.session().current_role_id(),
784        }];
785
786        // VPC endpoint creation for AWS PrivateLink connections is now handled
787        // in apply_catalog_implications.
788        let conn_id = ctx.session().conn_id().clone();
789        let transact_result = self
790            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
791            .await;
792
793        match transact_result {
794            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
795            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
796                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
797            })) if plan.if_not_exists => {
798                ctx.session()
799                    .add_notice(AdapterNotice::ObjectAlreadyExists {
800                        name: plan.name.item,
801                        ty: "connection",
802                    });
803                Ok(ExecuteResponse::CreatedConnection)
804            }
805            Err(err) => Err(err),
806        }
807    }
808
809    #[instrument]
810    pub(super) async fn sequence_create_database(
811        &mut self,
812        session: &Session,
813        plan: plan::CreateDatabasePlan,
814    ) -> Result<ExecuteResponse, AdapterError> {
815        let ops = vec![catalog::Op::CreateDatabase {
816            name: plan.name.clone(),
817            owner_id: *session.current_role_id(),
818        }];
819        match self.catalog_transact(Some(session), ops).await {
820            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
821            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
822                kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
823            })) if plan.if_not_exists => {
824                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
825                Ok(ExecuteResponse::CreatedDatabase)
826            }
827            Err(err) => Err(err),
828        }
829    }
830
831    #[instrument]
832    pub(super) async fn sequence_create_schema(
833        &mut self,
834        session: &Session,
835        plan: plan::CreateSchemaPlan,
836    ) -> Result<ExecuteResponse, AdapterError> {
837        let op = catalog::Op::CreateSchema {
838            database_id: plan.database_spec,
839            schema_name: plan.schema_name.clone(),
840            owner_id: *session.current_role_id(),
841        };
842        match self.catalog_transact(Some(session), vec![op]).await {
843            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
844            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
845                kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
846            })) if plan.if_not_exists => {
847                session.add_notice(AdapterNotice::SchemaAlreadyExists {
848                    name: plan.schema_name,
849                });
850                Ok(ExecuteResponse::CreatedSchema)
851            }
852            Err(err) => Err(err),
853        }
854    }
855
856    /// Validates the role attributes for a `CREATE ROLE` statement.
857    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
858        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
859            if attributes.superuser.is_some()
860                || attributes.password.is_some()
861                || attributes.login.is_some()
862            {
863                return Err(AdapterError::UnavailableFeature {
864                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
865                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
866                });
867            }
868        }
869        Ok(())
870    }
871
872    #[instrument]
873    pub(super) async fn sequence_create_role(
874        &mut self,
875        conn_id: Option<&ConnectionId>,
876        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
877    ) -> Result<ExecuteResponse, AdapterError> {
878        self.validate_role_attributes(&attributes.clone())?;
879        let op = catalog::Op::CreateRole { name, attributes };
880        self.catalog_transact_with_context(conn_id, None, vec![op])
881            .await
882            .map(|_| ExecuteResponse::CreatedRole)
883    }
884
885    #[instrument]
886    pub(super) async fn sequence_create_network_policy(
887        &mut self,
888        session: &Session,
889        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
890    ) -> Result<ExecuteResponse, AdapterError> {
891        let op = catalog::Op::CreateNetworkPolicy {
892            rules,
893            name,
894            owner_id: *session.current_role_id(),
895        };
896        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
897            .await
898            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
899    }
900
901    #[instrument]
902    pub(super) async fn sequence_alter_network_policy(
903        &mut self,
904        session: &Session,
905        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
906    ) -> Result<ExecuteResponse, AdapterError> {
907        // TODO(network_policy): Consider role based network policies here.
908        let current_network_policy_name =
909            self.catalog().system_config().default_network_policy_name();
910        // Check if the way we're alerting the policy is still valid for the current connection.
911        if current_network_policy_name == name {
912            self.validate_alter_network_policy(session, &rules)?;
913        }
914
915        let op = catalog::Op::AlterNetworkPolicy {
916            id,
917            rules,
918            name,
919            owner_id: *session.current_role_id(),
920        };
921        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
922            .await
923            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
924    }
925
926    #[instrument]
927    pub(super) async fn sequence_create_table(
928        &mut self,
929        ctx: &mut ExecuteContext,
930        plan: plan::CreateTablePlan,
931        resolved_ids: ResolvedIds,
932    ) -> Result<ExecuteResponse, AdapterError> {
933        let plan::CreateTablePlan {
934            name,
935            table,
936            if_not_exists,
937        } = plan;
938
939        let conn_id = if table.temporary {
940            Some(ctx.session().conn_id())
941        } else {
942            None
943        };
944        let (table_id, global_id) = self.allocate_user_id().await?;
945        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
946
947        let data_source = match table.data_source {
948            plan::TableDataSource::TableWrites { defaults } => {
949                TableDataSource::TableWrites { defaults }
950            }
951            plan::TableDataSource::DataSource {
952                desc: data_source_plan,
953                timeline,
954            } => match data_source_plan {
955                plan::DataSourceDesc::IngestionExport {
956                    ingestion_id,
957                    external_reference,
958                    details,
959                    data_config,
960                } => TableDataSource::DataSource {
961                    desc: DataSourceDesc::IngestionExport {
962                        ingestion_id,
963                        external_reference,
964                        details,
965                        data_config,
966                    },
967                    timeline,
968                },
969                plan::DataSourceDesc::Webhook {
970                    validate_using,
971                    body_format,
972                    headers,
973                    cluster_id,
974                } => TableDataSource::DataSource {
975                    desc: DataSourceDesc::Webhook {
976                        validate_using,
977                        body_format,
978                        headers,
979                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
980                    },
981                    timeline,
982                },
983                o => {
984                    unreachable!("CREATE TABLE data source got {:?}", o)
985                }
986            },
987        };
988
989        let is_webhook = if let TableDataSource::DataSource {
990            desc: DataSourceDesc::Webhook { .. },
991            timeline: _,
992        } = &data_source
993        {
994            true
995        } else {
996            false
997        };
998
999        let table = Table {
1000            create_sql: Some(table.create_sql),
1001            desc: table.desc,
1002            collections,
1003            conn_id: conn_id.cloned(),
1004            resolved_ids,
1005            custom_logical_compaction_window: table.compaction_window,
1006            is_retained_metrics_object: false,
1007            data_source,
1008        };
1009        let ops = vec![catalog::Op::CreateItem {
1010            id: table_id,
1011            name: name.clone(),
1012            item: CatalogItem::Table(table.clone()),
1013            owner_id: *ctx.session().current_role_id(),
1014        }];
1015
1016        let catalog_result = self
1017            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1018            .await;
1019
1020        if is_webhook {
1021            // try_get_webhook_url will make up a URL for things that are not
1022            // webhooks, so we guard against that here.
1023            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1024                ctx.session()
1025                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
1026            }
1027        }
1028
1029        match catalog_result {
1030            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1031            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1032                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1033            })) if if_not_exists => {
1034                ctx.session_mut()
1035                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1036                        name: name.item,
1037                        ty: "table",
1038                    });
1039                Ok(ExecuteResponse::CreatedTable)
1040            }
1041            Err(err) => Err(err),
1042        }
1043    }
1044
1045    #[instrument]
1046    pub(super) async fn sequence_create_sink(
1047        &mut self,
1048        ctx: ExecuteContext,
1049        plan: plan::CreateSinkPlan,
1050        resolved_ids: ResolvedIds,
1051    ) {
1052        let plan::CreateSinkPlan {
1053            name,
1054            sink,
1055            with_snapshot,
1056            if_not_exists,
1057            in_cluster,
1058        } = plan;
1059
1060        // First try to allocate an ID and an OID. If either fails, we're done.
1061        let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
1062
1063        let catalog_sink = Sink {
1064            create_sql: sink.create_sql,
1065            global_id,
1066            from: sink.from,
1067            connection: sink.connection,
1068            envelope: sink.envelope,
1069            version: sink.version,
1070            with_snapshot,
1071            resolved_ids,
1072            cluster_id: in_cluster,
1073            commit_interval: sink.commit_interval,
1074        };
1075
1076        let ops = vec![catalog::Op::CreateItem {
1077            id: item_id,
1078            name: name.clone(),
1079            item: CatalogItem::Sink(catalog_sink.clone()),
1080            owner_id: *ctx.session().current_role_id(),
1081        }];
1082
1083        let result = self.catalog_transact(Some(ctx.session()), ops).await;
1084
1085        match result {
1086            Ok(()) => {}
1087            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1088                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1089            })) if if_not_exists => {
1090                ctx.session()
1091                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1092                        name: name.item,
1093                        ty: "sink",
1094                    });
1095                ctx.retire(Ok(ExecuteResponse::CreatedSink));
1096                return;
1097            }
1098            Err(e) => {
1099                ctx.retire(Err(e));
1100                return;
1101            }
1102        };
1103
1104        self.create_storage_export(global_id, &catalog_sink)
1105            .await
1106            .unwrap_or_terminate("cannot fail to create exports");
1107
1108        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1109            .await;
1110
1111        ctx.retire(Ok(ExecuteResponse::CreatedSink))
1112    }
1113
1114    /// Validates that a view definition does not contain any expressions that may lead to
1115    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1116    ///
1117    /// We prevent these expressions so that we can add columns to system tables without
1118    /// changing the definition of the view.
1119    ///
1120    /// Here is a bit of a hand wavy proof as to why we only need to check the
1121    /// immediate view definition for system objects and ambiguous column
1122    /// references, and not the entire dependency tree:
1123    ///
1124    ///   - A view with no object references cannot have any ambiguous column
1125    ///   references to a system object, because it has no system objects.
1126    ///   - A view with a direct reference to a system object and a * or
1127    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1128    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1129    ///   any ambiguous column references to a system object, because all column
1130    ///   references are explicitly named.
1131    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1132    ///   system object cannot have any ambiguous column references to a system
1133    ///   object, because there are no system objects in the top level view and
1134    ///   all sub-views are guaranteed to have no ambiguous column references to
1135    ///   system objects.
1136    pub(super) fn validate_system_column_references(
1137        &self,
1138        uses_ambiguous_columns: bool,
1139        depends_on: &BTreeSet<GlobalId>,
1140    ) -> Result<(), AdapterError> {
1141        if uses_ambiguous_columns
1142            && depends_on
1143                .iter()
1144                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1145        {
1146            Err(AdapterError::AmbiguousSystemColumnReference)
1147        } else {
1148            Ok(())
1149        }
1150    }
1151
1152    #[instrument]
1153    pub(super) async fn sequence_create_type(
1154        &mut self,
1155        session: &Session,
1156        plan: plan::CreateTypePlan,
1157        resolved_ids: ResolvedIds,
1158    ) -> Result<ExecuteResponse, AdapterError> {
1159        let (item_id, global_id) = self.allocate_user_id().await?;
1160        // Validate the type definition (e.g., composite columns) before storing.
1161        plan.typ
1162            .inner
1163            .desc(&self.catalog().for_session(session))
1164            .map_err(AdapterError::from)?;
1165        let typ = Type {
1166            create_sql: Some(plan.typ.create_sql),
1167            global_id,
1168            details: CatalogTypeDetails {
1169                array_id: None,
1170                typ: plan.typ.inner,
1171                pg_metadata: None,
1172            },
1173            resolved_ids,
1174        };
1175        let op = catalog::Op::CreateItem {
1176            id: item_id,
1177            name: plan.name,
1178            item: CatalogItem::Type(typ),
1179            owner_id: *session.current_role_id(),
1180        };
1181        match self.catalog_transact(Some(session), vec![op]).await {
1182            Ok(()) => Ok(ExecuteResponse::CreatedType),
1183            Err(err) => Err(err),
1184        }
1185    }
1186
1187    #[instrument]
1188    pub(super) async fn sequence_comment_on(
1189        &mut self,
1190        session: &Session,
1191        plan: plan::CommentPlan,
1192    ) -> Result<ExecuteResponse, AdapterError> {
1193        let op = catalog::Op::Comment {
1194            object_id: plan.object_id,
1195            sub_component: plan.sub_component,
1196            comment: plan.comment,
1197        };
1198        self.catalog_transact(Some(session), vec![op]).await?;
1199        Ok(ExecuteResponse::Comment)
1200    }
1201
1202    #[instrument]
1203    pub(super) async fn sequence_drop_objects(
1204        &mut self,
1205        ctx: &mut ExecuteContext,
1206        plan::DropObjectsPlan {
1207            drop_ids,
1208            object_type,
1209            referenced_ids,
1210        }: plan::DropObjectsPlan,
1211    ) -> Result<ExecuteResponse, AdapterError> {
1212        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1213        let mut objects = Vec::new();
1214        for obj_id in &drop_ids {
1215            if !referenced_ids_hashset.contains(obj_id) {
1216                let object_info = ErrorMessageObjectDescription::from_id(
1217                    obj_id,
1218                    &self.catalog().for_session(ctx.session()),
1219                )
1220                .to_string();
1221                objects.push(object_info);
1222            }
1223        }
1224
1225        if !objects.is_empty() {
1226            ctx.session()
1227                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1228        }
1229
1230        // Collect GlobalIds for expression cache invalidation.
1231        let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
1232            .iter()
1233            .filter_map(|id| match id {
1234                ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
1235                _ => None,
1236            })
1237            .flatten()
1238            .collect();
1239
1240        let DropOps {
1241            ops,
1242            dropped_active_db,
1243            dropped_active_cluster,
1244            dropped_in_use_indexes,
1245        } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1246
1247        self.catalog_transact_with_context(None, Some(ctx), ops)
1248            .await?;
1249
1250        // Invalidate expression cache entries for dropped objects.
1251        if !expr_cache_invalidate_ids.is_empty() {
1252            let _fut = self.catalog().update_expression_cache(
1253                Default::default(),
1254                Default::default(),
1255                expr_cache_invalidate_ids,
1256            );
1257        }
1258
1259        fail::fail_point!("after_sequencer_drop_replica");
1260
1261        if dropped_active_db {
1262            ctx.session()
1263                .add_notice(AdapterNotice::DroppedActiveDatabase {
1264                    name: ctx.session().vars().database().to_string(),
1265                });
1266        }
1267        if dropped_active_cluster {
1268            ctx.session()
1269                .add_notice(AdapterNotice::DroppedActiveCluster {
1270                    name: ctx.session().vars().cluster().to_string(),
1271                });
1272        }
1273        for dropped_in_use_index in dropped_in_use_indexes {
1274            ctx.session()
1275                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1276            self.metrics
1277                .optimization_notices
1278                .with_label_values(&["DroppedInUseIndex"])
1279                .inc_by(1);
1280        }
1281        Ok(ExecuteResponse::DroppedObject(object_type))
1282    }
1283
1284    fn validate_dropped_role_ownership(
1285        &self,
1286        session: &Session,
1287        dropped_roles: &BTreeMap<RoleId, &str>,
1288    ) -> Result<(), AdapterError> {
1289        fn privilege_check(
1290            privileges: &PrivilegeMap,
1291            dropped_roles: &BTreeMap<RoleId, &str>,
1292            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1293            object_id: &SystemObjectId,
1294            catalog: &ConnCatalog,
1295        ) {
1296            for privilege in privileges.all_values() {
1297                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1298                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1299                    let object_description =
1300                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1301                    dependent_objects
1302                        .entry(role_name.to_string())
1303                        .or_default()
1304                        .push(format!(
1305                            "privileges on {object_description} granted by {grantor_name}",
1306                        ));
1307                }
1308                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1309                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1310                    let object_description =
1311                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1312                    dependent_objects
1313                        .entry(role_name.to_string())
1314                        .or_default()
1315                        .push(format!(
1316                            "privileges granted on {object_description} to {grantee_name}"
1317                        ));
1318                }
1319            }
1320        }
1321
1322        let catalog = self.catalog().for_session(session);
1323        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1324        for entry in self.catalog.entries() {
1325            let id = SystemObjectId::Object(entry.id().into());
1326            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1327                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1328                dependent_objects
1329                    .entry(role_name.to_string())
1330                    .or_default()
1331                    .push(format!("owner of {object_description}"));
1332            }
1333            privilege_check(
1334                entry.privileges(),
1335                dropped_roles,
1336                &mut dependent_objects,
1337                &id,
1338                &catalog,
1339            );
1340        }
1341        for database in self.catalog.databases() {
1342            let database_id = SystemObjectId::Object(database.id().into());
1343            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1344                let object_description =
1345                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1346                dependent_objects
1347                    .entry(role_name.to_string())
1348                    .or_default()
1349                    .push(format!("owner of {object_description}"));
1350            }
1351            privilege_check(
1352                &database.privileges,
1353                dropped_roles,
1354                &mut dependent_objects,
1355                &database_id,
1356                &catalog,
1357            );
1358            for schema in database.schemas_by_id.values() {
1359                let schema_id = SystemObjectId::Object(
1360                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1361                );
1362                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1363                    let object_description =
1364                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1365                    dependent_objects
1366                        .entry(role_name.to_string())
1367                        .or_default()
1368                        .push(format!("owner of {object_description}"));
1369                }
1370                privilege_check(
1371                    &schema.privileges,
1372                    dropped_roles,
1373                    &mut dependent_objects,
1374                    &schema_id,
1375                    &catalog,
1376                );
1377            }
1378        }
1379        for cluster in self.catalog.clusters() {
1380            let cluster_id = SystemObjectId::Object(cluster.id().into());
1381            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1382                let object_description =
1383                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1384                dependent_objects
1385                    .entry(role_name.to_string())
1386                    .or_default()
1387                    .push(format!("owner of {object_description}"));
1388            }
1389            privilege_check(
1390                &cluster.privileges,
1391                dropped_roles,
1392                &mut dependent_objects,
1393                &cluster_id,
1394                &catalog,
1395            );
1396            for replica in cluster.replicas() {
1397                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1398                    let replica_id =
1399                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1400                    let object_description =
1401                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1402                    dependent_objects
1403                        .entry(role_name.to_string())
1404                        .or_default()
1405                        .push(format!("owner of {object_description}"));
1406                }
1407            }
1408        }
1409        privilege_check(
1410            self.catalog().system_privileges(),
1411            dropped_roles,
1412            &mut dependent_objects,
1413            &SystemObjectId::System,
1414            &catalog,
1415        );
1416        for (default_privilege_object, default_privilege_acl_items) in
1417            self.catalog.default_privileges()
1418        {
1419            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1420                dependent_objects
1421                    .entry(role_name.to_string())
1422                    .or_default()
1423                    .push(format!(
1424                        "default privileges on {}S created by {}",
1425                        default_privilege_object.object_type, role_name
1426                    ));
1427            }
1428            for default_privilege_acl_item in default_privilege_acl_items {
1429                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1430                    dependent_objects
1431                        .entry(role_name.to_string())
1432                        .or_default()
1433                        .push(format!(
1434                            "default privileges on {}S granted to {}",
1435                            default_privilege_object.object_type, role_name
1436                        ));
1437                }
1438            }
1439        }
1440
1441        if !dependent_objects.is_empty() {
1442            Err(AdapterError::DependentObject(dependent_objects))
1443        } else {
1444            Ok(())
1445        }
1446    }
1447
1448    #[instrument]
1449    pub(super) async fn sequence_drop_owned(
1450        &mut self,
1451        session: &Session,
1452        plan: plan::DropOwnedPlan,
1453    ) -> Result<ExecuteResponse, AdapterError> {
1454        for role_id in &plan.role_ids {
1455            self.catalog().ensure_not_reserved_role(role_id)?;
1456        }
1457
1458        let mut privilege_revokes = plan.privilege_revokes;
1459
1460        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1461        let session_catalog = self.catalog().for_session(session);
1462        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1463            && !session.is_superuser()
1464        {
1465            // Obtain all roles that the current session is a member of.
1466            let role_membership =
1467                session_catalog.collect_role_membership(session.current_role_id());
1468            let invalid_revokes: BTreeSet<_> = privilege_revokes
1469                .extract_if(.., |(_, privilege)| {
1470                    !role_membership.contains(&privilege.grantor)
1471                })
1472                .map(|(object_id, _)| object_id)
1473                .collect();
1474            for invalid_revoke in invalid_revokes {
1475                let object_description =
1476                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1477                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1478            }
1479        }
1480
1481        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1482            catalog::Op::UpdatePrivilege {
1483                target_id: object_id,
1484                privilege,
1485                variant: UpdatePrivilegeVariant::Revoke,
1486            }
1487        });
1488        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1489            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1490                privilege_object,
1491                privilege_acl_item,
1492                variant: UpdatePrivilegeVariant::Revoke,
1493            },
1494        );
1495        let DropOps {
1496            ops: drop_ops,
1497            dropped_active_db,
1498            dropped_active_cluster,
1499            dropped_in_use_indexes,
1500        } = self.sequence_drop_common(session, plan.drop_ids)?;
1501
1502        let ops = privilege_revoke_ops
1503            .chain(default_privilege_revoke_ops)
1504            .chain(drop_ops.into_iter())
1505            .collect();
1506
1507        self.catalog_transact(Some(session), ops).await?;
1508
1509        if dropped_active_db {
1510            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1511                name: session.vars().database().to_string(),
1512            });
1513        }
1514        if dropped_active_cluster {
1515            session.add_notice(AdapterNotice::DroppedActiveCluster {
1516                name: session.vars().cluster().to_string(),
1517            });
1518        }
1519        for dropped_in_use_index in dropped_in_use_indexes {
1520            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1521        }
1522        Ok(ExecuteResponse::DroppedOwned)
1523    }
1524
1525    fn sequence_drop_common(
1526        &self,
1527        session: &Session,
1528        ids: Vec<ObjectId>,
1529    ) -> Result<DropOps, AdapterError> {
1530        let mut dropped_active_db = false;
1531        let mut dropped_active_cluster = false;
1532        let mut dropped_in_use_indexes = Vec::new();
1533        let mut dropped_roles = BTreeMap::new();
1534        let mut dropped_databases = BTreeSet::new();
1535        let mut dropped_schemas = BTreeSet::new();
1536        // Dropping either the group role or the member role of a role membership will trigger a
1537        // revoke role. We use a Set for the revokes to avoid trying to attempt to revoke the same
1538        // role membership twice.
1539        let mut role_revokes = BTreeSet::new();
1540        // Dropping a database or a schema will revoke all default roles associated with that
1541        // database or schema.
1542        let mut default_privilege_revokes = BTreeSet::new();
1543
1544        // Clusters we're dropping
1545        let mut clusters_to_drop = BTreeSet::new();
1546
1547        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1548        for id in &ids {
1549            match id {
1550                ObjectId::Database(id) => {
1551                    let name = self.catalog().get_database(id).name();
1552                    if name == session.vars().database() {
1553                        dropped_active_db = true;
1554                    }
1555                    dropped_databases.insert(id);
1556                }
1557                ObjectId::Schema((_, spec)) => {
1558                    if let SchemaSpecifier::Id(id) = spec {
1559                        dropped_schemas.insert(id);
1560                    }
1561                }
1562                ObjectId::Cluster(id) => {
1563                    clusters_to_drop.insert(*id);
1564                    if let Some(active_id) = self
1565                        .catalog()
1566                        .active_cluster(session)
1567                        .ok()
1568                        .map(|cluster| cluster.id())
1569                    {
1570                        if id == &active_id {
1571                            dropped_active_cluster = true;
1572                        }
1573                    }
1574                }
1575                ObjectId::Role(id) => {
1576                    let role = self.catalog().get_role(id);
1577                    let name = role.name();
1578                    dropped_roles.insert(*id, name);
1579                    // We must revoke all role memberships that the dropped roles belongs to.
1580                    for (group_id, grantor_id) in &role.membership.map {
1581                        role_revokes.insert((*group_id, *id, *grantor_id));
1582                    }
1583                }
1584                ObjectId::Item(id) => {
1585                    if let Some(index) = self.catalog().get_entry(id).index() {
1586                        let humanizer = self.catalog().for_session(session);
1587                        let dependants = self
1588                            .controller
1589                            .compute
1590                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1591                            .ok()
1592                            .into_iter()
1593                            .flatten()
1594                            .filter(|dependant_id| {
1595                                // Transient Ids belong to Peeks. We are not interested for now in
1596                                // peeks depending on a dropped index.
1597                                // TODO: show a different notice in this case. Something like
1598                                // "There is an in-progress ad hoc SELECT that uses the dropped
1599                                // index. The resources used by the index will be freed when all
1600                                // such SELECTs complete."
1601                                if dependant_id.is_transient() {
1602                                    return false;
1603                                }
1604                                // The item should exist, but don't panic if it doesn't.
1605                                let Some(dependent_id) = humanizer
1606                                    .try_get_item_by_global_id(dependant_id)
1607                                    .map(|item| item.id())
1608                                else {
1609                                    return false;
1610                                };
1611                                // If the dependent object is also being dropped, then there is no
1612                                // problem, so we don't want a notice.
1613                                !ids_set.contains(&ObjectId::Item(dependent_id))
1614                            })
1615                            .flat_map(|dependant_id| {
1616                                // If we are not able to find a name for this ID it probably means
1617                                // we have already dropped the compute collection, in which case we
1618                                // can ignore it.
1619                                humanizer.humanize_id(dependant_id)
1620                            })
1621                            .collect_vec();
1622                        if !dependants.is_empty() {
1623                            dropped_in_use_indexes.push(DroppedInUseIndex {
1624                                index_name: humanizer
1625                                    .humanize_id(index.global_id())
1626                                    .unwrap_or_else(|| id.to_string()),
1627                                dependant_objects: dependants,
1628                            });
1629                        }
1630                    }
1631                }
1632                _ => {}
1633            }
1634        }
1635
1636        for id in &ids {
1637            match id {
1638                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1639                // unless they are internal replicas, which exist outside the scope
1640                // of managed clusters.
1641                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1642                    if !clusters_to_drop.contains(cluster_id) {
1643                        let cluster = self.catalog.get_cluster(*cluster_id);
1644                        if cluster.is_managed() {
1645                            let replica =
1646                                cluster.replica(*replica_id).expect("Catalog out of sync");
1647                            if !replica.config.location.internal() {
1648                                coord_bail!("cannot drop replica of managed cluster");
1649                            }
1650                        }
1651                    }
1652                }
1653                _ => {}
1654            }
1655        }
1656
1657        for role_id in dropped_roles.keys() {
1658            self.catalog().ensure_not_reserved_role(role_id)?;
1659        }
1660        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1661        // If any role is a member of a dropped role, then we must revoke that membership.
1662        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1663        for role in self.catalog().user_roles() {
1664            for dropped_role_id in
1665                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1666            {
1667                role_revokes.insert((
1668                    **dropped_role_id,
1669                    role.id(),
1670                    *role
1671                        .membership
1672                        .map
1673                        .get(*dropped_role_id)
1674                        .expect("included in keys above"),
1675                ));
1676            }
1677        }
1678
1679        for (default_privilege_object, default_privilege_acls) in
1680            self.catalog().default_privileges()
1681        {
1682            if matches!(
1683                &default_privilege_object.database_id,
1684                Some(database_id) if dropped_databases.contains(database_id),
1685            ) || matches!(
1686                &default_privilege_object.schema_id,
1687                Some(schema_id) if dropped_schemas.contains(schema_id),
1688            ) {
1689                for default_privilege_acl in default_privilege_acls {
1690                    default_privilege_revokes.insert((
1691                        default_privilege_object.clone(),
1692                        default_privilege_acl.clone(),
1693                    ));
1694                }
1695            }
1696        }
1697
1698        let ops = role_revokes
1699            .into_iter()
1700            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1701                role_id,
1702                member_id,
1703                grantor_id,
1704            })
1705            .chain(default_privilege_revokes.into_iter().map(
1706                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1707                    privilege_object,
1708                    privilege_acl_item,
1709                    variant: UpdatePrivilegeVariant::Revoke,
1710                },
1711            ))
1712            .chain(iter::once(catalog::Op::DropObjects(
1713                ids.into_iter()
1714                    .map(DropObjectInfo::manual_drop_from_object_id)
1715                    .collect(),
1716            )))
1717            .collect();
1718
1719        Ok(DropOps {
1720            ops,
1721            dropped_active_db,
1722            dropped_active_cluster,
1723            dropped_in_use_indexes,
1724        })
1725    }
1726
1727    pub(super) fn sequence_explain_schema(
1728        &self,
1729        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1730    ) -> Result<ExecuteResponse, AdapterError> {
1731        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1732            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1733        })?;
1734
1735        let json_string = json_string(&json_value);
1736        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1737        Ok(Self::send_immediate_rows(row))
1738    }
1739
1740    pub(super) fn sequence_show_all_variables(
1741        &self,
1742        session: &Session,
1743    ) -> Result<ExecuteResponse, AdapterError> {
1744        let mut rows = viewable_variables(self.catalog().state(), session)
1745            .map(|v| (v.name(), v.value(), v.description()))
1746            .collect::<Vec<_>>();
1747        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1748
1749        // TODO(parkmycar): Pack all of these into a single RowCollection.
1750        let rows: Vec<_> = rows
1751            .into_iter()
1752            .map(|(name, val, desc)| {
1753                Row::pack_slice(&[
1754                    Datum::String(name),
1755                    Datum::String(&val),
1756                    Datum::String(desc),
1757                ])
1758            })
1759            .collect();
1760        Ok(Self::send_immediate_rows(rows))
1761    }
1762
1763    pub(super) fn sequence_show_variable(
1764        &self,
1765        session: &Session,
1766        plan: plan::ShowVariablePlan,
1767    ) -> Result<ExecuteResponse, AdapterError> {
1768        if &plan.name == SCHEMA_ALIAS {
1769            let schemas = self.catalog.resolve_search_path(session);
1770            let schema = schemas.first();
1771            return match schema {
1772                Some((database_spec, schema_spec)) => {
1773                    let schema_name = &self
1774                        .catalog
1775                        .get_schema(database_spec, schema_spec, session.conn_id())
1776                        .name()
1777                        .schema;
1778                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
1779                    Ok(Self::send_immediate_rows(row))
1780                }
1781                None => {
1782                    if session.vars().current_object_missing_warnings() {
1783                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1784                            search_path: session
1785                                .vars()
1786                                .search_path()
1787                                .into_iter()
1788                                .map(|schema| schema.to_string())
1789                                .collect(),
1790                        });
1791                    }
1792                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1793                }
1794            };
1795        }
1796
1797        let variable = session
1798            .vars()
1799            .get(self.catalog().system_config(), &plan.name)
1800            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1801
1802        // In lieu of plumbing the user to all system config functions, just check that the var is
1803        // visible.
1804        variable.visible(session.user(), self.catalog().system_config())?;
1805
1806        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1807        if variable.name() == vars::DATABASE.name()
1808            && matches!(
1809                self.catalog().resolve_database(&variable.value()),
1810                Err(CatalogError::UnknownDatabase(_))
1811            )
1812            && session.vars().current_object_missing_warnings()
1813        {
1814            let name = variable.value();
1815            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1816        } else if variable.name() == vars::CLUSTER.name()
1817            && matches!(
1818                self.catalog().resolve_cluster(&variable.value()),
1819                Err(CatalogError::UnknownCluster(_))
1820            )
1821            && session.vars().current_object_missing_warnings()
1822        {
1823            let name = variable.value();
1824            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1825        }
1826        Ok(Self::send_immediate_rows(row))
1827    }
1828
1829    #[instrument]
1830    pub(super) async fn sequence_inspect_shard(
1831        &self,
1832        session: &Session,
1833        plan: plan::InspectShardPlan,
1834    ) -> Result<ExecuteResponse, AdapterError> {
1835        // TODO: Not thrilled about this rbac special case here, but probably
1836        // sufficient for now.
1837        if !session.user().is_internal() {
1838            return Err(AdapterError::Unauthorized(
1839                rbac::UnauthorizedError::MzSystem {
1840                    action: "inspect".into(),
1841                },
1842            ));
1843        }
1844        let state = self
1845            .controller
1846            .storage
1847            .inspect_persist_state(plan.id)
1848            .await?;
1849        let jsonb = Jsonb::from_serde_json(state)?;
1850        Ok(Self::send_immediate_rows(jsonb.into_row()))
1851    }
1852
1853    #[instrument]
1854    pub(super) fn sequence_set_variable(
1855        &self,
1856        session: &mut Session,
1857        plan: plan::SetVariablePlan,
1858    ) -> Result<ExecuteResponse, AdapterError> {
1859        let (name, local) = (plan.name, plan.local);
1860        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1861            self.validate_set_isolation_level(session)?;
1862        }
1863        if &name == vars::CLUSTER.name() {
1864            self.validate_set_cluster(session)?;
1865        }
1866
1867        let vars = session.vars_mut();
1868        let values = match plan.value {
1869            plan::VariableValue::Default => None,
1870            plan::VariableValue::Values(values) => Some(values),
1871        };
1872
1873        match values {
1874            Some(values) => {
1875                vars.set(
1876                    self.catalog().system_config(),
1877                    &name,
1878                    VarInput::SqlSet(&values),
1879                    local,
1880                )?;
1881
1882                let vars = session.vars();
1883
1884                // Emit a warning when deprecated variables are used.
1885                // TODO(database-issues#8069) remove this after sufficient time has passed
1886                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1887                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1888                } else if name == vars::CLUSTER.name()
1889                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1890                {
1891                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1892                }
1893
1894                // Database or cluster value does not correspond to a catalog item.
1895                if name.as_str() == vars::DATABASE.name()
1896                    && matches!(
1897                        self.catalog().resolve_database(vars.database()),
1898                        Err(CatalogError::UnknownDatabase(_))
1899                    )
1900                    && session.vars().current_object_missing_warnings()
1901                {
1902                    let name = vars.database().to_string();
1903                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1904                } else if name.as_str() == vars::CLUSTER.name()
1905                    && matches!(
1906                        self.catalog().resolve_cluster(vars.cluster()),
1907                        Err(CatalogError::UnknownCluster(_))
1908                    )
1909                    && session.vars().current_object_missing_warnings()
1910                {
1911                    let name = vars.cluster().to_string();
1912                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1913                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1914                    let v = values.into_first().to_lowercase();
1915                    if v == IsolationLevel::ReadUncommitted.as_str()
1916                        || v == IsolationLevel::ReadCommitted.as_str()
1917                        || v == IsolationLevel::RepeatableRead.as_str()
1918                    {
1919                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1920                            isolation_level: v,
1921                        });
1922                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1923                        session.add_notice(AdapterNotice::StrongSessionSerializable);
1924                    }
1925                }
1926            }
1927            None => vars.reset(self.catalog().system_config(), &name, local)?,
1928        }
1929
1930        Ok(ExecuteResponse::SetVariable { name, reset: false })
1931    }
1932
1933    pub(super) fn sequence_reset_variable(
1934        &self,
1935        session: &mut Session,
1936        plan: plan::ResetVariablePlan,
1937    ) -> Result<ExecuteResponse, AdapterError> {
1938        let name = plan.name;
1939        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1940            self.validate_set_isolation_level(session)?;
1941        }
1942        if &name == vars::CLUSTER.name() {
1943            self.validate_set_cluster(session)?;
1944        }
1945        session
1946            .vars_mut()
1947            .reset(self.catalog().system_config(), &name, false)?;
1948        Ok(ExecuteResponse::SetVariable { name, reset: true })
1949    }
1950
1951    pub(super) fn sequence_set_transaction(
1952        &self,
1953        session: &mut Session,
1954        plan: plan::SetTransactionPlan,
1955    ) -> Result<ExecuteResponse, AdapterError> {
1956        // TODO(jkosh44) Only supports isolation levels for now.
1957        for mode in plan.modes {
1958            match mode {
1959                TransactionMode::AccessMode(_) => {
1960                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1961                }
1962                TransactionMode::IsolationLevel(isolation_level) => {
1963                    self.validate_set_isolation_level(session)?;
1964
1965                    session.vars_mut().set(
1966                        self.catalog().system_config(),
1967                        TRANSACTION_ISOLATION_VAR_NAME,
1968                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
1969                        plan.local,
1970                    )?
1971                }
1972            }
1973        }
1974        Ok(ExecuteResponse::SetVariable {
1975            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1976            reset: false,
1977        })
1978    }
1979
1980    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1981        if session.transaction().contains_ops() {
1982            Err(AdapterError::InvalidSetIsolationLevel)
1983        } else {
1984            Ok(())
1985        }
1986    }
1987
1988    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1989        if session.transaction().contains_ops() {
1990            Err(AdapterError::InvalidSetCluster)
1991        } else {
1992            Ok(())
1993        }
1994    }
1995
1996    #[instrument]
1997    pub(super) async fn sequence_end_transaction(
1998        &mut self,
1999        mut ctx: ExecuteContext,
2000        mut action: EndTransactionAction,
2001    ) {
2002        // If the transaction has failed, we can only rollback.
2003        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2004            (&action, ctx.session().transaction())
2005        {
2006            action = EndTransactionAction::Rollback;
2007        }
2008        let response = match action {
2009            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2010                params: BTreeMap::new(),
2011            }),
2012            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2013                params: BTreeMap::new(),
2014            }),
2015        };
2016
2017        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2018
2019        let (response, action) = match result {
2020            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2021                (response, action)
2022            }
2023            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2024                // Make sure we have the correct set of write locks for this transaction.
2025                // Aggressively dropping partial sets of locks to prevent deadlocking separate
2026                // transactions.
2027                let validated_locks = match write_lock_guards {
2028                    None => None,
2029                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2030                        Ok(locks) => Some(locks),
2031                        Err(missing) => {
2032                            tracing::error!(?missing, "programming error, missing write locks");
2033                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2034                        }
2035                    },
2036                };
2037
2038                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2039                for WriteOp { id, rows } in writes {
2040                    let total_rows = collected_writes.entry(id).or_default();
2041                    total_rows.push(rows);
2042                }
2043
2044                self.submit_write(PendingWriteTxn::User {
2045                    span: Span::current(),
2046                    writes: collected_writes,
2047                    write_locks: validated_locks,
2048                    pending_txn: PendingTxn {
2049                        ctx,
2050                        response,
2051                        action,
2052                    },
2053                });
2054                return;
2055            }
2056            Ok((
2057                Some(TransactionOps::Peeks {
2058                    determination,
2059                    requires_linearization: RequireLinearization::Required,
2060                    ..
2061                }),
2062                _,
2063            )) if ctx.session().vars().transaction_isolation()
2064                == &IsolationLevel::StrictSerializable =>
2065            {
2066                let conn_id = ctx.session().conn_id().clone();
2067                let pending_read_txn = PendingReadTxn {
2068                    txn: PendingRead::Read {
2069                        txn: PendingTxn {
2070                            ctx,
2071                            response,
2072                            action,
2073                        },
2074                    },
2075                    timestamp_context: determination.timestamp_context,
2076                    created: Instant::now(),
2077                    num_requeues: 0,
2078                    otel_ctx: OpenTelemetryContext::obtain(),
2079                };
2080                self.strict_serializable_reads_tx
2081                    .send((conn_id, pending_read_txn))
2082                    .expect("sending to strict_serializable_reads_tx cannot fail");
2083                return;
2084            }
2085            Ok((
2086                Some(TransactionOps::Peeks {
2087                    determination,
2088                    requires_linearization: RequireLinearization::Required,
2089                    ..
2090                }),
2091                _,
2092            )) if ctx.session().vars().transaction_isolation()
2093                == &IsolationLevel::StrongSessionSerializable =>
2094            {
2095                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2096                    ctx.session_mut()
2097                        .ensure_timestamp_oracle(timeline.clone())
2098                        .apply_write(*ts);
2099                }
2100                (response, action)
2101            }
2102            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2103                self.internal_cmd_tx
2104                    .send(Message::ExecuteSingleStatementTransaction {
2105                        ctx,
2106                        otel_ctx: OpenTelemetryContext::obtain(),
2107                        stmt,
2108                        params,
2109                    })
2110                    .expect("must send");
2111                return;
2112            }
2113            Ok((_, _)) => (response, action),
2114            Err(err) => (Err(err), EndTransactionAction::Rollback),
2115        };
2116        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2117        // Append any parameters that changed to the response.
2118        let response = response.map(|mut r| {
2119            r.extend_params(changed);
2120            ExecuteResponse::from(r)
2121        });
2122
2123        ctx.retire(response);
2124    }
2125
2126    #[instrument]
2127    async fn sequence_end_transaction_inner(
2128        &mut self,
2129        ctx: &mut ExecuteContext,
2130        action: EndTransactionAction,
2131    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2132        let txn = self.clear_transaction(ctx.session_mut()).await;
2133
2134        if let EndTransactionAction::Commit = action {
2135            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2136                match &mut ops {
2137                    TransactionOps::Writes(writes) => {
2138                        for WriteOp { id, .. } in &mut writes.iter() {
2139                            // Re-verify this id exists.
2140                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2141                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2142                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2143                                })
2144                            })?;
2145                        }
2146
2147                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2148                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2149                    }
2150                    TransactionOps::DDL {
2151                        ops,
2152                        state: _,
2153                        side_effects,
2154                        revision,
2155                        snapshot: _,
2156                    } => {
2157                        // Make sure our catalog hasn't changed.
2158                        if *revision != self.catalog().transient_revision() {
2159                            return Err(AdapterError::DDLTransactionRace);
2160                        }
2161                        // Commit all of our queued ops.
2162                        let ops = std::mem::take(ops);
2163                        let side_effects = std::mem::take(side_effects);
2164                        self.catalog_transact_with_side_effects(
2165                            Some(ctx),
2166                            ops,
2167                            move |a, mut ctx| {
2168                                Box::pin(async move {
2169                                    for side_effect in side_effects {
2170                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2171                                    }
2172                                })
2173                            },
2174                        )
2175                        .await?;
2176                    }
2177                    _ => (),
2178                }
2179                return Ok((Some(ops), write_lock_guards));
2180            }
2181        }
2182
2183        Ok((None, None))
2184    }
2185
2186    pub(super) async fn sequence_side_effecting_func(
2187        &mut self,
2188        ctx: ExecuteContext,
2189        plan: SideEffectingFunc,
2190    ) {
2191        match plan {
2192            SideEffectingFunc::PgCancelBackend { connection_id } => {
2193                if ctx.session().conn_id().unhandled() == connection_id {
2194                    // As a special case, if we're canceling ourselves, we send
2195                    // back a canceled resposne to the client issuing the query,
2196                    // and so we need to do no further processing of the cancel.
2197                    ctx.retire(Err(AdapterError::Canceled));
2198                    return;
2199                }
2200
2201                let res = if let Some((id_handle, _conn_meta)) =
2202                    self.active_conns.get_key_value(&connection_id)
2203                {
2204                    // check_plan already verified role membership.
2205                    self.handle_privileged_cancel(id_handle.clone()).await;
2206                    Datum::True
2207                } else {
2208                    Datum::False
2209                };
2210                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2211            }
2212        }
2213    }
2214
2215    /// Execute a side-effecting function from the frontend peek path.
2216    /// This is separate from `sequence_side_effecting_func` because
2217    /// - It doesn't have an ExecuteContext.
2218    /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
2219    ///   peek sequencing can't look at `active_conns`.
2220    ///
2221    /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
2222    /// sequencing.
2223    pub(crate) async fn execute_side_effecting_func(
2224        &mut self,
2225        plan: SideEffectingFunc,
2226        conn_id: ConnectionId,
2227        current_role: RoleId,
2228    ) -> Result<ExecuteResponse, AdapterError> {
2229        match plan {
2230            SideEffectingFunc::PgCancelBackend { connection_id } => {
2231                if conn_id.unhandled() == connection_id {
2232                    // As a special case, if we're canceling ourselves, we return
2233                    // a canceled response to the client issuing the query,
2234                    // and so we need to do no further processing of the cancel.
2235                    return Err(AdapterError::Canceled);
2236                }
2237
2238                // Perform RBAC check: the current user must be a member of the role
2239                // that owns the connection being cancelled.
2240                if let Some((_id_handle, conn_meta)) =
2241                    self.active_conns.get_key_value(&connection_id)
2242                {
2243                    let target_role = *conn_meta.authenticated_role_id();
2244                    let role_membership = self
2245                        .catalog()
2246                        .state()
2247                        .collect_role_membership(&current_role);
2248                    if !role_membership.contains(&target_role) {
2249                        let target_role_name = self
2250                            .catalog()
2251                            .try_get_role(&target_role)
2252                            .map(|role| role.name().to_string())
2253                            .unwrap_or_else(|| target_role.to_string());
2254                        return Err(AdapterError::Unauthorized(
2255                            rbac::UnauthorizedError::RoleMembership {
2256                                role_names: vec![target_role_name],
2257                            },
2258                        ));
2259                    }
2260
2261                    // RBAC check passed, proceed with cancellation.
2262                    let id_handle = self
2263                        .active_conns
2264                        .get_key_value(&connection_id)
2265                        .map(|(id, _)| id.clone())
2266                        .expect("checked above");
2267                    self.handle_privileged_cancel(id_handle).await;
2268                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2269                } else {
2270                    // Connection not found, return false.
2271                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2272                }
2273            }
2274        }
2275    }
2276
2277    /// Inner method that performs the actual real-time recency timestamp determination.
2278    /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`)
2279    /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
2280    pub(crate) async fn determine_real_time_recent_timestamp(
2281        &self,
2282        source_ids: impl Iterator<Item = GlobalId>,
2283        real_time_recency_timeout: Duration,
2284    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2285        let item_ids = source_ids
2286            .map(|gid| {
2287                self.catalog
2288                    .try_resolve_item_id(&gid)
2289                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2290            })
2291            .collect::<Result<Vec<_>, _>>()?;
2292
2293        // Find all dependencies transitively because we need to ensure that
2294        // RTR queries determine the timestamp from the sources' (i.e.
2295        // storage objects that ingest data from external systems) remap
2296        // data. We "cheat" a little bit and filter out any IDs that aren't
2297        // user objects because we know they are not a RTR source.
2298        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2299        // If none of the sources are user objects, we don't need to provide
2300        // a RTR timestamp.
2301        if to_visit.is_empty() {
2302            return Ok(None);
2303        }
2304
2305        let mut timestamp_objects = BTreeSet::new();
2306
2307        while let Some(id) = to_visit.pop_front() {
2308            timestamp_objects.insert(id);
2309            to_visit.extend(
2310                self.catalog()
2311                    .get_entry(&id)
2312                    .uses()
2313                    .into_iter()
2314                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2315            );
2316        }
2317        let timestamp_objects = timestamp_objects
2318            .into_iter()
2319            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2320            .collect();
2321
2322        let r = self
2323            .controller
2324            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2325            .await?;
2326
2327        Ok(Some(r))
2328    }
2329
2330    /// Checks to see if the session needs a real time recency timestamp and if so returns
2331    /// a future that will return the timestamp.
2332    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2333        &self,
2334        session: &Session,
2335        source_ids: impl Iterator<Item = GlobalId>,
2336    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2337        let vars = session.vars();
2338
2339        if vars.real_time_recency()
2340            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2341            && !session.contains_read_timestamp()
2342        {
2343            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2344                .await
2345        } else {
2346            Ok(None)
2347        }
2348    }
2349
2350    #[instrument]
2351    pub(super) async fn sequence_explain_plan(
2352        &mut self,
2353        ctx: ExecuteContext,
2354        plan: plan::ExplainPlanPlan,
2355        target_cluster: TargetCluster,
2356    ) {
2357        match &plan.explainee {
2358            plan::Explainee::Statement(stmt) => match stmt {
2359                plan::ExplaineeStatement::CreateView { .. } => {
2360                    self.explain_create_view(ctx, plan).await;
2361                }
2362                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2363                    self.explain_create_materialized_view(ctx, plan).await;
2364                }
2365                plan::ExplaineeStatement::CreateIndex { .. } => {
2366                    self.explain_create_index(ctx, plan).await;
2367                }
2368                plan::ExplaineeStatement::Select { .. } => {
2369                    self.explain_peek(ctx, plan, target_cluster).await;
2370                }
2371                plan::ExplaineeStatement::Subscribe { .. } => {
2372                    self.explain_subscribe(ctx, plan, target_cluster).await;
2373                }
2374            },
2375            plan::Explainee::View(_) => {
2376                let result = self.explain_view(&ctx, plan);
2377                ctx.retire(result);
2378            }
2379            plan::Explainee::MaterializedView(_) => {
2380                let result = self.explain_materialized_view(&ctx, plan);
2381                ctx.retire(result);
2382            }
2383            plan::Explainee::Index(_) => {
2384                let result = self.explain_index(&ctx, plan);
2385                ctx.retire(result);
2386            }
2387            plan::Explainee::ReplanView(_) => {
2388                self.explain_replan_view(ctx, plan).await;
2389            }
2390            plan::Explainee::ReplanMaterializedView(_) => {
2391                self.explain_replan_materialized_view(ctx, plan).await;
2392            }
2393            plan::Explainee::ReplanIndex(_) => {
2394                self.explain_replan_index(ctx, plan).await;
2395            }
2396        };
2397    }
2398
2399    pub(super) async fn sequence_explain_pushdown(
2400        &mut self,
2401        ctx: ExecuteContext,
2402        plan: plan::ExplainPushdownPlan,
2403        target_cluster: TargetCluster,
2404    ) {
2405        match plan.explainee {
2406            Explainee::Statement(ExplaineeStatement::Select {
2407                broken: false,
2408                plan,
2409                desc: _,
2410            }) => {
2411                let stage = return_if_err!(
2412                    self.peek_validate(
2413                        ctx.session(),
2414                        plan,
2415                        target_cluster,
2416                        None,
2417                        ExplainContext::Pushdown,
2418                        Some(ctx.session().vars().max_query_result_size()),
2419                    ),
2420                    ctx
2421                );
2422                self.sequence_staged(ctx, Span::current(), stage).await;
2423            }
2424            Explainee::MaterializedView(item_id) => {
2425                self.explain_pushdown_materialized_view(ctx, item_id).await;
2426            }
2427            _ => {
2428                ctx.retire(Err(AdapterError::Unsupported(
2429                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2430                )));
2431            }
2432        };
2433    }
2434
2435    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2436    async fn execute_explain_pushdown_with_read_holds(
2437        &self,
2438        ctx: ExecuteContext,
2439        as_of: Antichain<Timestamp>,
2440        mz_now: ResultSpec<'static>,
2441        read_holds: Option<ReadHolds<Timestamp>>,
2442        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2443    ) {
2444        let fut = self
2445            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2446            .await;
2447        task::spawn(|| "render explain pushdown", async move {
2448            // Transfer the necessary read holds over to the background task
2449            let _read_holds = read_holds;
2450            let res = fut.await;
2451            ctx.retire(res);
2452        });
2453    }
2454
2455    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
2456    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2457        &self,
2458        session: &Session,
2459        as_of: Antichain<Timestamp>,
2460        mz_now: ResultSpec<'static>,
2461        imports: I,
2462    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2463        // Get the needed Coordinator stuff and call the freestanding, shared helper.
2464        super::explain_pushdown_future_inner(
2465            session,
2466            &self.catalog,
2467            &self.controller.storage_collections,
2468            as_of,
2469            mz_now,
2470            imports,
2471        )
2472        .await
2473    }
2474
2475    #[instrument]
2476    pub(super) async fn sequence_insert(
2477        &mut self,
2478        mut ctx: ExecuteContext,
2479        plan: plan::InsertPlan,
2480    ) {
2481        // Normally, this would get checked when trying to add "write ops" to
2482        // the transaction but we go down diverging paths below, based on
2483        // whether the INSERT is only constant values or not.
2484        //
2485        // For the non-constant case we sequence an implicit read-then-write,
2486        // which messes with the transaction ops and would allow an implicit
2487        // read-then-write to sneak into a read-only transaction.
2488        if !ctx.session_mut().transaction().allows_writes() {
2489            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2490            return;
2491        }
2492
2493        // The structure of this code originates from a time where
2494        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2495        // optimized `MirRelationExpr`.
2496        //
2497        // Ideally, we would like to make the `selection.as_const().is_some()`
2498        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2499        // are planned as a Wrap($n, $vals) call, so until we can reduce
2500        // HirRelationExpr this will always returns false.
2501        //
2502        // Unfortunately, hitting the default path of the match below also
2503        // causes a lot of tests to fail, so we opted to go with the extra
2504        // `plan.values.clone()` statements when producing the `optimized_mir`
2505        // and re-optimize the values in the `sequence_read_then_write` call.
2506        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2507            // We don't perform any optimizations on an expression that is already
2508            // a constant for writes, as we want to maximize bulk-insert throughput.
2509            let expr = return_if_err!(
2510                plan.values
2511                    .clone()
2512                    .lower(self.catalog().system_config(), None),
2513                ctx
2514            );
2515            OptimizedMirRelationExpr(expr)
2516        } else {
2517            // Collect optimizer parameters.
2518            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2519
2520            // (`optimize::view::Optimizer` has a special case for constant queries.)
2521            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2522
2523            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2524            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2525        };
2526
2527        match optimized_mir.into_inner() {
2528            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2529                let catalog = self.owned_catalog();
2530                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2531                    let result =
2532                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2533                    ctx.retire(result);
2534                });
2535            }
2536            // All non-constant values must be planned as read-then-writes.
2537            _ => {
2538                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2539                    Some(table) => {
2540                        // Inserts always occur at the latest version of the table.
2541                        let desc = table.relation_desc_latest().expect("table has a desc");
2542                        desc.arity()
2543                    }
2544                    None => {
2545                        ctx.retire(Err(AdapterError::Catalog(
2546                            mz_catalog::memory::error::Error {
2547                                kind: ErrorKind::Sql(CatalogError::UnknownItem(
2548                                    plan.id.to_string(),
2549                                )),
2550                            },
2551                        )));
2552                        return;
2553                    }
2554                };
2555
2556                let finishing = RowSetFinishing {
2557                    order_by: vec![],
2558                    limit: None,
2559                    offset: 0,
2560                    project: (0..desc_arity).collect(),
2561                };
2562
2563                let read_then_write_plan = plan::ReadThenWritePlan {
2564                    id: plan.id,
2565                    selection: plan.values,
2566                    finishing,
2567                    assignments: BTreeMap::new(),
2568                    kind: MutationKind::Insert,
2569                    returning: plan.returning,
2570                };
2571
2572                self.sequence_read_then_write(ctx, read_then_write_plan)
2573                    .await;
2574            }
2575        }
2576    }
2577
2578    /// ReadThenWrite is a plan whose writes depend on the results of a
2579    /// read. This works by doing a Peek then queuing a SendDiffs. No writes
2580    /// or read-then-writes can occur between the Peek and SendDiff otherwise a
2581    /// serializability violation could occur.
2582    #[instrument]
2583    pub(super) async fn sequence_read_then_write(
2584        &mut self,
2585        mut ctx: ExecuteContext,
2586        plan: plan::ReadThenWritePlan,
2587    ) {
2588        let mut source_ids: BTreeSet<_> = plan
2589            .selection
2590            .depends_on()
2591            .into_iter()
2592            .map(|gid| self.catalog().resolve_item_id(&gid))
2593            .collect();
2594        source_ids.insert(plan.id);
2595
2596        // If the transaction doesn't already have write locks, acquire them.
2597        if ctx.session().transaction().write_locks().is_none() {
2598            // Pre-define all of the locks we need.
2599            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2600
2601            // Try acquiring all of our locks.
2602            for id in &source_ids {
2603                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2604                    write_locks.insert_lock(*id, lock);
2605                }
2606            }
2607
2608            // See if we acquired all of the neccessary locks.
2609            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2610                Ok(locks) => locks,
2611                Err(missing) => {
2612                    // Defer our write if we couldn't acquire all of the locks.
2613                    let role_metadata = ctx.session().role_metadata().clone();
2614                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2615                    let plan = DeferredPlan {
2616                        ctx,
2617                        plan: Plan::ReadThenWrite(plan),
2618                        validity: PlanValidity::new(
2619                            self.catalog.transient_revision(),
2620                            source_ids.clone(),
2621                            None,
2622                            None,
2623                            role_metadata,
2624                        ),
2625                        requires_locks: source_ids,
2626                    };
2627                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2628                }
2629            };
2630
2631            ctx.session_mut()
2632                .try_grant_write_locks(write_locks)
2633                .expect("session has already been granted write locks");
2634        }
2635
2636        let plan::ReadThenWritePlan {
2637            id,
2638            kind,
2639            selection,
2640            mut assignments,
2641            finishing,
2642            mut returning,
2643        } = plan;
2644
2645        // Read then writes can be queued, so re-verify the id exists.
2646        let desc = match self.catalog().try_get_entry(&id) {
2647            Some(table) => {
2648                // Inserts always occur at the latest version of the table.
2649                table
2650                    .relation_desc_latest()
2651                    .expect("table has a desc")
2652                    .into_owned()
2653            }
2654            None => {
2655                ctx.retire(Err(AdapterError::Catalog(
2656                    mz_catalog::memory::error::Error {
2657                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2658                    },
2659                )));
2660                return;
2661            }
2662        };
2663
2664        // Disallow mz_now in any position because read time and write time differ.
2665        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2666            || assignments.values().any(|e| e.contains_temporal())
2667            || returning.iter().any(|e| e.contains_temporal());
2668        if contains_temporal {
2669            ctx.retire(Err(AdapterError::Unsupported(
2670                "calls to mz_now in write statements",
2671            )));
2672            return;
2673        }
2674
2675        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
2676        //
2677        // - They do not refer to any objects whose notion of time moves differently than that of
2678        // user tables. This limitation is meant to ensure no writes occur between this read and the
2679        // subsequent write.
2680        // - They do not use mz_now(), whose time produced during read will differ from the write
2681        //   timestamp.
2682        fn validate_read_dependencies(
2683            catalog: &Catalog,
2684            id: &CatalogItemId,
2685        ) -> Result<(), AdapterError> {
2686            use CatalogItemType::*;
2687            use mz_catalog::memory::objects;
2688            let mut ids_to_check = Vec::new();
2689            let valid = match catalog.try_get_entry(id) {
2690                Some(entry) => {
2691                    if let CatalogItem::View(objects::View { optimized_expr, .. })
2692                    | CatalogItem::MaterializedView(objects::MaterializedView {
2693                        optimized_expr,
2694                        ..
2695                    }) = entry.item()
2696                    {
2697                        if optimized_expr.contains_temporal() {
2698                            return Err(AdapterError::Unsupported(
2699                                "calls to mz_now in write statements",
2700                            ));
2701                        }
2702                    }
2703                    match entry.item().typ() {
2704                        typ @ (Func | View | MaterializedView | ContinualTask) => {
2705                            ids_to_check.extend(entry.uses());
2706                            let valid_id = id.is_user() || matches!(typ, Func);
2707                            valid_id
2708                        }
2709                        Source | Secret | Connection => false,
2710                        // Cannot select from sinks or indexes.
2711                        Sink | Index => unreachable!(),
2712                        Table => {
2713                            if !id.is_user() {
2714                                // We can't read from non-user tables
2715                                false
2716                            } else {
2717                                // We can't read from tables that are source-exports
2718                                entry.source_export_details().is_none()
2719                            }
2720                        }
2721                        Type => true,
2722                    }
2723                }
2724                None => false,
2725            };
2726            if !valid {
2727                let (object_name, object_type) = match catalog.try_get_entry(id) {
2728                    Some(entry) => {
2729                        let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
2730                        let object_type = match entry.item().typ() {
2731                            // We only need the disallowed types here; the allowed types are handled above.
2732                            Source => "source",
2733                            Secret => "secret",
2734                            Connection => "connection",
2735                            Table => {
2736                                if !id.is_user() {
2737                                    "system table"
2738                                } else {
2739                                    "source-export table"
2740                                }
2741                            }
2742                            View => "system view",
2743                            MaterializedView => "system materialized view",
2744                            ContinualTask => "system task",
2745                            _ => "invalid dependency",
2746                        };
2747                        (object_name, object_type.to_string())
2748                    }
2749                    None => (id.to_string(), "unknown".to_string()),
2750                };
2751                return Err(AdapterError::InvalidTableMutationSelection {
2752                    object_name,
2753                    object_type,
2754                });
2755            }
2756            for id in ids_to_check {
2757                validate_read_dependencies(catalog, &id)?;
2758            }
2759            Ok(())
2760        }
2761
2762        for gid in selection.depends_on() {
2763            let item_id = self.catalog().resolve_item_id(&gid);
2764            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2765                ctx.retire(Err(err));
2766                return;
2767            }
2768        }
2769
2770        let (peek_tx, peek_rx) = oneshot::channel();
2771        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2772        let (tx, _, session, extra) = ctx.into_parts();
2773        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2774        // execution context, because this peek does not directly correspond to an execute,
2775        // and so we don't need to take any action on its retirement.
2776        // TODO[btv]: we might consider extending statement logging to log the inner
2777        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2778        // and mint a new "real" execution context here. We'd also have to add some logic to
2779        // make sure such "sub-statements" are always sampled when the top-level statement is
2780        //
2781        // It's debatable whether this makes sense conceptually,
2782        // because the inner fragment here is not actually a
2783        // "statement" in its own right.
2784        let peek_ctx = ExecuteContext::from_parts(
2785            peek_client_tx,
2786            self.internal_cmd_tx.clone(),
2787            session,
2788            Default::default(),
2789        );
2790
2791        self.sequence_peek(
2792            peek_ctx,
2793            plan::SelectPlan {
2794                select: None,
2795                source: selection,
2796                when: QueryWhen::FreshestTableWrite,
2797                finishing,
2798                copy_to: None,
2799            },
2800            TargetCluster::Active,
2801            None,
2802        )
2803        .await;
2804
2805        let internal_cmd_tx = self.internal_cmd_tx.clone();
2806        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2807        let catalog = self.owned_catalog();
2808        let max_result_size = self.catalog().system_config().max_result_size();
2809
2810        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2811            let (peek_response, session) = match peek_rx.await {
2812                Ok(Response {
2813                    result: Ok(resp),
2814                    session,
2815                    otel_ctx,
2816                }) => {
2817                    otel_ctx.attach_as_parent();
2818                    (resp, session)
2819                }
2820                Ok(Response {
2821                    result: Err(e),
2822                    session,
2823                    otel_ctx,
2824                }) => {
2825                    let ctx =
2826                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2827                    otel_ctx.attach_as_parent();
2828                    ctx.retire(Err(e));
2829                    return;
2830                }
2831                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2832                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2833            };
2834            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2835            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2836
2837            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2838            if timeout_dur == Duration::ZERO {
2839                timeout_dur = Duration::MAX;
2840            }
2841
2842            let style = ExprPrepOneShot {
2843                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2844                session: ctx.session(),
2845                catalog_state: catalog.state(),
2846            };
2847            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2848                return_if_err!(style.prep_scalar_expr(expr), ctx);
2849            }
2850
2851            let make_diffs = move |mut rows: Box<dyn RowIterator>|
2852                  -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2853                    let arena = RowArena::new();
2854                    let mut diffs = Vec::new();
2855                    let mut datum_vec = mz_repr::DatumVec::new();
2856
2857                    while let Some(row) = rows.next() {
2858                        if !assignments.is_empty() {
2859                            assert!(
2860                                matches!(kind, MutationKind::Update),
2861                                "only updates support assignments"
2862                            );
2863                            let mut datums = datum_vec.borrow_with(row);
2864                            let mut updates = vec![];
2865                            for (idx, expr) in &assignments {
2866                                let updated = match expr.eval(&datums, &arena) {
2867                                    Ok(updated) => updated,
2868                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2869                                };
2870                                updates.push((*idx, updated));
2871                            }
2872                            for (idx, new_value) in updates {
2873                                datums[idx] = new_value;
2874                            }
2875                            let updated = Row::pack_slice(&datums);
2876                            diffs.push((updated, Diff::ONE));
2877                        }
2878                        match kind {
2879                            // Updates and deletes always remove the
2880                            // current row. Updates will also add an
2881                            // updated value.
2882                            MutationKind::Update | MutationKind::Delete => {
2883                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2884                            }
2885                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2886                        }
2887                    }
2888
2889                    // Sum of all the rows' byte size, for checking if we go
2890                    // above the max_result_size threshold.
2891                    let mut byte_size: u64 = 0;
2892                    for (row, diff) in &diffs {
2893                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2894                        if diff.is_positive() {
2895                            for (idx, datum) in row.iter().enumerate() {
2896                                desc.constraints_met(idx, &datum)?;
2897                            }
2898                        }
2899                    }
2900                    Ok((diffs, byte_size))
2901                };
2902
2903            let diffs = match peek_response {
2904                ExecuteResponse::SendingRowsStreaming {
2905                    rows: mut rows_stream,
2906                    ..
2907                } => {
2908                    let mut byte_size: u64 = 0;
2909                    let mut diffs = Vec::new();
2910                    let result = loop {
2911                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2912                            Ok(Some(res)) => match res {
2913                                PeekResponseUnary::Rows(new_rows) => {
2914                                    match make_diffs(new_rows) {
2915                                        Ok((mut new_diffs, new_byte_size)) => {
2916                                            byte_size = byte_size.saturating_add(new_byte_size);
2917                                            if byte_size > max_result_size {
2918                                                break Err(AdapterError::ResultSize(format!(
2919                                                    "result exceeds max size of {max_result_size}"
2920                                                )));
2921                                            }
2922                                            diffs.append(&mut new_diffs)
2923                                        }
2924                                        Err(e) => break Err(e),
2925                                    };
2926                                }
2927                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2928                                PeekResponseUnary::Error(e) => {
2929                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2930                                }
2931                            },
2932                            Ok(None) => break Ok(diffs),
2933                            Err(_) => {
2934                                // We timed out, so remove the pending peek. This is
2935                                // best-effort and doesn't guarantee we won't
2936                                // receive a response.
2937                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2938                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2939                                    conn_id: ctx.session().conn_id().clone(),
2940                                });
2941                                if let Err(e) = result {
2942                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2943                                }
2944                                break Err(AdapterError::StatementTimeout);
2945                            }
2946                        }
2947                    };
2948
2949                    result
2950                }
2951                ExecuteResponse::SendingRowsImmediate { rows } => {
2952                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2953                }
2954                resp => Err(AdapterError::Unstructured(anyhow!(
2955                    "unexpected peek response: {resp:?}"
2956                ))),
2957            };
2958
2959            let mut returning_rows = Vec::new();
2960            let mut diff_err: Option<AdapterError> = None;
2961            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2962                let arena = RowArena::new();
2963                for (row, diff) in diffs {
2964                    if !diff.is_positive() {
2965                        continue;
2966                    }
2967                    let mut returning_row = Row::with_capacity(returning.len());
2968                    let mut packer = returning_row.packer();
2969                    for expr in &returning {
2970                        let datums: Vec<_> = row.iter().collect();
2971                        match expr.eval(&datums, &arena) {
2972                            Ok(datum) => {
2973                                packer.push(datum);
2974                            }
2975                            Err(err) => {
2976                                diff_err = Some(err.into());
2977                                break;
2978                            }
2979                        }
2980                    }
2981                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2982                    let diff = match NonZeroUsize::try_from(diff) {
2983                        Ok(diff) => diff,
2984                        Err(err) => {
2985                            diff_err = Some(err.into());
2986                            break;
2987                        }
2988                    };
2989                    returning_rows.push((returning_row, diff));
2990                    if diff_err.is_some() {
2991                        break;
2992                    }
2993                }
2994            }
2995            let diffs = if let Some(err) = diff_err {
2996                Err(err)
2997            } else {
2998                diffs
2999            };
3000
3001            // We need to clear out the timestamp context so the write doesn't fail due to a
3002            // read only transaction.
3003            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3004            // No matter what isolation level the client is using, we must linearize this
3005            // read. The write will be performed right after this, as part of a single
3006            // transaction, so the write must have a timestamp greater than or equal to the
3007            // read.
3008            //
3009            // Note: It's only OK for the write to have a greater timestamp than the read
3010            // because the write lock prevents any other writes from happening in between
3011            // the read and write.
3012            if let Some(timestamp_context) = timestamp_context {
3013                let (tx, rx) = tokio::sync::oneshot::channel();
3014                let conn_id = ctx.session().conn_id().clone();
3015                let pending_read_txn = PendingReadTxn {
3016                    txn: PendingRead::ReadThenWrite { ctx, tx },
3017                    timestamp_context,
3018                    created: Instant::now(),
3019                    num_requeues: 0,
3020                    otel_ctx: OpenTelemetryContext::obtain(),
3021                };
3022                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3023                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3024                if let Err(e) = result {
3025                    warn!(
3026                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3027                        e
3028                    );
3029                    return;
3030                }
3031                let result = rx.await;
3032                // It is not an error for these results to be ready after `tx` has been dropped.
3033                ctx = match result {
3034                    Ok(Some(ctx)) => ctx,
3035                    Ok(None) => {
3036                        // Coordinator took our context and will handle responding to the client.
3037                        // This usually indicates that our transaction was aborted.
3038                        return;
3039                    }
3040                    Err(e) => {
3041                        warn!(
3042                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3043                            e
3044                        );
3045                        return;
3046                    }
3047                };
3048            }
3049
3050            match diffs {
3051                Ok(diffs) => {
3052                    let result = Self::send_diffs(
3053                        ctx.session_mut(),
3054                        plan::SendDiffsPlan {
3055                            id,
3056                            updates: diffs,
3057                            kind,
3058                            returning: returning_rows,
3059                            max_result_size,
3060                        },
3061                    );
3062                    ctx.retire(result);
3063                }
3064                Err(e) => {
3065                    ctx.retire(Err(e));
3066                }
3067            }
3068        });
3069    }
3070
3071    #[instrument]
3072    pub(super) async fn sequence_alter_item_rename(
3073        &mut self,
3074        ctx: &mut ExecuteContext,
3075        plan: plan::AlterItemRenamePlan,
3076    ) -> Result<ExecuteResponse, AdapterError> {
3077        let op = catalog::Op::RenameItem {
3078            id: plan.id,
3079            current_full_name: plan.current_full_name,
3080            to_name: plan.to_name,
3081        };
3082        match self
3083            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3084            .await
3085        {
3086            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3087            Err(err) => Err(err),
3088        }
3089    }
3090
3091    #[instrument]
3092    pub(super) async fn sequence_alter_retain_history(
3093        &mut self,
3094        ctx: &mut ExecuteContext,
3095        plan: plan::AlterRetainHistoryPlan,
3096    ) -> Result<ExecuteResponse, AdapterError> {
3097        let ops = vec![catalog::Op::AlterRetainHistory {
3098            id: plan.id,
3099            value: plan.value,
3100            window: plan.window,
3101        }];
3102        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3103            Box::pin(async move {
3104                let catalog_item = coord.catalog().get_entry(&plan.id).item();
3105                let cluster = match catalog_item {
3106                    CatalogItem::Table(_)
3107                    | CatalogItem::MaterializedView(_)
3108                    | CatalogItem::Source(_)
3109                    | CatalogItem::ContinualTask(_) => None,
3110                    CatalogItem::Index(index) => Some(index.cluster_id),
3111                    CatalogItem::Log(_)
3112                    | CatalogItem::View(_)
3113                    | CatalogItem::Sink(_)
3114                    | CatalogItem::Type(_)
3115                    | CatalogItem::Func(_)
3116                    | CatalogItem::Secret(_)
3117                    | CatalogItem::Connection(_) => unreachable!(),
3118                };
3119                match cluster {
3120                    Some(cluster) => {
3121                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3122                    }
3123                    None => {
3124                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3125                    }
3126                }
3127            })
3128        })
3129        .await?;
3130        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3131    }
3132
3133    #[instrument]
3134    pub(super) async fn sequence_alter_source_timestamp_interval(
3135        &mut self,
3136        ctx: &mut ExecuteContext,
3137        plan: plan::AlterSourceTimestampIntervalPlan,
3138    ) -> Result<ExecuteResponse, AdapterError> {
3139        let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3140            id: plan.id,
3141            value: plan.value,
3142            interval: plan.interval,
3143        }];
3144        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3145            Box::pin(async move {
3146                let source = coord
3147                    .catalog()
3148                    .get_entry(&plan.id)
3149                    .source()
3150                    .expect("known to be source");
3151                let (global_id, desc) = match &source.data_source {
3152                    DataSourceDesc::Ingestion { desc, .. }
3153                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3154                        (source.global_id, desc.clone())
3155                    }
3156                    _ => return,
3157                };
3158                let desc = desc.into_inline_connection(coord.catalog().state());
3159                coord
3160                    .controller
3161                    .storage
3162                    .alter_ingestion_source_desc(BTreeMap::from([(global_id, desc)]))
3163                    .await
3164                    .unwrap_or_terminate("cannot fail to alter ingestion source desc");
3165            })
3166        })
3167        .await?;
3168        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3169    }
3170
3171    #[instrument]
3172    pub(super) async fn sequence_alter_schema_rename(
3173        &mut self,
3174        ctx: &mut ExecuteContext,
3175        plan: plan::AlterSchemaRenamePlan,
3176    ) -> Result<ExecuteResponse, AdapterError> {
3177        let (database_spec, schema_spec) = plan.cur_schema_spec;
3178        let op = catalog::Op::RenameSchema {
3179            database_spec,
3180            schema_spec,
3181            new_name: plan.new_schema_name,
3182            check_reserved_names: true,
3183        };
3184        match self
3185            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3186            .await
3187        {
3188            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3189            Err(err) => Err(err),
3190        }
3191    }
3192
3193    #[instrument]
3194    pub(super) async fn sequence_alter_schema_swap(
3195        &mut self,
3196        ctx: &mut ExecuteContext,
3197        plan: plan::AlterSchemaSwapPlan,
3198    ) -> Result<ExecuteResponse, AdapterError> {
3199        let plan::AlterSchemaSwapPlan {
3200            schema_a_spec: (schema_a_db, schema_a),
3201            schema_a_name,
3202            schema_b_spec: (schema_b_db, schema_b),
3203            schema_b_name,
3204            name_temp,
3205        } = plan;
3206
3207        let op_a = catalog::Op::RenameSchema {
3208            database_spec: schema_a_db,
3209            schema_spec: schema_a,
3210            new_name: name_temp,
3211            check_reserved_names: false,
3212        };
3213        let op_b = catalog::Op::RenameSchema {
3214            database_spec: schema_b_db,
3215            schema_spec: schema_b,
3216            new_name: schema_a_name,
3217            check_reserved_names: false,
3218        };
3219        let op_c = catalog::Op::RenameSchema {
3220            database_spec: schema_a_db,
3221            schema_spec: schema_a,
3222            new_name: schema_b_name,
3223            check_reserved_names: false,
3224        };
3225
3226        match self
3227            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3228                Box::pin(async {})
3229            })
3230            .await
3231        {
3232            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3233            Err(err) => Err(err),
3234        }
3235    }
3236
3237    #[instrument]
3238    pub(super) async fn sequence_alter_role(
3239        &mut self,
3240        session: &Session,
3241        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3242    ) -> Result<ExecuteResponse, AdapterError> {
3243        let catalog = self.catalog().for_session(session);
3244        let role = catalog.get_role(&id);
3245
3246        // We'll send these notices to the user, if the operation is successful.
3247        let mut notices = vec![];
3248
3249        // Get the attributes and variables from the role, as they currently are.
3250        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3251        let mut vars = role.vars().clone();
3252
3253        // Whether to set the password to NULL. This is a special case since the existing
3254        // password is not stored in the role attributes.
3255        let mut nopassword = false;
3256
3257        // Apply our updates.
3258        match option {
3259            PlannedAlterRoleOption::Attributes(attrs) => {
3260                self.validate_role_attributes(&attrs.clone().into())?;
3261
3262                if let Some(inherit) = attrs.inherit {
3263                    attributes.inherit = inherit;
3264                }
3265
3266                if let Some(password) = attrs.password {
3267                    attributes.password = Some(password);
3268                    attributes.scram_iterations =
3269                        Some(self.catalog().system_config().scram_iterations())
3270                }
3271
3272                if let Some(superuser) = attrs.superuser {
3273                    attributes.superuser = Some(superuser);
3274                }
3275
3276                if let Some(login) = attrs.login {
3277                    attributes.login = Some(login);
3278                }
3279
3280                if attrs.nopassword.unwrap_or(false) {
3281                    nopassword = true;
3282                }
3283
3284                if let Some(notice) = self.should_emit_rbac_notice(session) {
3285                    notices.push(notice);
3286                }
3287            }
3288            PlannedAlterRoleOption::Variable(variable) => {
3289                // Get the variable to make sure it's valid and visible.
3290                let session_var = session.vars().inspect(variable.name())?;
3291                // Return early if it's not visible.
3292                session_var.visible(session.user(), catalog.system_vars())?;
3293
3294                // Emit a warning when deprecated variables are used.
3295                // TODO(database-issues#8069) remove this after sufficient time has passed
3296                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3297                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3298                } else if let PlannedRoleVariable::Set {
3299                    name,
3300                    value: VariableValue::Values(vals),
3301                } = &variable
3302                {
3303                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3304                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3305                    }
3306                }
3307
3308                let var_name = match variable {
3309                    PlannedRoleVariable::Set { name, value } => {
3310                        // Update our persisted set.
3311                        match &value {
3312                            VariableValue::Default => {
3313                                vars.remove(&name);
3314                            }
3315                            VariableValue::Values(vals) => {
3316                                let var = match &vals[..] {
3317                                    [val] => OwnedVarInput::Flat(val.clone()),
3318                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3319                                };
3320                                // Make sure the input is valid.
3321                                session_var.check(var.borrow())?;
3322
3323                                vars.insert(name.clone(), var);
3324                            }
3325                        };
3326                        name
3327                    }
3328                    PlannedRoleVariable::Reset { name } => {
3329                        // Remove it from our persisted values.
3330                        vars.remove(&name);
3331                        name
3332                    }
3333                };
3334
3335                // Emit a notice that they need to reconnect to see the change take effect.
3336                notices.push(AdapterNotice::VarDefaultUpdated {
3337                    role: Some(name.clone()),
3338                    var_name: Some(var_name),
3339                });
3340            }
3341        }
3342
3343        let op = catalog::Op::AlterRole {
3344            id,
3345            name,
3346            attributes,
3347            nopassword,
3348            vars: RoleVars { map: vars },
3349        };
3350        let response = self
3351            .catalog_transact(Some(session), vec![op])
3352            .await
3353            .map(|_| ExecuteResponse::AlteredRole)?;
3354
3355        // Send all of our queued notices.
3356        session.add_notices(notices);
3357
3358        Ok(response)
3359    }
3360
3361    #[instrument]
3362    pub(super) async fn sequence_alter_sink_prepare(
3363        &mut self,
3364        ctx: ExecuteContext,
3365        plan: plan::AlterSinkPlan,
3366    ) {
3367        // Put a read hold on the new relation
3368        let id_bundle = crate::CollectionIdBundle {
3369            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3370            compute_ids: BTreeMap::new(),
3371        };
3372        let read_hold = self.acquire_read_holds(&id_bundle);
3373
3374        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3375            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3376            return;
3377        };
3378
3379        let otel_ctx = OpenTelemetryContext::obtain();
3380        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3381
3382        let plan_validity = PlanValidity::new(
3383            self.catalog().transient_revision(),
3384            BTreeSet::from_iter([plan.item_id, from_item_id]),
3385            Some(plan.in_cluster),
3386            None,
3387            ctx.session().role_metadata().clone(),
3388        );
3389
3390        info!(
3391            "preparing alter sink for {}: frontiers={:?} export={:?}",
3392            plan.global_id,
3393            self.controller
3394                .storage_collections
3395                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3396            self.controller.storage.export(plan.global_id)
3397        );
3398
3399        // Now we must wait for the sink to make enough progress such that there is overlap between
3400        // the new `from` collection's read hold and the sink's write frontier.
3401        //
3402        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3403        // the watch set never completes and neither does the `ALTER SINK` command.
3404        self.install_storage_watch_set(
3405            ctx.session().conn_id().clone(),
3406            BTreeSet::from_iter([plan.global_id]),
3407            read_ts,
3408            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3409                ctx: Some(ctx),
3410                otel_ctx,
3411                plan,
3412                plan_validity,
3413                read_hold,
3414            }),
3415        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3416    }
3417
3418    #[instrument]
3419    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3420        ctx.otel_ctx.attach_as_parent();
3421
3422        let plan::AlterSinkPlan {
3423            item_id,
3424            global_id,
3425            sink: sink_plan,
3426            with_snapshot,
3427            in_cluster,
3428        } = ctx.plan.clone();
3429
3430        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3431        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3432        // arbitrarily changed since we performed planning, and we must re-assert that it still
3433        // matches our requirements.
3434        //
3435        // The `PlanValidity` check ensures that both the sink and the new source relation still
3436        // exist. Apart from that we have to ensure that nobody else altered the sink in the mean
3437        // time, which we do by comparing the catalog sink version to the one in the plan.
3438        match ctx.plan_validity.check(self.catalog()) {
3439            Ok(()) => {}
3440            Err(err) => {
3441                ctx.retire(Err(err));
3442                return;
3443            }
3444        }
3445
3446        let entry = self.catalog().get_entry(&item_id);
3447        let CatalogItem::Sink(old_sink) = entry.item() else {
3448            panic!("invalid item kind for `AlterSinkPlan`");
3449        };
3450
3451        if sink_plan.version != old_sink.version + 1 {
3452            ctx.retire(Err(AdapterError::ChangedPlan(
3453                "sink was altered concurrently".into(),
3454            )));
3455            return;
3456        }
3457
3458        info!(
3459            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3460            self.controller
3461                .storage_collections
3462                .collections_frontiers(vec![global_id, sink_plan.from]),
3463            self.controller.storage.export(global_id),
3464        );
3465
3466        // Assert that we can recover the updates that happened at the timestamps of the write
3467        // frontier. This must be true in this call.
3468        let write_frontier = &self
3469            .controller
3470            .storage
3471            .export(global_id)
3472            .expect("sink known to exist")
3473            .write_frontier;
3474        let as_of = ctx.read_hold.least_valid_read();
3475        assert!(
3476            write_frontier.iter().all(|t| as_of.less_than(t)),
3477            "{:?} should be strictly less than {:?}",
3478            &*as_of,
3479            &**write_frontier
3480        );
3481
3482        // Parse the `create_sql` so we can update it to the new sink definition.
3483        //
3484        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3485        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3486        // names in the `create_sql` may have changed, for example due to a schema swap.
3487        let create_sql = &old_sink.create_sql;
3488        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3489        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3490            unreachable!("invalid statement kind for sink");
3491        };
3492
3493        // Update the sink version.
3494        stmt.with_options
3495            .retain(|o| o.name != CreateSinkOptionName::Version);
3496        stmt.with_options.push(CreateSinkOption {
3497            name: CreateSinkOptionName::Version,
3498            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3499                sink_plan.version.to_string(),
3500            ))),
3501        });
3502
3503        let conn_catalog = self.catalog().for_system_session();
3504        let (mut stmt, resolved_ids) =
3505            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3506
3507        // Update the `from` relation.
3508        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3509        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3510        stmt.from = ResolvedItemName::Item {
3511            id: from_entry.id(),
3512            qualifiers: from_entry.name.qualifiers.clone(),
3513            full_name,
3514            print_id: true,
3515            version: from_entry.version,
3516        };
3517
3518        let new_sink = Sink {
3519            create_sql: stmt.to_ast_string_stable(),
3520            global_id,
3521            from: sink_plan.from,
3522            connection: sink_plan.connection.clone(),
3523            envelope: sink_plan.envelope,
3524            version: sink_plan.version,
3525            with_snapshot,
3526            resolved_ids: resolved_ids.clone(),
3527            cluster_id: in_cluster,
3528            commit_interval: sink_plan.commit_interval,
3529        };
3530
3531        let ops = vec![catalog::Op::UpdateItem {
3532            id: item_id,
3533            name: entry.name().clone(),
3534            to_item: CatalogItem::Sink(new_sink),
3535        }];
3536
3537        match self
3538            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3539            .await
3540        {
3541            Ok(()) => {}
3542            Err(err) => {
3543                ctx.retire(Err(err));
3544                return;
3545            }
3546        }
3547
3548        let storage_sink_desc = StorageSinkDesc {
3549            from: sink_plan.from,
3550            from_desc: from_entry
3551                .relation_desc()
3552                .expect("sinks can only be built on items with descs")
3553                .into_owned(),
3554            connection: sink_plan
3555                .connection
3556                .clone()
3557                .into_inline_connection(self.catalog().state()),
3558            envelope: sink_plan.envelope,
3559            as_of,
3560            with_snapshot,
3561            version: sink_plan.version,
3562            from_storage_metadata: (),
3563            to_storage_metadata: (),
3564            commit_interval: sink_plan.commit_interval,
3565        };
3566
3567        self.controller
3568            .storage
3569            .alter_export(
3570                global_id,
3571                ExportDescription {
3572                    sink: storage_sink_desc,
3573                    instance_id: in_cluster,
3574                },
3575            )
3576            .await
3577            .unwrap_or_terminate("cannot fail to alter source desc");
3578
3579        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3580    }
3581
3582    #[instrument]
3583    pub(super) async fn sequence_alter_connection(
3584        &mut self,
3585        ctx: ExecuteContext,
3586        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3587    ) {
3588        match action {
3589            AlterConnectionAction::RotateKeys => {
3590                self.sequence_rotate_keys(ctx, id).await;
3591            }
3592            AlterConnectionAction::AlterOptions {
3593                set_options,
3594                drop_options,
3595                validate,
3596            } => {
3597                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3598                    .await
3599            }
3600        }
3601    }
3602
3603    #[instrument]
3604    async fn sequence_alter_connection_options(
3605        &mut self,
3606        mut ctx: ExecuteContext,
3607        id: CatalogItemId,
3608        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3609        drop_options: BTreeSet<ConnectionOptionName>,
3610        validate: bool,
3611    ) {
3612        let cur_entry = self.catalog().get_entry(&id);
3613        let cur_conn = cur_entry.connection().expect("known to be connection");
3614        let connection_gid = cur_conn.global_id();
3615
3616        let inner = || -> Result<Connection, AdapterError> {
3617            // Parse statement.
3618            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3619                .expect("invalid create sql persisted to catalog")
3620                .into_element()
3621                .ast
3622            {
3623                Statement::CreateConnection(stmt) => stmt,
3624                _ => unreachable!("proved type is source"),
3625            };
3626
3627            let catalog = self.catalog().for_system_session();
3628
3629            // Resolve items in statement
3630            let (mut create_conn_stmt, resolved_ids) =
3631                mz_sql::names::resolve(&catalog, create_conn_stmt)
3632                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3633
3634            // Retain options that are neither set nor dropped.
3635            create_conn_stmt
3636                .values
3637                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3638
3639            // Set new values
3640            create_conn_stmt.values.extend(
3641                set_options
3642                    .into_iter()
3643                    .map(|(name, value)| ConnectionOption { name, value }),
3644            );
3645
3646            // Open a new catalog, which we will use to re-plan our
3647            // statement with the desired config.
3648            let mut catalog = self.catalog().for_system_session();
3649            catalog.mark_id_unresolvable_for_replanning(id);
3650
3651            // Re-define our source in terms of the amended statement
3652            let plan = match mz_sql::plan::plan(
3653                None,
3654                &catalog,
3655                Statement::CreateConnection(create_conn_stmt),
3656                &Params::empty(),
3657                &resolved_ids,
3658            )
3659            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3660            {
3661                Plan::CreateConnection(plan) => plan,
3662                _ => unreachable!("create source plan is only valid response"),
3663            };
3664
3665            // Parse statement.
3666            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3667                .expect("invalid create sql persisted to catalog")
3668                .into_element()
3669                .ast
3670            {
3671                Statement::CreateConnection(stmt) => stmt,
3672                _ => unreachable!("proved type is source"),
3673            };
3674
3675            let catalog = self.catalog().for_system_session();
3676
3677            // Resolve items in statement
3678            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3679                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3680
3681            Ok(Connection {
3682                create_sql: plan.connection.create_sql,
3683                global_id: cur_conn.global_id,
3684                details: plan.connection.details,
3685                resolved_ids: new_deps,
3686            })
3687        };
3688
3689        let conn = match inner() {
3690            Ok(conn) => conn,
3691            Err(e) => {
3692                return ctx.retire(Err(e));
3693            }
3694        };
3695
3696        if validate {
3697            let connection = conn
3698                .details
3699                .to_connection()
3700                .into_inline_connection(self.catalog().state());
3701
3702            let internal_cmd_tx = self.internal_cmd_tx.clone();
3703            let transient_revision = self.catalog().transient_revision();
3704            let conn_id = ctx.session().conn_id().clone();
3705            let otel_ctx = OpenTelemetryContext::obtain();
3706            let role_metadata = ctx.session().role_metadata().clone();
3707            let current_storage_parameters = self.controller.storage.config().clone();
3708
3709            task::spawn(
3710                || format!("validate_alter_connection:{conn_id}"),
3711                async move {
3712                    let resolved_ids = conn.resolved_ids.clone();
3713                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3714                    let result = match connection.validate(id, &current_storage_parameters).await {
3715                        Ok(()) => Ok(conn),
3716                        Err(err) => Err(err.into()),
3717                    };
3718
3719                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3720                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3721                        AlterConnectionValidationReady {
3722                            ctx,
3723                            result,
3724                            connection_id: id,
3725                            connection_gid,
3726                            plan_validity: PlanValidity::new(
3727                                transient_revision,
3728                                dependency_ids.clone(),
3729                                None,
3730                                None,
3731                                role_metadata,
3732                            ),
3733                            otel_ctx,
3734                            resolved_ids,
3735                        },
3736                    ));
3737                    if let Err(e) = result {
3738                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3739                    }
3740                },
3741            );
3742        } else {
3743            let result = self
3744                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3745                .await;
3746            ctx.retire(result);
3747        }
3748    }
3749
3750    #[instrument]
3751    pub(crate) async fn sequence_alter_connection_stage_finish(
3752        &mut self,
3753        session: &Session,
3754        id: CatalogItemId,
3755        connection: Connection,
3756    ) -> Result<ExecuteResponse, AdapterError> {
3757        match self.catalog.get_entry(&id).item() {
3758            CatalogItem::Connection(curr_conn) => {
3759                curr_conn
3760                    .details
3761                    .to_connection()
3762                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3763                    .map_err(StorageError::from)?;
3764            }
3765            _ => unreachable!("known to be a connection"),
3766        };
3767
3768        let ops = vec![catalog::Op::UpdateItem {
3769            id,
3770            name: self.catalog.get_entry(&id).name().clone(),
3771            to_item: CatalogItem::Connection(connection.clone()),
3772        }];
3773
3774        self.catalog_transact(Some(session), ops).await?;
3775
3776        // NOTE: The rest of the alter connection logic (updating VPC endpoints
3777        // and propagating connection changes to dependent sources, sinks, and
3778        // tables) is handled in `apply_catalog_implications` via
3779        // `handle_alter_connection`. The catalog transact above triggers that
3780        // code path.
3781
3782        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3783    }
3784
    /// Sequences an `ALTER SOURCE` plan for the source `item_id`.
    ///
    /// * `AddSubsourceExports`: rewrites the source's persisted `CREATE SOURCE` statement so
    ///   that its `TEXT COLUMNS` / `EXCLUDE COLUMNS` options contain both the newly requested
    ///   columns and the existing ones that still belong to a referenced subsource, re-plans
    ///   that statement, checks the resulting ingestion description with the storage
    ///   controller, and finally updates the source and creates the new subsources in a single
    ///   catalog transaction.
    /// * `RefreshReferences`: records the given source references in the catalog.
    ///
    /// Adding subsources is only handled for Postgres, MySQL, and SQL Server connections; any
    /// other connection type produces an internal error.
    ///
    /// # Panics
    ///
    /// Panics if `item_id` is not a source, if the persisted `CREATE SOURCE` SQL does not parse
    /// to a `CREATE SOURCE` statement, or if creating the subsources reports any
    /// `IF NOT EXISTS` ids.
    #[instrument]
    pub(super) async fn sequence_alter_source(
        &mut self,
        session: &Session,
        plan::AlterSourcePlan {
            item_id,
            ingestion_id,
            action,
        }: plan::AlterSourcePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let cur_entry = self.catalog().get_entry(&item_id);
        let cur_source = cur_entry.source().expect("known to be source");

        // Parses `create_source_sql` into a `CREATE SOURCE` statement and resolves its names
        // against a system-session view of the catalog, yielding the resolved statement and
        // its dependencies. Resolution errors become internal errors tagged with `err_cx`.
        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
            // Parse statement.
            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateSource(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = coord.catalog().for_system_session();

            // Resolve items in statement
            mz_sql::names::resolve(&catalog, create_source_stmt)
                .map_err(|e| AdapterError::internal(err_cx, e))
        };

        match action {
            plan::AlterSourceAction::AddSubsourceExports {
                subsources,
                options,
            } => {
                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

                // The text / exclude columns requested by this `ALTER SOURCE` statement.
                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                    text_columns: mut new_text_columns,
                    exclude_columns: mut new_exclude_columns,
                    ..
                } = options.try_into()?;

                // Resolve items in statement
                let (mut create_source_stmt, resolved_ids) =
                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

                // Get all currently referred-to items
                let catalog = self.catalog();
                let curr_references: BTreeSet<_> = catalog
                    .get_entry(&item_id)
                    .used_by()
                    .into_iter()
                    .filter_map(|subsource| {
                        catalog
                            .get_entry(subsource)
                            .subsource_details()
                            .map(|(_id, reference, _details)| reference)
                    })
                    .collect();

                // We are doing a lot of unwrapping, so just make an error to reference; all of
                // these invariants are guaranteed to be true because of how we plan subsources.
                let purification_err =
                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
                // we remove support for implicitly created subsources from a `CREATE SOURCE`
                // statement.
                match &mut create_source_stmt.connection {
                    CreateSourceConnection::Postgres {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::PgConfigOptionExtracted {
                            mut text_columns, ..
                        } = curr_options.clone().try_into()?;

                        // Drop text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));

                        // Drop all text columns that are not currently referred to.
                        text_columns.retain(|column_qualified_reference| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                4,
                                "all TEXT COLUMNS values must be column-qualified references"
                            );
                            // Strip the last name component (the column) so the remaining
                            // three components can be compared against the subsource
                            // references.
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(3);
                            curr_references.contains(&table)
                        });

                        // Merge the current text columns into the new text columns.
                        new_text_columns.extend(text_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(PgConfigOption {
                                name: PgConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::MySql {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::MySqlConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both exclude and text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                MySqlConfigOptionName::TextColumns
                                    | MySqlConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                // Strip the last name component (the column) so the remaining
                                // two components can be compared against the subsource
                                // references.
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::SqlServer {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::SqlServerConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both exclude and text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                SqlServerConfigOptionName::TextColumns
                                    | SqlServerConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                // Strip the last name component (the column) so the remaining
                                // two components can be compared against the subsource
                                // references.
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    // Any other connection type is rejected with an internal error.
                    _ => return Err(purification_err()),
                };

                // Plan against a catalog in which the source itself is marked unresolvable
                // for replanning.
                let mut catalog = self.catalog().for_system_session();
                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

                // Re-define our source in terms of the amended statement
                let plan = match mz_sql::plan::plan(
                    None,
                    &catalog,
                    Statement::CreateSource(create_source_stmt),
                    &Params::empty(),
                    &resolved_ids,
                )
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
                {
                    Plan::CreateSource(plan) => plan,
                    _ => unreachable!("create source plan is only valid response"),
                };

                // Asserting that we've done the right thing with dependencies
                // here requires mocking out objects in the catalog, which is a
                // large task for an operation we have to cover in tests anyway.
                let source = Source::new(
                    plan,
                    cur_source.global_id,
                    resolved_ids,
                    cur_source.custom_logical_compaction_window,
                    cur_source.is_retained_metrics_object,
                );

                // Get new ingestion description for storage.
                let desc = match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                        desc.clone().into_inline_connection(self.catalog().state())
                    }
                    _ => unreachable!("already verified of type ingestion"),
                };

                // Let the storage controller check the new description before any catalog
                // change is made; a failed check aborts the statement with an internal error.
                self.controller
                    .storage
                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

                // Redefine source. This must be done before we create any new
                // subsources so that it has the right ingestion.
                let mut ops = vec![catalog::Op::UpdateItem {
                    id: item_id,
                    // Look this up again so we don't have to hold an immutable reference to the
                    // entry for so long.
                    name: self.catalog.get_entry(&item_id).name().clone(),
                    to_item: CatalogItem::Source(source),
                }];

                let CreateSourceInner {
                    ops: new_ops,
                    sources: _,
                    if_not_exists_ids,
                } = self.create_source_inner(session, subsources).await?;

                ops.extend(new_ops.into_iter());

                assert!(
                    if_not_exists_ids.is_empty(),
                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
                );

                // The source update and the new subsources are committed together.
                self.catalog_transact(Some(session), ops).await?;
            }
            plan::AlterSourceAction::RefreshReferences { references } => {
                self.catalog_transact(
                    Some(session),
                    vec![catalog::Op::UpdateSourceReferences {
                        source_id: item_id,
                        references: references.into(),
                    }],
                )
                .await?;
            }
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
    }
4113
4114    #[instrument]
4115    pub(super) async fn sequence_alter_system_set(
4116        &mut self,
4117        session: &Session,
4118        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4119    ) -> Result<ExecuteResponse, AdapterError> {
4120        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4121        // We want to ensure that the network policy we're switching too actually exists.
4122        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4123            self.validate_alter_system_network_policy(session, &value)?;
4124        }
4125
4126        let op = match value {
4127            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4128                name: name.clone(),
4129                value: OwnedVarInput::SqlSet(values),
4130            },
4131            plan::VariableValue::Default => {
4132                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4133            }
4134        };
4135        self.catalog_transact(Some(session), vec![op]).await?;
4136
4137        session.add_notice(AdapterNotice::VarDefaultUpdated {
4138            role: None,
4139            var_name: Some(name),
4140        });
4141        Ok(ExecuteResponse::AlteredSystemConfiguration)
4142    }
4143
4144    #[instrument]
4145    pub(super) async fn sequence_alter_system_reset(
4146        &mut self,
4147        session: &Session,
4148        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4149    ) -> Result<ExecuteResponse, AdapterError> {
4150        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4151        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4152        self.catalog_transact(Some(session), vec![op]).await?;
4153        session.add_notice(AdapterNotice::VarDefaultUpdated {
4154            role: None,
4155            var_name: Some(name),
4156        });
4157        Ok(ExecuteResponse::AlteredSystemConfiguration)
4158    }
4159
4160    #[instrument]
4161    pub(super) async fn sequence_alter_system_reset_all(
4162        &mut self,
4163        session: &Session,
4164        _: plan::AlterSystemResetAllPlan,
4165    ) -> Result<ExecuteResponse, AdapterError> {
4166        self.is_user_allowed_to_alter_system(session, None)?;
4167        let op = catalog::Op::ResetAllSystemConfiguration;
4168        self.catalog_transact(Some(session), vec![op]).await?;
4169        session.add_notice(AdapterNotice::VarDefaultUpdated {
4170            role: None,
4171            var_name: None,
4172        });
4173        Ok(ExecuteResponse::AlteredSystemConfiguration)
4174    }
4175
4176    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
4177    fn is_user_allowed_to_alter_system(
4178        &self,
4179        session: &Session,
4180        var_name: Option<&str>,
4181    ) -> Result<(), AdapterError> {
4182        match (session.user().kind(), var_name) {
4183            // Only internal superusers can reset all system variables.
4184            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4185            // Whether or not a variable can be modified depends if we're an internal superuser.
4186            (UserKind::Superuser, Some(name))
4187                if session.user().is_internal()
4188                    || self.catalog().system_config().user_modifiable(name) =>
4189            {
4190                // In lieu of plumbing the user to all system config functions, just check that
4191                // the var is visible.
4192                let var = self.catalog().system_config().get(name)?;
4193                var.visible(session.user(), self.catalog().system_config())?;
4194                Ok(())
4195            }
4196            // If we're not a superuser, but the variable is user modifiable, indicate they can use
4197            // session variables.
4198            (UserKind::Regular, Some(name))
4199                if self.catalog().system_config().user_modifiable(name) =>
4200            {
4201                Err(AdapterError::Unauthorized(
4202                    rbac::UnauthorizedError::Superuser {
4203                        action: format!("toggle the '{name}' system configuration parameter"),
4204                    },
4205                ))
4206            }
4207            _ => Err(AdapterError::Unauthorized(
4208                rbac::UnauthorizedError::MzSystem {
4209                    action: "alter system".into(),
4210                },
4211            )),
4212        }
4213    }
4214
4215    fn validate_alter_system_network_policy(
4216        &self,
4217        session: &Session,
4218        policy_value: &plan::VariableValue,
4219    ) -> Result<(), AdapterError> {
4220        let policy_name = match &policy_value {
4221            // Make sure the compiled in default still exists.
4222            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4223            plan::VariableValue::Values(values) if values.len() == 1 => {
4224                values.iter().next().cloned()
4225            }
4226            plan::VariableValue::Values(values) => {
4227                tracing::warn!(?values, "can't set multiple network policies at once");
4228                None
4229            }
4230        };
4231        let maybe_network_policy = policy_name
4232            .as_ref()
4233            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4234        let Some(network_policy) = maybe_network_policy else {
4235            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4236                VarError::InvalidParameterValue {
4237                    name: NETWORK_POLICY.name(),
4238                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4239                    reason: "no network policy with such name exists".to_string(),
4240                },
4241            )));
4242        };
4243        self.validate_alter_network_policy(session, &network_policy.rules)
4244    }
4245
4246    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4247    ///
4248    /// This helps prevent users from modifying network policies in a way that would lock out their
4249    /// current connection.
4250    fn validate_alter_network_policy(
4251        &self,
4252        session: &Session,
4253        policy_rules: &Vec<NetworkPolicyRule>,
4254    ) -> Result<(), AdapterError> {
4255        // If the user is not an internal user attempt to protect them from
4256        // blocking themselves.
4257        if session.user().is_internal() {
4258            return Ok(());
4259        }
4260        if let Some(ip) = session.meta().client_ip() {
4261            validate_ip_with_policy_rules(ip, policy_rules)
4262                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4263        } else {
4264            // Sessions without IPs are only temporarily constructed for default values
4265            // they should not be permitted here.
4266            return Err(AdapterError::NetworkPolicyDenied(
4267                NetworkPolicyError::MissingIp,
4268            ));
4269        }
4270        Ok(())
4271    }
4272
4273    // Returns the name of the portal to execute.
4274    #[instrument]
4275    pub(super) fn sequence_execute(
4276        &self,
4277        session: &mut Session,
4278        plan: plan::ExecutePlan,
4279    ) -> Result<String, AdapterError> {
4280        // Verify the stmt is still valid.
4281        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4282        let ps = session
4283            .get_prepared_statement_unverified(&plan.name)
4284            .expect("known to exist");
4285        let stmt = ps.stmt().cloned();
4286        let desc = ps.desc().clone();
4287        let state_revision = ps.state_revision;
4288        let logging = Arc::clone(ps.logging());
4289        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4290    }
4291
4292    #[instrument]
4293    pub(super) async fn sequence_grant_privileges(
4294        &mut self,
4295        session: &Session,
4296        plan::GrantPrivilegesPlan {
4297            update_privileges,
4298            grantees,
4299        }: plan::GrantPrivilegesPlan,
4300    ) -> Result<ExecuteResponse, AdapterError> {
4301        self.sequence_update_privileges(
4302            session,
4303            update_privileges,
4304            grantees,
4305            UpdatePrivilegeVariant::Grant,
4306        )
4307        .await
4308    }
4309
4310    #[instrument]
4311    pub(super) async fn sequence_revoke_privileges(
4312        &mut self,
4313        session: &Session,
4314        plan::RevokePrivilegesPlan {
4315            update_privileges,
4316            revokees,
4317        }: plan::RevokePrivilegesPlan,
4318    ) -> Result<ExecuteResponse, AdapterError> {
4319        self.sequence_update_privileges(
4320            session,
4321            update_privileges,
4322            revokees,
4323            UpdatePrivilegeVariant::Revoke,
4324        )
4325        .await
4326    }
4327
4328    #[instrument]
4329    async fn sequence_update_privileges(
4330        &mut self,
4331        session: &Session,
4332        update_privileges: Vec<UpdatePrivilege>,
4333        grantees: Vec<RoleId>,
4334        variant: UpdatePrivilegeVariant,
4335    ) -> Result<ExecuteResponse, AdapterError> {
4336        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4337        let mut warnings = Vec::new();
4338        let catalog = self.catalog().for_session(session);
4339
4340        for UpdatePrivilege {
4341            acl_mode,
4342            target_id,
4343            grantor,
4344        } in update_privileges
4345        {
4346            let actual_object_type = catalog.get_system_object_type(&target_id);
4347            // For all relations we allow all applicable table privileges, but send a warning if the
4348            // privilege isn't actually applicable to the object type.
4349            if actual_object_type.is_relation() {
4350                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4351                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4352                if !non_applicable_privileges.is_empty() {
4353                    let object_description =
4354                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4355                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4356                        non_applicable_privileges,
4357                        object_description,
4358                    })
4359                }
4360            }
4361
4362            if let SystemObjectId::Object(object_id) = &target_id {
4363                self.catalog()
4364                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4365            }
4366
4367            let privileges = self
4368                .catalog()
4369                .get_privileges(&target_id, session.conn_id())
4370                // Should be unreachable since the parser will refuse to parse grant/revoke
4371                // statements on objects without privileges.
4372                .ok_or(AdapterError::Unsupported(
4373                    "GRANTs/REVOKEs on an object type with no privileges",
4374                ))?;
4375
4376            for grantee in &grantees {
4377                self.catalog().ensure_not_system_role(grantee)?;
4378                self.catalog().ensure_not_predefined_role(grantee)?;
4379                let existing_privilege = privileges
4380                    .get_acl_item(grantee, &grantor)
4381                    .map(Cow::Borrowed)
4382                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4383
4384                match variant {
4385                    UpdatePrivilegeVariant::Grant
4386                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4387                    {
4388                        ops.push(catalog::Op::UpdatePrivilege {
4389                            target_id: target_id.clone(),
4390                            privilege: MzAclItem {
4391                                grantee: *grantee,
4392                                grantor,
4393                                acl_mode,
4394                            },
4395                            variant,
4396                        });
4397                    }
4398                    UpdatePrivilegeVariant::Revoke
4399                        if !existing_privilege
4400                            .acl_mode
4401                            .intersection(acl_mode)
4402                            .is_empty() =>
4403                    {
4404                        ops.push(catalog::Op::UpdatePrivilege {
4405                            target_id: target_id.clone(),
4406                            privilege: MzAclItem {
4407                                grantee: *grantee,
4408                                grantor,
4409                                acl_mode,
4410                            },
4411                            variant,
4412                        });
4413                    }
4414                    // no-op
4415                    _ => {}
4416                }
4417            }
4418        }
4419
4420        if ops.is_empty() {
4421            session.add_notices(warnings);
4422            return Ok(variant.into());
4423        }
4424
4425        let res = self
4426            .catalog_transact(Some(session), ops)
4427            .await
4428            .map(|_| match variant {
4429                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4430                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4431            });
4432        if res.is_ok() {
4433            session.add_notices(warnings);
4434        }
4435        res
4436    }
4437
4438    #[instrument]
4439    pub(super) async fn sequence_alter_default_privileges(
4440        &mut self,
4441        session: &Session,
4442        plan::AlterDefaultPrivilegesPlan {
4443            privilege_objects,
4444            privilege_acl_items,
4445            is_grant,
4446        }: plan::AlterDefaultPrivilegesPlan,
4447    ) -> Result<ExecuteResponse, AdapterError> {
4448        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4449        let variant = if is_grant {
4450            UpdatePrivilegeVariant::Grant
4451        } else {
4452            UpdatePrivilegeVariant::Revoke
4453        };
4454        for privilege_object in &privilege_objects {
4455            self.catalog()
4456                .ensure_not_system_role(&privilege_object.role_id)?;
4457            self.catalog()
4458                .ensure_not_predefined_role(&privilege_object.role_id)?;
4459            if let Some(database_id) = privilege_object.database_id {
4460                self.catalog()
4461                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4462            }
4463            if let Some(schema_id) = privilege_object.schema_id {
4464                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4465                let schema_spec: SchemaSpecifier = schema_id.into();
4466
4467                self.catalog().ensure_not_reserved_object(
4468                    &(database_spec, schema_spec).into(),
4469                    session.conn_id(),
4470                )?;
4471            }
4472            for privilege_acl_item in &privilege_acl_items {
4473                self.catalog()
4474                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4475                self.catalog()
4476                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4477                ops.push(catalog::Op::UpdateDefaultPrivilege {
4478                    privilege_object: privilege_object.clone(),
4479                    privilege_acl_item: privilege_acl_item.clone(),
4480                    variant,
4481                })
4482            }
4483        }
4484
4485        self.catalog_transact(Some(session), ops).await?;
4486        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4487    }
4488
4489    #[instrument]
4490    pub(super) async fn sequence_grant_role(
4491        &mut self,
4492        session: &Session,
4493        plan::GrantRolePlan {
4494            role_ids,
4495            member_ids,
4496            grantor_id,
4497        }: plan::GrantRolePlan,
4498    ) -> Result<ExecuteResponse, AdapterError> {
4499        let catalog = self.catalog();
4500        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4501        for role_id in role_ids {
4502            for member_id in &member_ids {
4503                let member_membership: BTreeSet<_> =
4504                    catalog.get_role(member_id).membership().keys().collect();
4505                if member_membership.contains(&role_id) {
4506                    let role_name = catalog.get_role(&role_id).name().to_string();
4507                    let member_name = catalog.get_role(member_id).name().to_string();
4508                    // We need this check so we don't accidentally return a success on a reserved role.
4509                    catalog.ensure_not_reserved_role(member_id)?;
4510                    catalog.ensure_grantable_role(&role_id)?;
4511                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4512                        role_name,
4513                        member_name,
4514                    });
4515                } else {
4516                    ops.push(catalog::Op::GrantRole {
4517                        role_id,
4518                        member_id: *member_id,
4519                        grantor_id,
4520                    });
4521                }
4522            }
4523        }
4524
4525        if ops.is_empty() {
4526            return Ok(ExecuteResponse::GrantedRole);
4527        }
4528
4529        self.catalog_transact(Some(session), ops)
4530            .await
4531            .map(|_| ExecuteResponse::GrantedRole)
4532    }
4533
4534    #[instrument]
4535    pub(super) async fn sequence_revoke_role(
4536        &mut self,
4537        session: &Session,
4538        plan::RevokeRolePlan {
4539            role_ids,
4540            member_ids,
4541            grantor_id,
4542        }: plan::RevokeRolePlan,
4543    ) -> Result<ExecuteResponse, AdapterError> {
4544        let catalog = self.catalog();
4545        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4546        for role_id in role_ids {
4547            for member_id in &member_ids {
4548                let member_membership: BTreeSet<_> =
4549                    catalog.get_role(member_id).membership().keys().collect();
4550                if !member_membership.contains(&role_id) {
4551                    let role_name = catalog.get_role(&role_id).name().to_string();
4552                    let member_name = catalog.get_role(member_id).name().to_string();
4553                    // We need this check so we don't accidentally return a success on a reserved role.
4554                    catalog.ensure_not_reserved_role(member_id)?;
4555                    catalog.ensure_grantable_role(&role_id)?;
4556                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4557                        role_name,
4558                        member_name,
4559                    });
4560                } else {
4561                    ops.push(catalog::Op::RevokeRole {
4562                        role_id,
4563                        member_id: *member_id,
4564                        grantor_id,
4565                    });
4566                }
4567            }
4568        }
4569
4570        if ops.is_empty() {
4571            return Ok(ExecuteResponse::RevokedRole);
4572        }
4573
4574        self.catalog_transact(Some(session), ops)
4575            .await
4576            .map(|_| ExecuteResponse::RevokedRole)
4577    }
4578
4579    #[instrument]
4580    pub(super) async fn sequence_alter_owner(
4581        &mut self,
4582        session: &Session,
4583        plan::AlterOwnerPlan {
4584            id,
4585            object_type,
4586            new_owner,
4587        }: plan::AlterOwnerPlan,
4588    ) -> Result<ExecuteResponse, AdapterError> {
4589        let mut ops = vec![catalog::Op::UpdateOwner {
4590            id: id.clone(),
4591            new_owner,
4592        }];
4593
4594        match &id {
4595            ObjectId::Item(global_id) => {
4596                let entry = self.catalog().get_entry(global_id);
4597
4598                // Cannot directly change the owner of an index.
4599                if entry.is_index() {
4600                    let name = self
4601                        .catalog()
4602                        .resolve_full_name(entry.name(), Some(session.conn_id()))
4603                        .to_string();
4604                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
4605                    return Ok(ExecuteResponse::AlteredObject(object_type));
4606                }
4607
4608                // Alter owner cascades down to dependent indexes.
4609                let dependent_index_ops = entry
4610                    .used_by()
4611                    .into_iter()
4612                    .filter(|id| self.catalog().get_entry(id).is_index())
4613                    .map(|id| catalog::Op::UpdateOwner {
4614                        id: ObjectId::Item(*id),
4615                        new_owner,
4616                    });
4617                ops.extend(dependent_index_ops);
4618
4619                // Alter owner cascades down to progress collections.
4620                let dependent_subsources =
4621                    entry
4622                        .progress_id()
4623                        .into_iter()
4624                        .map(|item_id| catalog::Op::UpdateOwner {
4625                            id: ObjectId::Item(item_id),
4626                            new_owner,
4627                        });
4628                ops.extend(dependent_subsources);
4629            }
4630            ObjectId::Cluster(cluster_id) => {
4631                let cluster = self.catalog().get_cluster(*cluster_id);
4632                // Alter owner cascades down to cluster replicas.
4633                let managed_cluster_replica_ops =
4634                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4635                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4636                        new_owner,
4637                    });
4638                ops.extend(managed_cluster_replica_ops);
4639            }
4640            _ => {}
4641        }
4642
4643        self.catalog_transact(Some(session), ops)
4644            .await
4645            .map(|_| ExecuteResponse::AlteredObject(object_type))
4646    }
4647
4648    #[instrument]
4649    pub(super) async fn sequence_reassign_owned(
4650        &mut self,
4651        session: &Session,
4652        plan::ReassignOwnedPlan {
4653            old_roles,
4654            new_role,
4655            reassign_ids,
4656        }: plan::ReassignOwnedPlan,
4657    ) -> Result<ExecuteResponse, AdapterError> {
4658        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4659            self.catalog().ensure_not_reserved_role(role_id)?;
4660        }
4661
4662        let ops = reassign_ids
4663            .into_iter()
4664            .map(|id| catalog::Op::UpdateOwner {
4665                id,
4666                new_owner: new_role,
4667            })
4668            .collect();
4669
4670        self.catalog_transact(Some(session), ops)
4671            .await
4672            .map(|_| ExecuteResponse::ReassignOwned)
4673    }
4674
4675    #[instrument]
4676    pub(crate) async fn handle_deferred_statement(&mut self) {
4677        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
4678        // was processed, removing the single element from deferred_statements, so it is expected
4679        // that this is sometimes empty.
4680        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4681            return;
4682        };
4683        match ps {
4684            crate::coord::PlanStatement::Statement { stmt, params } => {
4685                self.handle_execute_inner(stmt, params, ctx).await;
4686            }
4687            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4688                self.sequence_plan(ctx, plan, resolved_ids).await;
4689            }
4690        }
4691    }
4692
4693    #[instrument]
4694    // TODO(parkmycar): Remove this once we have an actual implementation.
4695    #[allow(clippy::unused_async)]
4696    pub(super) async fn sequence_alter_table(
4697        &mut self,
4698        ctx: &mut ExecuteContext,
4699        plan: plan::AlterTablePlan,
4700    ) -> Result<ExecuteResponse, AdapterError> {
4701        let plan::AlterTablePlan {
4702            relation_id,
4703            column_name,
4704            column_type,
4705            raw_sql_type,
4706        } = plan;
4707
4708        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4709        let (_, new_global_id) = self.allocate_user_id().await?;
4710        let ops = vec![catalog::Op::AlterAddColumn {
4711            id: relation_id,
4712            new_global_id,
4713            name: column_name,
4714            typ: column_type,
4715            sql: raw_sql_type,
4716        }];
4717
4718        self.catalog_transact_with_context(None, Some(ctx), ops)
4719            .await?;
4720
4721        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4722    }
4723
4724    /// Prepares to apply a replacement materialized view.
4725    #[instrument]
4726    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4727        &mut self,
4728        ctx: ExecuteContext,
4729        plan: AlterMaterializedViewApplyReplacementPlan,
4730    ) {
4731        // To ensure there is no time gap in the output, we can only apply a replacement if the
4732        // target's write frontier has caught up to the replacement dataflow's write frontier. This
4733        // might not be the case initially, so we have to wait. To this end, we install a watch set
4734        // waiting for the target MV's write frontier to advance sufficiently.
4735        //
4736        // Note that the replacement's dataflow is not performing any writes, so it can only be
4737        // ahead of the target initially due to as-of selection. Once the target has caught up, the
4738        // replacement's write frontier is always <= the target's.
4739
4740        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4741
4742        let plan_validity = PlanValidity::new(
4743            self.catalog().transient_revision(),
4744            BTreeSet::from_iter([id, replacement_id]),
4745            None,
4746            None,
4747            ctx.session().role_metadata().clone(),
4748        );
4749
4750        let target = self.catalog.get_entry(&id);
4751        let target_gid = target.latest_global_id();
4752
4753        let replacement = self.catalog.get_entry(&replacement_id);
4754        let replacement_gid = replacement.latest_global_id();
4755
4756        let target_upper = self
4757            .controller
4758            .storage_collections
4759            .collection_frontiers(target_gid)
4760            .expect("target MV exists")
4761            .write_frontier;
4762        let replacement_upper = self
4763            .controller
4764            .compute
4765            .collection_frontiers(replacement_gid, replacement.cluster_id())
4766            .expect("replacement MV exists")
4767            .write_frontier;
4768
4769        info!(
4770            %id, %replacement_id, ?target_upper, ?replacement_upper,
4771            "preparing materialized view replacement application",
4772        );
4773
4774        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4775            // A replacement's write frontier can only become empty if the target's write frontier
4776            // has advanced to the empty frontier. In this case the MV is sealed for all times and
4777            // applying the replacement wouldn't have any effect. We use this opportunity to alert
4778            // the user by returning an error, rather than applying the useless replacement.
4779            //
4780            // Note that we can't assert on `target_upper` being empty here, because the reporting
4781            // of the target's frontier might be delayed. We'd have to fetch the current frontier
4782            // from persist, which we cannot do without incurring I/O.
4783            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4784                name: target.name().item.clone(),
4785            }));
4786            return;
4787        };
4788
4789        // A watch set resolves when the watched objects' frontier becomes _greater_ than the
4790        // specified timestamp. Since we only need to wait until the target frontier is >= the
4791        // replacement's frontier, we can step back the timestamp.
4792        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4793
4794        // TODO(database-issues#9820): If the target MV is dropped while we are waiting for
4795        // progress, the watch set never completes and neither does the `ALTER MATERIALIZED VIEW`
4796        // command.
4797        self.install_storage_watch_set(
4798            ctx.session().conn_id().clone(),
4799            BTreeSet::from_iter([target_gid]),
4800            replacement_upper_ts,
4801            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4802                ctx: Some(ctx),
4803                otel_ctx: OpenTelemetryContext::obtain(),
4804                plan,
4805                plan_validity,
4806            }),
4807        )
4808        .expect("target collection exists");
4809    }
4810
4811    /// Finishes applying a replacement materialized view after the frontier wait completed.
4812    #[instrument]
4813    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4814        &mut self,
4815        mut ctx: AlterMaterializedViewReadyContext,
4816    ) {
4817        ctx.otel_ctx.attach_as_parent();
4818
4819        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4820
4821        // We avoid taking the DDL lock for `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`
4822        // commands, see `Coordinator::must_serialize_ddl`. We therefore must assume that the
4823        // world has arbitrarily changed since we performed planning, and we must re-assert
4824        // that it still matches our requirements.
4825        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4826            ctx.retire(Err(err));
4827            return;
4828        }
4829
4830        info!(
4831            %id, %replacement_id,
4832            "finishing materialized view replacement application",
4833        );
4834
4835        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4836        match self
4837            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4838            .await
4839        {
4840            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4841                ObjectType::MaterializedView,
4842            ))),
4843            Err(err) => ctx.retire(Err(err)),
4844        }
4845    }
4846
4847    pub(super) async fn statistics_oracle(
4848        &self,
4849        session: &Session,
4850        source_ids: &BTreeSet<GlobalId>,
4851        query_as_of: &Antichain<Timestamp>,
4852        is_oneshot: bool,
4853    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4854        super::statistics_oracle(
4855            session,
4856            source_ids,
4857            query_as_of,
4858            is_oneshot,
4859            self.catalog().system_config(),
4860            self.controller.storage_collections.as_ref(),
4861        )
4862        .await
4863    }
4864}
4865
4866impl Coordinator {
4867    /// Process the metainfo from a newly created non-transient dataflow.
4868    async fn process_dataflow_metainfo(
4869        &mut self,
4870        df_meta: DataflowMetainfo,
4871        export_id: GlobalId,
4872        ctx: Option<&mut ExecuteContext>,
4873        notice_ids: Vec<GlobalId>,
4874    ) -> Option<BuiltinTableAppendNotify> {
4875        // Emit raw notices to the user.
4876        if let Some(ctx) = ctx {
4877            emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4878        }
4879
4880        // Create a metainfo with rendered notices.
4881        let df_meta = self
4882            .catalog()
4883            .render_notices(df_meta, notice_ids, Some(export_id));
4884
4885        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
4886        // in-memory state.
4887        if self.catalog().state().system_config().enable_mz_notices()
4888            && !df_meta.optimizer_notices.is_empty()
4889        {
4890            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4891            self.catalog().state().pack_optimizer_notices(
4892                &mut builtin_table_updates,
4893                df_meta.optimizer_notices.iter(),
4894                Diff::ONE,
4895            );
4896
4897            // Save the metainfo.
4898            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4899
4900            Some(
4901                self.builtin_table_update()
4902                    .execute(builtin_table_updates)
4903                    .await
4904                    .0,
4905            )
4906        } else {
4907            // Save the metainfo.
4908            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4909
4910            None
4911        }
4912    }
4913}