mz_adapter/coord/sequencer/inner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::task::{self, JoinHandle, spawn};
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{assert_none, instrument};
36use mz_repr::adt::jsonb::Jsonb;
37use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
38use mz_repr::explain::ExprHumanizer;
39use mz_repr::explain::json::json_string;
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
43    RowIterator, Timestamp,
44};
45use mz_sql::ast::{
46    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
47    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
48    SqlServerConfigOptionName,
49};
50use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
51use mz_sql::catalog::{
52    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
53    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
54    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
55};
56use mz_sql::names::{
57    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
58    SchemaSpecifier, SystemObjectId,
59};
60use mz_sql::plan::{
61    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
62    StatementContext,
63};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
68use mz_sql::plan::{
69    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84    WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use smallvec::SmallVec;
93use timely::progress::Antichain;
94use tokio::sync::{oneshot, watch};
95use tracing::{Instrument, Span, info, warn};
96
97use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
98use crate::command::{ExecuteResponse, Response};
99use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
100use crate::coord::sequencer::emit_optimizer_notices;
101use crate::coord::{
102    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
103    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
104    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
105    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
106    WatchSetResponse, validate_ip_with_policy_rules,
107};
108use crate::error::AdapterError;
109use crate::notice::{AdapterNotice, DroppedInUseIndex};
110use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
111use crate::optimize::{self, Optimize};
112use crate::session::{
113    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
114    WriteLocks, WriteOp,
115};
116use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
117use crate::{PeekResponseUnary, ReadHolds};
118
/// A future that resolves to a real-time recency timestamp, or a storage
/// error if the timestamp could not be determined.
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>;

// Statement-specific sequencing logic, split out one module per statement family.
mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;
132
/// Attempts to evaluate an expression. If an error is returned then the error is sent
/// to the client and the function is exited.
///
/// Note that the `return` inside the macro exits the *enclosing function*, not just
/// the macro expansion, after retiring `$ctx` with the error.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;
145
/// Catalog operations required to drop a set of objects, plus bookkeeping about
/// notable side effects of the drop that callers surface to the session.
struct DropOps {
    // The catalog ops performing the drop.
    ops: Vec<catalog::Op>,
    // Whether the session's currently active database was dropped.
    dropped_active_db: bool,
    // Whether the session's currently active cluster was dropped.
    dropped_active_cluster: bool,
    // Indexes that were dropped while still in use; reported as notices.
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
152
/// A bundle of values returned from `create_source_inner`.
struct CreateSourceInner {
    // Catalog ops that create the sources (and update source references).
    ops: Vec<catalog::Op>,
    // The in-memory `Source` objects paired with their catalog item ids.
    sources: Vec<(CatalogItemId, Source)>,
    // Ids of sources planned with `IF NOT EXISTS`, mapped to their qualified
    // names, so "already exists" errors can be downgraded to notices.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
159
160impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        // Bail early if the plan is no longer valid against the current catalog.
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
                    // when cancel_enabled may have been true on an earlier stage.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                // No session (e.g. internal contexts): cancellation cannot apply.
                cancel_enabled = false
            };
            // Execute the current stage on the coordinator thread.
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                // Off-thread work: when the task finishes, feed the next stage back
                // through the internal command channel so sequencing resumes on the
                // coordinator thread.
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                // Off-thread work whose result is the final response.
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                // Final response produced on-thread.
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                // Next stage available immediately; loop again.
                StageResult::Immediate(stage) => *stage,
            }
        }
    }
226
227    fn handle_spawn<C, T, F>(
228        &self,
229        ctx: C,
230        handle: JoinHandle<Result<T, AdapterError>>,
231        cancel_enabled: bool,
232        f: F,
233    ) where
234        C: StagedContext + Send + 'static,
235        T: Send + 'static,
236        F: FnOnce(C, T) + Send + 'static,
237    {
238        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
239            .session()
240            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
241        {
242            let mut rx = rx.clone();
243            Box::pin(async move {
244                // Wait for true or dropped sender.
245                let _ = rx.wait_for(|v| *v).await;
246                ()
247            })
248        } else {
249            Box::pin(future::pending())
250        };
251        spawn(|| "sequence_staged", async move {
252            tokio::select! {
253                res = handle => {
254                    let next = return_if_err!(res, ctx);
255                    f(ctx, next);
256                }
257                _ = rx, if cancel_enabled => {
258                    ctx.retire(Err(AdapterError::Canceled));
259                }
260            }
261        });
262    }
263
264    async fn create_source_inner(
265        &self,
266        session: &Session,
267        plans: Vec<plan::CreateSourcePlanBundle>,
268    ) -> Result<CreateSourceInner, AdapterError> {
269        let mut ops = vec![];
270        let mut sources = vec![];
271
272        let if_not_exists_ids = plans
273            .iter()
274            .filter_map(
275                |plan::CreateSourcePlanBundle {
276                     item_id,
277                     global_id: _,
278                     plan,
279                     resolved_ids: _,
280                     available_source_references: _,
281                 }| {
282                    if plan.if_not_exists {
283                        Some((*item_id, plan.name.clone()))
284                    } else {
285                        None
286                    }
287                },
288            )
289            .collect::<BTreeMap<_, _>>();
290
291        for plan::CreateSourcePlanBundle {
292            item_id,
293            global_id,
294            mut plan,
295            resolved_ids,
296            available_source_references,
297        } in plans
298        {
299            let name = plan.name.clone();
300
301            match plan.source.data_source {
302                plan::DataSourceDesc::Ingestion(ref desc)
303                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
304                    let cluster_id = plan
305                        .in_cluster
306                        .expect("ingestion plans must specify cluster");
307                    match desc.connection {
308                        GenericSourceConnection::Postgres(_)
309                        | GenericSourceConnection::MySql(_)
310                        | GenericSourceConnection::SqlServer(_)
311                        | GenericSourceConnection::Kafka(_)
312                        | GenericSourceConnection::LoadGenerator(_) => {
313                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
314                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
315                                    .get(self.catalog().system_config().dyncfgs());
316
317                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
318                                {
319                                    return Err(AdapterError::Unsupported(
320                                        "sources in clusters with >1 replicas",
321                                    ));
322                                }
323                            }
324                        }
325                    }
326                }
327                plan::DataSourceDesc::Webhook { .. } => {
328                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
329                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
330                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
331                            .get(self.catalog().system_config().dyncfgs());
332
333                        if !enable_multi_replica_sources {
334                            if cluster.replica_ids().len() > 1 {
335                                return Err(AdapterError::Unsupported(
336                                    "webhook sources in clusters with >1 replicas",
337                                ));
338                            }
339                        }
340                    }
341                }
342                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
343            }
344
345            // Attempt to reduce the `CHECK` expression, we timeout if this takes too long.
346            if let mz_sql::plan::DataSourceDesc::Webhook {
347                validate_using: Some(validate),
348                ..
349            } = &mut plan.source.data_source
350            {
351                if let Err(reason) = validate.reduce_expression().await {
352                    self.metrics
353                        .webhook_validation_reduce_failures
354                        .with_label_values(&[reason])
355                        .inc();
356                    return Err(AdapterError::Internal(format!(
357                        "failed to reduce check expression, {reason}"
358                    )));
359                }
360            }
361
362            // If this source contained a set of available source references, update the
363            // source references catalog table.
364            let mut reference_ops = vec![];
365            if let Some(references) = &available_source_references {
366                reference_ops.push(catalog::Op::UpdateSourceReferences {
367                    source_id: item_id,
368                    references: references.clone().into(),
369                });
370            }
371
372            let source = Source::new(plan, global_id, resolved_ids, None, false);
373            ops.push(catalog::Op::CreateItem {
374                id: item_id,
375                name,
376                item: CatalogItem::Source(source.clone()),
377                owner_id: *session.current_role_id(),
378            });
379            sources.push((item_id, source));
380            // These operations must be executed after the source is added to the catalog.
381            ops.extend(reference_ops);
382        }
383
384        Ok(CreateSourceInner {
385            ops,
386            sources,
387            if_not_exists_ids,
388        })
389    }
390
391    /// Subsources are planned differently from other statements because they
392    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
393    /// Because of this, we have usually "missed" the opportunity to plan them
394    /// through the normal statement execution life cycle (the exception being
395    /// during bootstrapping).
396    ///
397    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
398    pub(crate) fn plan_subsource(
399        &self,
400        session: &Session,
401        params: &mz_sql::plan::Params,
402        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
403        item_id: CatalogItemId,
404        global_id: GlobalId,
405    ) -> Result<CreateSourcePlanBundle, AdapterError> {
406        let catalog = self.catalog().for_session(session);
407        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
408
409        let plan = self.plan_statement(
410            session,
411            Statement::CreateSubsource(subsource_stmt),
412            params,
413            &resolved_ids,
414        )?;
415        let plan = match plan {
416            Plan::CreateSource(plan) => plan,
417            _ => unreachable!(),
418        };
419        Ok(CreateSourcePlanBundle {
420            item_id,
421            global_id,
422            plan,
423            resolved_ids,
424            available_source_references: None,
425        })
426    }
427
428    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
429    pub(crate) async fn plan_purified_alter_source_add_subsource(
430        &mut self,
431        session: &Session,
432        params: Params,
433        source_name: ResolvedItemName,
434        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
435        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
436    ) -> Result<(Plan, ResolvedIds), AdapterError> {
437        let mut subsource_plans = Vec::with_capacity(subsources.len());
438
439        // Generate subsource statements
440        let conn_catalog = self.catalog().for_system_session();
441        let pcx = plan::PlanContext::zero();
442        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
443
444        let entry = self.catalog().get_entry(source_name.item_id());
445        let source = entry.source().ok_or_else(|| {
446            AdapterError::internal(
447                "plan alter source",
448                format!("expected Source found {entry:?}"),
449            )
450        })?;
451
452        let item_id = entry.id();
453        let ingestion_id = source.global_id();
454        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
455
456        let id_ts = self.get_catalog_write_ts().await;
457        let ids = self
458            .catalog()
459            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
460            .await?;
461        for (subsource_stmt, (item_id, global_id)) in
462            subsource_stmts.into_iter().zip_eq(ids.into_iter())
463        {
464            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
465            subsource_plans.push(s);
466        }
467
468        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
469            subsources: subsource_plans,
470            options,
471        };
472
473        Ok((
474            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
475                item_id,
476                ingestion_id,
477                action,
478            }),
479            ResolvedIds::empty(),
480        ))
481    }
482
483    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
484    pub(crate) fn plan_purified_alter_source_refresh_references(
485        &self,
486        _session: &Session,
487        _params: Params,
488        source_name: ResolvedItemName,
489        available_source_references: plan::SourceReferences,
490    ) -> Result<(Plan, ResolvedIds), AdapterError> {
491        let entry = self.catalog().get_entry(source_name.item_id());
492        let source = entry.source().ok_or_else(|| {
493            AdapterError::internal(
494                "plan alter source",
495                format!("expected Source found {entry:?}"),
496            )
497        })?;
498        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
499            references: available_source_references,
500        };
501
502        Ok((
503            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
504                item_id: entry.id(),
505                ingestion_id: source.global_id(),
506                action,
507            }),
508            ResolvedIds::empty(),
509        ))
510    }
511
    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
    ///
    /// The returned plans are ordered: progress subsource (if any) first, then
    /// the primary source, then the export subsources.
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        // One plan per subsource, plus the progress subsource and primary source.
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource, if any.
        if let Some(progress_stmt) = progress_stmt {
            // The primary source depends on this subsource because the primary
            // source needs to know its shard ID, and the easiest way of
            // guaranteeing that the shard ID is discoverable is to create this
            // collection first.
            assert_none!(progress_stmt.of_source);
            let id_ts = self.get_catalog_write_ts().await;
            let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            // Wire the freshly planned progress subsource into the primary
            // source statement before planning it.
            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        // Options on the primary source that must be propagated to every
        // subsource (currently only `RETAIN HISTORY`; `TIMESTAMP INTERVAL` is
        // deliberately not propagated).
        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;

        // Resolved name of the primary source, referenced by each subsource below.
        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        // Apply the propagated options to every synthesized subsource.
        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources
        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }
628
629    #[instrument]
630    pub(super) async fn sequence_create_source(
631        &mut self,
632        ctx: &mut ExecuteContext,
633        plans: Vec<plan::CreateSourcePlanBundle>,
634    ) -> Result<ExecuteResponse, AdapterError> {
635        let CreateSourceInner {
636            ops,
637            sources,
638            if_not_exists_ids,
639        } = self.create_source_inner(ctx.session(), plans).await?;
640
641        let transact_result = self
642            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
643            .await;
644
645        // Check if any sources are webhook sources and report them as created.
646        for (item_id, source) in &sources {
647            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
648                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
649                    ctx.session()
650                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
651                }
652            }
653        }
654
655        match transact_result {
656            Ok(()) => Ok(ExecuteResponse::CreatedSource),
657            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
658                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
659            })) if if_not_exists_ids.contains_key(&id) => {
660                ctx.session()
661                    .add_notice(AdapterNotice::ObjectAlreadyExists {
662                        name: if_not_exists_ids[&id].item.clone(),
663                        ty: "source",
664                    });
665                Ok(ExecuteResponse::CreatedSource)
666            }
667            Err(err) => Err(err),
668        }
669    }
670
    /// Sequences a `CREATE CONNECTION` plan.
    ///
    /// Allocates ids for the new connection, materializes SSH key pairs into the
    /// secrets controller when applicable, and then either validates the
    /// connection on a background task (re-entering the coordinator via
    /// `Message::CreateConnectionValidationReady`) or finishes immediately via
    /// `sequence_create_connection_stage_finish`. All exits retire `ctx`.
    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
            // SSH connections generate key pairs server-side; the user must not
            // supply them. Persist the key set as a secret before proceeding.
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

        if plan.validate {
            // Validation may block (network I/O), so run it off-thread and send
            // the result back through the internal command channel.
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            // No validation requested: finish the creation synchronously.
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }
771
772    #[instrument]
773    pub(crate) async fn sequence_create_connection_stage_finish(
774        &mut self,
775        ctx: &mut ExecuteContext,
776        connection_id: CatalogItemId,
777        connection_gid: GlobalId,
778        plan: plan::CreateConnectionPlan,
779        resolved_ids: ResolvedIds,
780    ) -> Result<ExecuteResponse, AdapterError> {
781        let ops = vec![catalog::Op::CreateItem {
782            id: connection_id,
783            name: plan.name.clone(),
784            item: CatalogItem::Connection(Connection {
785                create_sql: plan.connection.create_sql,
786                global_id: connection_gid,
787                details: plan.connection.details.clone(),
788                resolved_ids,
789            }),
790            owner_id: *ctx.session().current_role_id(),
791        }];
792
793        // VPC endpoint creation for AWS PrivateLink connections is now handled
794        // in apply_catalog_implications.
795        let conn_id = ctx.session().conn_id().clone();
796        let transact_result = self
797            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
798            .await;
799
800        match transact_result {
801            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
802            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
803                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
804            })) if plan.if_not_exists => {
805                ctx.session()
806                    .add_notice(AdapterNotice::ObjectAlreadyExists {
807                        name: plan.name.item,
808                        ty: "connection",
809                    });
810                Ok(ExecuteResponse::CreatedConnection)
811            }
812            Err(err) => Err(err),
813        }
814    }
815
816    #[instrument]
817    pub(super) async fn sequence_create_database(
818        &mut self,
819        session: &Session,
820        plan: plan::CreateDatabasePlan,
821    ) -> Result<ExecuteResponse, AdapterError> {
822        let ops = vec![catalog::Op::CreateDatabase {
823            name: plan.name.clone(),
824            owner_id: *session.current_role_id(),
825        }];
826        match self.catalog_transact(Some(session), ops).await {
827            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
828            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
829                kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
830            })) if plan.if_not_exists => {
831                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
832                Ok(ExecuteResponse::CreatedDatabase)
833            }
834            Err(err) => Err(err),
835        }
836    }
837
838    #[instrument]
839    pub(super) async fn sequence_create_schema(
840        &mut self,
841        session: &Session,
842        plan: plan::CreateSchemaPlan,
843    ) -> Result<ExecuteResponse, AdapterError> {
844        let op = catalog::Op::CreateSchema {
845            database_id: plan.database_spec,
846            schema_name: plan.schema_name.clone(),
847            owner_id: *session.current_role_id(),
848        };
849        match self.catalog_transact(Some(session), vec![op]).await {
850            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
851            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
852                kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
853            })) if plan.if_not_exists => {
854                session.add_notice(AdapterNotice::SchemaAlreadyExists {
855                    name: plan.schema_name,
856                });
857                Ok(ExecuteResponse::CreatedSchema)
858            }
859            Err(err) => Err(err),
860        }
861    }
862
863    /// Validates the role attributes for a `CREATE ROLE` statement.
864    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
865        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
866            if attributes.superuser.is_some()
867                || attributes.password.is_some()
868                || attributes.login.is_some()
869            {
870                return Err(AdapterError::UnavailableFeature {
871                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
872                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
873                });
874            }
875        }
876        Ok(())
877    }
878
879    #[instrument]
880    pub(super) async fn sequence_create_role(
881        &mut self,
882        conn_id: Option<&ConnectionId>,
883        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
884    ) -> Result<ExecuteResponse, AdapterError> {
885        self.validate_role_attributes(&attributes.clone())?;
886        let op = catalog::Op::CreateRole { name, attributes };
887        self.catalog_transact_with_context(conn_id, None, vec![op])
888            .await
889            .map(|_| ExecuteResponse::CreatedRole)
890    }
891
892    #[instrument]
893    pub(super) async fn sequence_create_network_policy(
894        &mut self,
895        session: &Session,
896        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
897    ) -> Result<ExecuteResponse, AdapterError> {
898        let op = catalog::Op::CreateNetworkPolicy {
899            rules,
900            name,
901            owner_id: *session.current_role_id(),
902        };
903        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
904            .await
905            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
906    }
907
    /// Sequences an `ALTER NETWORK POLICY` statement.
    ///
    /// If the policy being altered is the system's current default network
    /// policy, the new rules are first validated against the current session
    /// (via `validate_alter_network_policy`) before the change is written to
    /// the catalog.
    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO(network_policy): Consider role based network policies here.
        let current_network_policy_name =
            self.catalog().system_config().default_network_policy_name();
        // Check if the way we're altering the policy is still valid for the current connection.
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }
932
933    #[instrument]
934    pub(super) async fn sequence_create_table(
935        &mut self,
936        ctx: &mut ExecuteContext,
937        plan: plan::CreateTablePlan,
938        resolved_ids: ResolvedIds,
939    ) -> Result<ExecuteResponse, AdapterError> {
940        let plan::CreateTablePlan {
941            name,
942            table,
943            if_not_exists,
944        } = plan;
945
946        let conn_id = if table.temporary {
947            Some(ctx.session().conn_id())
948        } else {
949            None
950        };
951        let id_ts = self.get_catalog_write_ts().await;
952        let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
953        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
954
955        let data_source = match table.data_source {
956            plan::TableDataSource::TableWrites { defaults } => {
957                TableDataSource::TableWrites { defaults }
958            }
959            plan::TableDataSource::DataSource {
960                desc: data_source_plan,
961                timeline,
962            } => match data_source_plan {
963                plan::DataSourceDesc::IngestionExport {
964                    ingestion_id,
965                    external_reference,
966                    details,
967                    data_config,
968                } => TableDataSource::DataSource {
969                    desc: DataSourceDesc::IngestionExport {
970                        ingestion_id,
971                        external_reference,
972                        details,
973                        data_config,
974                    },
975                    timeline,
976                },
977                plan::DataSourceDesc::Webhook {
978                    validate_using,
979                    body_format,
980                    headers,
981                    cluster_id,
982                } => TableDataSource::DataSource {
983                    desc: DataSourceDesc::Webhook {
984                        validate_using,
985                        body_format,
986                        headers,
987                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
988                    },
989                    timeline,
990                },
991                o => {
992                    unreachable!("CREATE TABLE data source got {:?}", o)
993                }
994            },
995        };
996
997        let is_webhook = if let TableDataSource::DataSource {
998            desc: DataSourceDesc::Webhook { .. },
999            timeline: _,
1000        } = &data_source
1001        {
1002            true
1003        } else {
1004            false
1005        };
1006
1007        let table = Table {
1008            create_sql: Some(table.create_sql),
1009            desc: table.desc,
1010            collections,
1011            conn_id: conn_id.cloned(),
1012            resolved_ids,
1013            custom_logical_compaction_window: table.compaction_window,
1014            is_retained_metrics_object: false,
1015            data_source,
1016        };
1017        let ops = vec![catalog::Op::CreateItem {
1018            id: table_id,
1019            name: name.clone(),
1020            item: CatalogItem::Table(table.clone()),
1021            owner_id: *ctx.session().current_role_id(),
1022        }];
1023
1024        let catalog_result = self
1025            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1026            .await;
1027
1028        if is_webhook {
1029            // try_get_webhook_url will make up a URL for things that are not
1030            // webhooks, so we guard against that here.
1031            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1032                ctx.session()
1033                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
1034            }
1035        }
1036
1037        match catalog_result {
1038            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1039            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1040                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1041            })) if if_not_exists => {
1042                ctx.session_mut()
1043                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1044                        name: name.item,
1045                        ty: "table",
1046                    });
1047                Ok(ExecuteResponse::CreatedTable)
1048            }
1049            Err(err) => Err(err),
1050        }
1051    }
1052
    /// Sequences a `CREATE SINK` statement.
    ///
    /// Allocates identifiers for the sink, records it in the catalog, and then
    /// creates the corresponding storage export and read policies. Retires
    /// `ctx` with the final result rather than returning it; `IF NOT EXISTS`
    /// downgrades an "already exists" catalog error to a notice.
    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        // First try to allocate an ID and an OID. If either fails, we're done.
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) =
            return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
            commit_interval: sink.commit_interval,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            // The sink already exists and IF NOT EXISTS was given: report
            // success via a notice and skip creating the storage export.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        // The catalog commit succeeded, so failure to create the export is
        // treated as unrecoverable.
        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }
1123
1124    /// Validates that a view definition does not contain any expressions that may lead to
1125    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1126    ///
1127    /// We prevent these expressions so that we can add columns to system tables without
1128    /// changing the definition of the view.
1129    ///
1130    /// Here is a bit of a hand wavy proof as to why we only need to check the
1131    /// immediate view definition for system objects and ambiguous column
1132    /// references, and not the entire dependency tree:
1133    ///
1134    ///   - A view with no object references cannot have any ambiguous column
1135    ///   references to a system object, because it has no system objects.
1136    ///   - A view with a direct reference to a system object and a * or
1137    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1138    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1139    ///   any ambiguous column references to a system object, because all column
1140    ///   references are explicitly named.
1141    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1142    ///   system object cannot have any ambiguous column references to a system
1143    ///   object, because there are no system objects in the top level view and
1144    ///   all sub-views are guaranteed to have no ambiguous column references to
1145    ///   system objects.
1146    pub(super) fn validate_system_column_references(
1147        &self,
1148        uses_ambiguous_columns: bool,
1149        depends_on: &BTreeSet<GlobalId>,
1150    ) -> Result<(), AdapterError> {
1151        if uses_ambiguous_columns
1152            && depends_on
1153                .iter()
1154                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1155        {
1156            Err(AdapterError::AmbiguousSystemColumnReference)
1157        } else {
1158            Ok(())
1159        }
1160    }
1161
1162    #[instrument]
1163    pub(super) async fn sequence_create_type(
1164        &mut self,
1165        session: &Session,
1166        plan: plan::CreateTypePlan,
1167        resolved_ids: ResolvedIds,
1168    ) -> Result<ExecuteResponse, AdapterError> {
1169        let id_ts = self.get_catalog_write_ts().await;
1170        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
1171        // Validate the type definition (e.g., composite columns) before storing.
1172        plan.typ
1173            .inner
1174            .desc(&self.catalog().for_session(session))
1175            .map_err(AdapterError::from)?;
1176        let typ = Type {
1177            create_sql: Some(plan.typ.create_sql),
1178            global_id,
1179            details: CatalogTypeDetails {
1180                array_id: None,
1181                typ: plan.typ.inner,
1182                pg_metadata: None,
1183            },
1184            resolved_ids,
1185        };
1186        let op = catalog::Op::CreateItem {
1187            id: item_id,
1188            name: plan.name,
1189            item: CatalogItem::Type(typ),
1190            owner_id: *session.current_role_id(),
1191        };
1192        match self.catalog_transact(Some(session), vec![op]).await {
1193            Ok(()) => Ok(ExecuteResponse::CreatedType),
1194            Err(err) => Err(err),
1195        }
1196    }
1197
1198    #[instrument]
1199    pub(super) async fn sequence_comment_on(
1200        &mut self,
1201        session: &Session,
1202        plan: plan::CommentPlan,
1203    ) -> Result<ExecuteResponse, AdapterError> {
1204        let op = catalog::Op::Comment {
1205            object_id: plan.object_id,
1206            sub_component: plan.sub_component,
1207            comment: plan.comment,
1208        };
1209        self.catalog_transact(Some(session), vec![op]).await?;
1210        Ok(ExecuteResponse::Comment)
1211    }
1212
    /// Sequences a `DROP` statement for the given objects.
    ///
    /// Emits a notice for objects dropped via cascade (present in `drop_ids`
    /// but not named in `referenced_ids`), applies the drop ops in a catalog
    /// transaction, invalidates expression cache entries for dropped items,
    /// and warns when the session's active database, active cluster, or an
    /// in-use index was dropped.
    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Objects in `drop_ids` but not in `referenced_ids` were not named
        // directly by the user; they are being dropped via cascade.
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        // Collect GlobalIds for expression cache invalidation.
        let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
            .iter()
            .filter_map(|id| match id {
                ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
                _ => None,
            })
            .flatten()
            .collect();

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        // Invalidate expression cache entries for dropped objects.
        if !expr_cache_invalidate_ids.is_empty() {
            let _fut = self.catalog().update_expression_cache(
                Default::default(),
                Default::default(),
                expr_cache_invalidate_ids,
            );
        }

        fail::fail_point!("after_sequencer_drop_replica");

        // Warn the session if its ambient context was affected by the drop.
        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }
1294
    /// Ensures that none of the roles in `dropped_roles` still own objects or
    /// appear as grantee/grantor of any privilege in the catalog.
    ///
    /// Walks every catalog item, database, schema, cluster, and replica, plus
    /// system privileges and default privileges, collecting a human-readable
    /// list of dependencies per role name. Returns
    /// `AdapterError::DependentObject` if any dependency was found, otherwise
    /// `Ok(())`.
    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        // Records, for each dropped role, the privileges on `object_id` in
        // which that role appears as grantee or as grantor.
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                // Dropped role is the grantee of this privilege.
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                // Dropped role is the grantor of this privilege.
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        // Role name -> descriptions of everything that still depends on it.
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        // Catalog items: ownership and per-item privileges.
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        // Databases and their schemas: ownership and privileges.
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        // Clusters and their replicas: ownership and cluster privileges.
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
            }
        }
        // System-wide privileges.
        privilege_check(
            self.catalog().system_privileges(),
            dropped_roles,
            &mut dependent_objects,
            &SystemObjectId::System,
            &catalog,
        );
        // Default privileges: the dropped role may be the defining role or a
        // grantee of a default privilege.
        for (default_privilege_object, default_privilege_acl_items) in
            self.catalog.default_privileges()
        {
            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!(
                        "default privileges on {}S created by {}",
                        default_privilege_object.object_type, role_name
                    ));
            }
            for default_privilege_acl_item in default_privilege_acl_items {
                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "default privileges on {}S granted to {}",
                            default_privilege_object.object_type, role_name
                        ));
                }
            }
        }

        if !dependent_objects.is_empty() {
            Err(AdapterError::DependentObject(dependent_objects))
        } else {
            Ok(())
        }
    }
1458
    /// Sequences a `DROP OWNED BY` statement.
    ///
    /// Revokes the dropped roles' privileges and default privileges and drops
    /// the objects they own, all in one catalog transaction. When RBAC is
    /// enabled and the session is not a superuser, revokes whose grantor is
    /// outside the session's role membership are skipped with a notice
    /// instead of being applied (mirroring the check in `rbac::check_plan`).
    #[instrument]
    pub(super) async fn sequence_drop_owned(
        &mut self,
        session: &Session,
        plan: plan::DropOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Reserved (system) roles may never be the target of DROP OWNED.
        for role_id in &plan.role_ids {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let mut privilege_revokes = plan.privilege_revokes;

        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
        let session_catalog = self.catalog().for_session(session);
        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
            && !session.is_superuser()
        {
            // Obtain all roles that the current session is a member of.
            let role_membership =
                session_catalog.collect_role_membership(session.current_role_id());
            // Remove (and report) revokes whose grantor the session does not
            // have membership of; `extract_if` drains them from
            // `privilege_revokes` in place.
            let invalid_revokes: BTreeSet<_> = privilege_revokes
                .extract_if(.., |(_, privilege)| {
                    !role_membership.contains(&privilege.grantor)
                })
                .map(|(object_id, _)| object_id)
                .collect();
            for invalid_revoke in invalid_revokes {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
                session.add_notice(AdapterNotice::CannotRevoke { object_description });
            }
        }

        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
            catalog::Op::UpdatePrivilege {
                target_id: object_id,
                privilege,
                variant: UpdatePrivilegeVariant::Revoke,
            }
        });
        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant: UpdatePrivilegeVariant::Revoke,
            },
        );
        let DropOps {
            ops: drop_ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(session, plan.drop_ids)?;

        // Revokes are applied before the drops within the same transaction.
        let ops = privilege_revoke_ops
            .chain(default_privilege_revoke_ops)
            .chain(drop_ops.into_iter())
            .collect();

        self.catalog_transact(Some(session), ops).await?;

        // Warn the session if its ambient context was affected by the drop.
        if dropped_active_db {
            session.add_notice(AdapterNotice::DroppedActiveDatabase {
                name: session.vars().database().to_string(),
            });
        }
        if dropped_active_cluster {
            session.add_notice(AdapterNotice::DroppedActiveCluster {
                name: session.vars().cluster().to_string(),
            });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
        }
        Ok(ExecuteResponse::DroppedOwned)
    }
1535
    /// Validates a set of object drops and converts them into the catalog ops
    /// required to execute them, along with flags for notices the session may
    /// care about (dropped active database/cluster, dropped in-use indexes).
    ///
    /// Beyond dropping `ids` themselves, this computes the cascading cleanup:
    /// role-membership revokes for dropped roles and default-privilege revokes
    /// for dropped databases/schemas. It also rejects drops of reserved roles,
    /// roles that still own objects, and replicas of managed clusters.
    fn sequence_drop_common(
        &self,
        session: &Session,
        ids: Vec<ObjectId>,
    ) -> Result<DropOps, AdapterError> {
        let mut dropped_active_db = false;
        let mut dropped_active_cluster = false;
        let mut dropped_in_use_indexes = Vec::new();
        // Dropped role id -> role name, for ownership validation below.
        let mut dropped_roles = BTreeMap::new();
        let mut dropped_databases = BTreeSet::new();
        let mut dropped_schemas = BTreeSet::new();
        // Dropping either the group role or the member role of a role membership will trigger a
        // revoke role. We use a Set for the revokes to avoid trying to attempt to revoke the same
        // role membership twice.
        let mut role_revokes = BTreeSet::new();
        // Dropping a database or a schema will revoke all default roles associated with that
        // database or schema.
        let mut default_privilege_revokes = BTreeSet::new();

        // Clusters we're dropping
        let mut clusters_to_drop = BTreeSet::new();

        // First pass: classify each dropped object and record the cascading
        // consequences (revokes, notices) it implies.
        let ids_set = ids.iter().collect::<BTreeSet<_>>();
        for id in &ids {
            match id {
                ObjectId::Database(id) => {
                    let name = self.catalog().get_database(id).name();
                    if name == session.vars().database() {
                        dropped_active_db = true;
                    }
                    dropped_databases.insert(id);
                }
                ObjectId::Schema((_, spec)) => {
                    if let SchemaSpecifier::Id(id) = spec {
                        dropped_schemas.insert(id);
                    }
                }
                ObjectId::Cluster(id) => {
                    clusters_to_drop.insert(*id);
                    if let Some(active_id) = self
                        .catalog()
                        .active_cluster(session)
                        .ok()
                        .map(|cluster| cluster.id())
                    {
                        if id == &active_id {
                            dropped_active_cluster = true;
                        }
                    }
                }
                ObjectId::Role(id) => {
                    let role = self.catalog().get_role(id);
                    let name = role.name();
                    dropped_roles.insert(*id, name);
                    // We must revoke all role memberships that the dropped roles belongs to.
                    for (group_id, grantor_id) in &role.membership.map {
                        role_revokes.insert((*group_id, *id, *grantor_id));
                    }
                }
                ObjectId::Item(id) => {
                    // For indexes, collect named dependants still using the
                    // index (other than objects dropped in the same batch) so
                    // we can warn that the index is in use.
                    if let Some(index) = self.catalog().get_entry(id).index() {
                        let humanizer = self.catalog().for_session(session);
                        let dependants = self
                            .controller
                            .compute
                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
                            .ok()
                            .into_iter()
                            .flatten()
                            .filter(|dependant_id| {
                                // Transient Ids belong to Peeks. We are not interested for now in
                                // peeks depending on a dropped index.
                                // TODO: show a different notice in this case. Something like
                                // "There is an in-progress ad hoc SELECT that uses the dropped
                                // index. The resources used by the index will be freed when all
                                // such SELECTs complete."
                                if dependant_id.is_transient() {
                                    return false;
                                }
                                // The item should exist, but don't panic if it doesn't.
                                let Some(dependent_id) = humanizer
                                    .try_get_item_by_global_id(dependant_id)
                                    .map(|item| item.id())
                                else {
                                    return false;
                                };
                                // If the dependent object is also being dropped, then there is no
                                // problem, so we don't want a notice.
                                !ids_set.contains(&ObjectId::Item(dependent_id))
                            })
                            .flat_map(|dependant_id| {
                                // If we are not able to find a name for this ID it probably means
                                // we have already dropped the compute collection, in which case we
                                // can ignore it.
                                humanizer.humanize_id(dependant_id)
                            })
                            .collect_vec();
                        if !dependants.is_empty() {
                            dropped_in_use_indexes.push(DroppedInUseIndex {
                                index_name: humanizer
                                    .humanize_id(index.global_id())
                                    .unwrap_or_else(|| id.to_string()),
                                dependant_objects: dependants,
                            });
                        }
                    }
                }
                _ => {}
            }
        }

        for id in &ids {
            match id {
                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
                // unless they are internal replicas, which exist outside the scope
                // of managed clusters.
                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                    // A replica of a cluster that is itself being dropped in
                    // this batch is always fine to drop.
                    if !clusters_to_drop.contains(cluster_id) {
                        let cluster = self.catalog.get_cluster(*cluster_id);
                        if cluster.is_managed() {
                            let replica =
                                cluster.replica(*replica_id).expect("Catalog out of sync");
                            if !replica.config.location.internal() {
                                coord_bail!("cannot drop replica of managed cluster");
                            }
                        }
                    }
                }
                _ => {}
            }
        }

        // Reserved (system) roles can never be dropped, and a role that still
        // owns objects must not be dropped.
        for role_id in dropped_roles.keys() {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }
        self.validate_dropped_role_ownership(session, &dropped_roles)?;
        // If any role is a member of a dropped role, then we must revoke that membership.
        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
        for role in self.catalog().user_roles() {
            for dropped_role_id in
                dropped_role_ids.intersection(&role.membership.map.keys().collect())
            {
                role_revokes.insert((
                    **dropped_role_id,
                    role.id(),
                    *role
                        .membership
                        .map
                        .get(*dropped_role_id)
                        .expect("included in keys above"),
                ));
            }
        }

        // Revoke any default privileges scoped to a dropped database/schema.
        for (default_privilege_object, default_privilege_acls) in
            self.catalog().default_privileges()
        {
            if matches!(
                &default_privilege_object.database_id,
                Some(database_id) if dropped_databases.contains(database_id),
            ) || matches!(
                &default_privilege_object.schema_id,
                Some(schema_id) if dropped_schemas.contains(schema_id),
            ) {
                for default_privilege_acl in default_privilege_acls {
                    default_privilege_revokes.insert((
                        default_privilege_object.clone(),
                        default_privilege_acl.clone(),
                    ));
                }
            }
        }

        // Assemble the final op sequence: role revokes, then default-privilege
        // revokes, then a single DropObjects op covering all requested ids.
        let ops = role_revokes
            .into_iter()
            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            })
            .chain(default_privilege_revokes.into_iter().map(
                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                    privilege_object,
                    privilege_acl_item,
                    variant: UpdatePrivilegeVariant::Revoke,
                },
            ))
            .chain(iter::once(catalog::Op::DropObjects(
                ids.into_iter()
                    .map(DropObjectInfo::manual_drop_from_object_id)
                    .collect(),
            )))
            .collect();

        Ok(DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        })
    }
1737
1738    pub(super) fn sequence_explain_schema(
1739        &self,
1740        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1741    ) -> Result<ExecuteResponse, AdapterError> {
1742        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1743            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1744        })?;
1745
1746        let json_string = json_string(&json_value);
1747        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1748        Ok(Self::send_immediate_rows(row))
1749    }
1750
1751    pub(super) fn sequence_show_all_variables(
1752        &self,
1753        session: &Session,
1754    ) -> Result<ExecuteResponse, AdapterError> {
1755        let mut rows = viewable_variables(self.catalog().state(), session)
1756            .map(|v| (v.name(), v.value(), v.description()))
1757            .collect::<Vec<_>>();
1758        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1759
1760        // TODO(parkmycar): Pack all of these into a single RowCollection.
1761        let rows: Vec<_> = rows
1762            .into_iter()
1763            .map(|(name, val, desc)| {
1764                Row::pack_slice(&[
1765                    Datum::String(name),
1766                    Datum::String(&val),
1767                    Datum::String(desc),
1768                ])
1769            })
1770            .collect();
1771        Ok(Self::send_immediate_rows(rows))
1772    }
1773
    /// Handles `SHOW <variable>`.
    ///
    /// `SHOW schema` is special-cased: it reports the first resolvable schema
    /// on the search path, or NULL (with a notice, if enabled) when none
    /// resolves. For `database`/`cluster`, a notice is added when the current
    /// value does not name an existing catalog object.
    pub(super) fn sequence_show_variable(
        &self,
        session: &Session,
        plan: plan::ShowVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        if &plan.name == SCHEMA_ALIAS {
            let schemas = self.catalog.resolve_search_path(session);
            let schema = schemas.first();
            return match schema {
                Some((database_spec, schema_spec)) => {
                    let schema_name = &self
                        .catalog
                        .get_schema(database_spec, schema_spec, session.conn_id())
                        .name()
                        .schema;
                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
                    Ok(Self::send_immediate_rows(row))
                }
                None => {
                    // Nothing on the search path resolved; warn (if the
                    // session wants such warnings) and return NULL.
                    if session.vars().current_object_missing_warnings() {
                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
                            search_path: session
                                .vars()
                                .search_path()
                                .into_iter()
                                .map(|schema| schema.to_string())
                                .collect(),
                        });
                    }
                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
                }
            };
        }

        // Look the variable up in the session first, falling back to the
        // system configuration.
        let variable = session
            .vars()
            .get(self.catalog().system_config(), &plan.name)
            .or_else(|_| self.catalog().system_config().get(&plan.name))?;

        // In lieu of plumbing the user to all system config functions, just check that the var is
        // visible.
        variable.visible(session.user(), self.catalog().system_config())?;

        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
        // Warn when the current database/cluster value names a nonexistent
        // catalog object.
        if variable.name() == vars::DATABASE.name()
            && matches!(
                self.catalog().resolve_database(&variable.value()),
                Err(CatalogError::UnknownDatabase(_))
            )
            && session.vars().current_object_missing_warnings()
        {
            let name = variable.value();
            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
        } else if variable.name() == vars::CLUSTER.name()
            && matches!(
                self.catalog().resolve_cluster(&variable.value()),
                Err(CatalogError::UnknownCluster(_))
            )
            && session.vars().current_object_missing_warnings()
        {
            let name = variable.value();
            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
        }
        Ok(Self::send_immediate_rows(row))
    }
1839
1840    #[instrument]
1841    pub(super) async fn sequence_inspect_shard(
1842        &self,
1843        session: &Session,
1844        plan: plan::InspectShardPlan,
1845    ) -> Result<ExecuteResponse, AdapterError> {
1846        // TODO: Not thrilled about this rbac special case here, but probably
1847        // sufficient for now.
1848        if !session.user().is_internal() {
1849            return Err(AdapterError::Unauthorized(
1850                rbac::UnauthorizedError::MzSystem {
1851                    action: "inspect".into(),
1852                },
1853            ));
1854        }
1855        let state = self
1856            .controller
1857            .storage
1858            .inspect_persist_state(plan.id)
1859            .await?;
1860        let jsonb = Jsonb::from_serde_json(state)?;
1861        Ok(Self::send_immediate_rows(jsonb.into_row()))
1862    }
1863
    /// Handles `SET [LOCAL] <name> = <values>` (and `SET <name> = DEFAULT`,
    /// which resets the variable).
    ///
    /// After applying the change, emits advisory notices: deprecation warnings
    /// for legacy variable names/values, warnings when the new database or
    /// cluster does not exist in the catalog, and notes about unimplemented
    /// isolation levels.
    #[instrument]
    pub(super) fn sequence_set_variable(
        &self,
        session: &mut Session,
        plan: plan::SetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (name, local) = (plan.name, plan.local);
        // Isolation level and cluster may not change while the transaction
        // has pending operations.
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }

        let vars = session.vars_mut();
        // `DEFAULT` means reset; concrete values mean set.
        let values = match plan.value {
            plan::VariableValue::Default => None,
            plan::VariableValue::Values(values) => Some(values),
        };

        match values {
            Some(values) => {
                vars.set(
                    self.catalog().system_config(),
                    &name,
                    VarInput::SqlSet(&values),
                    local,
                )?;

                let vars = session.vars();

                // Emit a warning when deprecated variables are used.
                // TODO(database-issues#8069) remove this after sufficient time has passed
                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if name == vars::CLUSTER.name()
                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
                {
                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
                }

                // Database or cluster value does not correspond to a catalog item.
                if name.as_str() == vars::DATABASE.name()
                    && matches!(
                        self.catalog().resolve_database(vars.database()),
                        Err(CatalogError::UnknownDatabase(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.database().to_string();
                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
                } else if name.as_str() == vars::CLUSTER.name()
                    && matches!(
                        self.catalog().resolve_cluster(vars.cluster()),
                        Err(CatalogError::UnknownCluster(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.cluster().to_string();
                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
                    // Warn about isolation levels we accept but do not (yet)
                    // implement with distinct semantics.
                    let v = values.into_first().to_lowercase();
                    if v == IsolationLevel::ReadUncommitted.as_str()
                        || v == IsolationLevel::ReadCommitted.as_str()
                        || v == IsolationLevel::RepeatableRead.as_str()
                    {
                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
                            isolation_level: v,
                        });
                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
                        session.add_notice(AdapterNotice::StrongSessionSerializable);
                    }
                }
            }
            None => vars.reset(self.catalog().system_config(), &name, local)?,
        }

        Ok(ExecuteResponse::SetVariable { name, reset: false })
    }
1943
1944    pub(super) fn sequence_reset_variable(
1945        &self,
1946        session: &mut Session,
1947        plan: plan::ResetVariablePlan,
1948    ) -> Result<ExecuteResponse, AdapterError> {
1949        let name = plan.name;
1950        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1951            self.validate_set_isolation_level(session)?;
1952        }
1953        if &name == vars::CLUSTER.name() {
1954            self.validate_set_cluster(session)?;
1955        }
1956        session
1957            .vars_mut()
1958            .reset(self.catalog().system_config(), &name, false)?;
1959        Ok(ExecuteResponse::SetVariable { name, reset: true })
1960    }
1961
1962    pub(super) fn sequence_set_transaction(
1963        &self,
1964        session: &mut Session,
1965        plan: plan::SetTransactionPlan,
1966    ) -> Result<ExecuteResponse, AdapterError> {
1967        // TODO(jkosh44) Only supports isolation levels for now.
1968        for mode in plan.modes {
1969            match mode {
1970                TransactionMode::AccessMode(_) => {
1971                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1972                }
1973                TransactionMode::IsolationLevel(isolation_level) => {
1974                    self.validate_set_isolation_level(session)?;
1975
1976                    session.vars_mut().set(
1977                        self.catalog().system_config(),
1978                        TRANSACTION_ISOLATION_VAR_NAME,
1979                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
1980                        plan.local,
1981                    )?
1982                }
1983            }
1984        }
1985        Ok(ExecuteResponse::SetVariable {
1986            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1987            reset: false,
1988        })
1989    }
1990
1991    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1992        if session.transaction().contains_ops() {
1993            Err(AdapterError::InvalidSetIsolationLevel)
1994        } else {
1995            Ok(())
1996        }
1997    }
1998
1999    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2000        if session.transaction().contains_ops() {
2001            Err(AdapterError::InvalidSetCluster)
2002        } else {
2003            Ok(())
2004        }
2005    }
2006
    /// Carries out a `COMMIT` or `ROLLBACK`, retiring `ctx` once the
    /// transaction's effects (if any) have been sequenced.
    ///
    /// Depending on the transaction's contents, the context is either retired
    /// here directly or handed off to another task: group commit for writes,
    /// the strict-serializable read queue for linearized peeks, or the
    /// internal command loop for single-statement transactions.
    #[instrument]
    pub(super) async fn sequence_end_transaction(
        &mut self,
        mut ctx: ExecuteContext,
        mut action: EndTransactionAction,
    ) {
        // If the transaction has failed, we can only rollback.
        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
            (&action, ctx.session().transaction())
        {
            action = EndTransactionAction::Rollback;
        }
        // Tentative success response; may be replaced by an error below.
        let response = match action {
            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
                params: BTreeMap::new(),
            }),
            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
                params: BTreeMap::new(),
            }),
        };

        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;

        let (response, action) = match result {
            // An empty write set behaves like a no-op: respond immediately.
            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
                (response, action)
            }
            // Real writes: validate the held locks and hand the transaction
            // off to group commit; `submit_write` retires `ctx` later.
            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
                // Make sure we have the correct set of write locks for this transaction.
                // Aggressively dropping partial sets of locks to prevent deadlocking separate
                // transactions.
                let validated_locks = match write_lock_guards {
                    None => None,
                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
                        Ok(locks) => Some(locks),
                        Err(missing) => {
                            tracing::error!(?missing, "programming error, missing write locks");
                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
                        }
                    },
                };

                // Group the row batches by target collection.
                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
                for WriteOp { id, rows } in writes {
                    let total_rows = collected_writes.entry(id).or_default();
                    total_rows.push(rows);
                }

                self.submit_write(PendingWriteTxn::User {
                    span: Span::current(),
                    writes: collected_writes,
                    write_locks: validated_locks,
                    pending_txn: PendingTxn {
                        ctx,
                        response,
                        action,
                    },
                });
                return;
            }
            // Linearized peeks under strict serializable must wait in the
            // strict-serializable read queue before the response is released.
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrictSerializable =>
            {
                let conn_id = ctx.session().conn_id().clone();
                let pending_read_txn = PendingReadTxn {
                    txn: PendingRead::Read {
                        txn: PendingTxn {
                            ctx,
                            response,
                            action,
                        },
                    },
                    timestamp_context: determination.timestamp_context,
                    created: Instant::now(),
                    num_requeues: 0,
                    otel_ctx: OpenTelemetryContext::obtain(),
                };
                self.strict_serializable_reads_tx
                    .send((conn_id, pending_read_txn))
                    .expect("sending to strict_serializable_reads_tx cannot fail");
                return;
            }
            // Under strong session serializable, record the read timestamp in
            // the session's timestamp oracle and respond immediately.
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrongSessionSerializable =>
            {
                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
                    ctx.session_mut()
                        .ensure_timestamp_oracle(timeline.clone())
                        .apply_write(*ts);
                }
                (response, action)
            }
            // A deferred single-statement transaction: forward it to the
            // internal command loop for execution; `ctx` is retired there.
            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
                self.internal_cmd_tx
                    .send(Message::ExecuteSingleStatementTransaction {
                        ctx,
                        otel_ctx: OpenTelemetryContext::obtain(),
                        stmt,
                        params,
                    })
                    .expect("must send");
                return;
            }
            Ok((_, _)) => (response, action),
            Err(err) => (Err(err), EndTransactionAction::Rollback),
        };
        let changed = ctx.session_mut().vars_mut().end_transaction(action);
        // Append any parameters that changed to the response.
        let response = response.map(|mut r| {
            r.extend_params(changed);
            ExecuteResponse::from(r)
        });

        ctx.retire(response);
    }
2136
    /// Clears the session's transaction state and, on commit, validates and
    /// returns the transaction's accumulated operations (plus any held write
    /// locks) for the caller to apply. DDL ops are committed to the catalog
    /// here directly.
    ///
    /// Returns `(None, None)` for rollbacks and for transactions with no ops.
    #[instrument]
    async fn sequence_end_transaction_inner(
        &mut self,
        ctx: &mut ExecuteContext,
        action: EndTransactionAction,
    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
        let txn = self.clear_transaction(ctx.session_mut()).await;

        if let EndTransactionAction::Commit = action {
            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
                match &mut ops {
                    TransactionOps::Writes(writes) => {
                        for WriteOp { id, .. } in &mut writes.iter() {
                            // Re-verify this id exists.
                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
                                AdapterError::Catalog(mz_catalog::memory::error::Error {
                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
                                })
                            })?;
                        }

                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
                    }
                    TransactionOps::DDL {
                        ops,
                        state: _,
                        side_effects,
                        revision,
                    } => {
                        // Make sure our catalog hasn't changed.
                        if *revision != self.catalog().transient_revision() {
                            return Err(AdapterError::DDLTransactionRace);
                        }
                        // Commit all of our queued ops.
                        let ops = std::mem::take(ops);
                        let side_effects = std::mem::take(side_effects);
                        // Apply the ops and then run each queued side effect
                        // in order.
                        self.catalog_transact_with_side_effects(
                            Some(ctx),
                            ops,
                            move |a, mut ctx| {
                                Box::pin(async move {
                                    for side_effect in side_effects {
                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
                                    }
                                })
                            },
                        )
                        .await?;
                    }
                    _ => (),
                }
                return Ok((Some(ops), write_lock_guards));
            }
        }

        Ok((None, None))
    }
2195
2196    pub(super) async fn sequence_side_effecting_func(
2197        &mut self,
2198        ctx: ExecuteContext,
2199        plan: SideEffectingFunc,
2200    ) {
2201        match plan {
2202            SideEffectingFunc::PgCancelBackend { connection_id } => {
2203                if ctx.session().conn_id().unhandled() == connection_id {
2204                    // As a special case, if we're canceling ourselves, we send
2205                    // back a canceled resposne to the client issuing the query,
2206                    // and so we need to do no further processing of the cancel.
2207                    ctx.retire(Err(AdapterError::Canceled));
2208                    return;
2209                }
2210
2211                let res = if let Some((id_handle, _conn_meta)) =
2212                    self.active_conns.get_key_value(&connection_id)
2213                {
2214                    // check_plan already verified role membership.
2215                    self.handle_privileged_cancel(id_handle.clone()).await;
2216                    Datum::True
2217                } else {
2218                    Datum::False
2219                };
2220                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2221            }
2222        }
2223    }
2224
2225    /// Execute a side-effecting function from the frontend peek path.
2226    /// This is separate from `sequence_side_effecting_func` because
2227    /// - It doesn't have an ExecuteContext.
2228    /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
2229    ///   peek sequencing can't look at `active_conns`.
2230    ///
2231    /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
2232    /// sequencing.
2233    pub(crate) async fn execute_side_effecting_func(
2234        &mut self,
2235        plan: SideEffectingFunc,
2236        conn_id: ConnectionId,
2237        current_role: RoleId,
2238    ) -> Result<ExecuteResponse, AdapterError> {
2239        match plan {
2240            SideEffectingFunc::PgCancelBackend { connection_id } => {
2241                if conn_id.unhandled() == connection_id {
2242                    // As a special case, if we're canceling ourselves, we return
2243                    // a canceled response to the client issuing the query,
2244                    // and so we need to do no further processing of the cancel.
2245                    return Err(AdapterError::Canceled);
2246                }
2247
2248                // Perform RBAC check: the current user must be a member of the role
2249                // that owns the connection being cancelled.
2250                if let Some((_id_handle, conn_meta)) =
2251                    self.active_conns.get_key_value(&connection_id)
2252                {
2253                    let target_role = *conn_meta.authenticated_role_id();
2254                    let role_membership = self
2255                        .catalog()
2256                        .state()
2257                        .collect_role_membership(&current_role);
2258                    if !role_membership.contains(&target_role) {
2259                        let target_role_name = self
2260                            .catalog()
2261                            .try_get_role(&target_role)
2262                            .map(|role| role.name().to_string())
2263                            .unwrap_or_else(|| target_role.to_string());
2264                        return Err(AdapterError::Unauthorized(
2265                            rbac::UnauthorizedError::RoleMembership {
2266                                role_names: vec![target_role_name],
2267                            },
2268                        ));
2269                    }
2270
2271                    // RBAC check passed, proceed with cancellation.
2272                    let id_handle = self
2273                        .active_conns
2274                        .get_key_value(&connection_id)
2275                        .map(|(id, _)| id.clone())
2276                        .expect("checked above");
2277                    self.handle_privileged_cancel(id_handle).await;
2278                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2279                } else {
2280                    // Connection not found, return false.
2281                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2282                }
2283            }
2284        }
2285    }
2286
2287    /// Inner method that performs the actual real-time recency timestamp determination.
2288    /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`)
2289    /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
2290    pub(crate) async fn determine_real_time_recent_timestamp(
2291        &self,
2292        source_ids: impl Iterator<Item = GlobalId>,
2293        real_time_recency_timeout: Duration,
2294    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2295        let item_ids = source_ids
2296            .map(|gid| {
2297                self.catalog
2298                    .try_resolve_item_id(&gid)
2299                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2300            })
2301            .collect::<Result<Vec<_>, _>>()?;
2302
2303        // Find all dependencies transitively because we need to ensure that
2304        // RTR queries determine the timestamp from the sources' (i.e.
2305        // storage objects that ingest data from external systems) remap
2306        // data. We "cheat" a little bit and filter out any IDs that aren't
2307        // user objects because we know they are not a RTR source.
2308        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2309        // If none of the sources are user objects, we don't need to provide
2310        // a RTR timestamp.
2311        if to_visit.is_empty() {
2312            return Ok(None);
2313        }
2314
2315        let mut timestamp_objects = BTreeSet::new();
2316
2317        while let Some(id) = to_visit.pop_front() {
2318            timestamp_objects.insert(id);
2319            to_visit.extend(
2320                self.catalog()
2321                    .get_entry(&id)
2322                    .uses()
2323                    .into_iter()
2324                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2325            );
2326        }
2327        let timestamp_objects = timestamp_objects
2328            .into_iter()
2329            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2330            .collect();
2331
2332        let r = self
2333            .controller
2334            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2335            .await?;
2336
2337        Ok(Some(r))
2338    }
2339
2340    /// Checks to see if the session needs a real time recency timestamp and if so returns
2341    /// a future that will return the timestamp.
2342    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2343        &self,
2344        session: &Session,
2345        source_ids: impl Iterator<Item = GlobalId>,
2346    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2347        let vars = session.vars();
2348
2349        if vars.real_time_recency()
2350            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2351            && !session.contains_read_timestamp()
2352        {
2353            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2354                .await
2355        } else {
2356            Ok(None)
2357        }
2358    }
2359
2360    #[instrument]
2361    pub(super) async fn sequence_explain_plan(
2362        &mut self,
2363        ctx: ExecuteContext,
2364        plan: plan::ExplainPlanPlan,
2365        target_cluster: TargetCluster,
2366    ) {
2367        match &plan.explainee {
2368            plan::Explainee::Statement(stmt) => match stmt {
2369                plan::ExplaineeStatement::CreateView { .. } => {
2370                    self.explain_create_view(ctx, plan).await;
2371                }
2372                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2373                    self.explain_create_materialized_view(ctx, plan).await;
2374                }
2375                plan::ExplaineeStatement::CreateIndex { .. } => {
2376                    self.explain_create_index(ctx, plan).await;
2377                }
2378                plan::ExplaineeStatement::Select { .. } => {
2379                    self.explain_peek(ctx, plan, target_cluster).await;
2380                }
2381                plan::ExplaineeStatement::Subscribe { .. } => {
2382                    self.explain_subscribe(ctx, plan, target_cluster).await;
2383                }
2384            },
2385            plan::Explainee::View(_) => {
2386                let result = self.explain_view(&ctx, plan);
2387                ctx.retire(result);
2388            }
2389            plan::Explainee::MaterializedView(_) => {
2390                let result = self.explain_materialized_view(&ctx, plan);
2391                ctx.retire(result);
2392            }
2393            plan::Explainee::Index(_) => {
2394                let result = self.explain_index(&ctx, plan);
2395                ctx.retire(result);
2396            }
2397            plan::Explainee::ReplanView(_) => {
2398                self.explain_replan_view(ctx, plan).await;
2399            }
2400            plan::Explainee::ReplanMaterializedView(_) => {
2401                self.explain_replan_materialized_view(ctx, plan).await;
2402            }
2403            plan::Explainee::ReplanIndex(_) => {
2404                self.explain_replan_index(ctx, plan).await;
2405            }
2406        };
2407    }
2408
2409    pub(super) async fn sequence_explain_pushdown(
2410        &mut self,
2411        ctx: ExecuteContext,
2412        plan: plan::ExplainPushdownPlan,
2413        target_cluster: TargetCluster,
2414    ) {
2415        match plan.explainee {
2416            Explainee::Statement(ExplaineeStatement::Select {
2417                broken: false,
2418                plan,
2419                desc: _,
2420            }) => {
2421                let stage = return_if_err!(
2422                    self.peek_validate(
2423                        ctx.session(),
2424                        plan,
2425                        target_cluster,
2426                        None,
2427                        ExplainContext::Pushdown,
2428                        Some(ctx.session().vars().max_query_result_size()),
2429                    ),
2430                    ctx
2431                );
2432                self.sequence_staged(ctx, Span::current(), stage).await;
2433            }
2434            Explainee::MaterializedView(item_id) => {
2435                self.explain_pushdown_materialized_view(ctx, item_id).await;
2436            }
2437            _ => {
2438                ctx.retire(Err(AdapterError::Unsupported(
2439                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2440                )));
2441            }
2442        };
2443    }
2444
2445    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2446    async fn execute_explain_pushdown_with_read_holds(
2447        &self,
2448        ctx: ExecuteContext,
2449        as_of: Antichain<Timestamp>,
2450        mz_now: ResultSpec<'static>,
2451        read_holds: Option<ReadHolds<Timestamp>>,
2452        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2453    ) {
2454        let fut = self
2455            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2456            .await;
2457        task::spawn(|| "render explain pushdown", async move {
2458            // Transfer the necessary read holds over to the background task
2459            let _read_holds = read_holds;
2460            let res = fut.await;
2461            ctx.retire(res);
2462        });
2463    }
2464
2465    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
2466    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2467        &self,
2468        session: &Session,
2469        as_of: Antichain<Timestamp>,
2470        mz_now: ResultSpec<'static>,
2471        imports: I,
2472    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2473        // Get the needed Coordinator stuff and call the freestanding, shared helper.
2474        super::explain_pushdown_future_inner(
2475            session,
2476            &self.catalog,
2477            &self.controller.storage_collections,
2478            as_of,
2479            mz_now,
2480            imports,
2481        )
2482        .await
2483    }
2484
2485    #[instrument]
2486    pub(super) async fn sequence_insert(
2487        &mut self,
2488        mut ctx: ExecuteContext,
2489        plan: plan::InsertPlan,
2490    ) {
2491        // Normally, this would get checked when trying to add "write ops" to
2492        // the transaction but we go down diverging paths below, based on
2493        // whether the INSERT is only constant values or not.
2494        //
2495        // For the non-constant case we sequence an implicit read-then-write,
2496        // which messes with the transaction ops and would allow an implicit
2497        // read-then-write to sneak into a read-only transaction.
2498        if !ctx.session_mut().transaction().allows_writes() {
2499            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2500            return;
2501        }
2502
2503        // The structure of this code originates from a time where
2504        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2505        // optimized `MirRelationExpr`.
2506        //
2507        // Ideally, we would like to make the `selection.as_const().is_some()`
2508        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2509        // are planned as a Wrap($n, $vals) call, so until we can reduce
2510        // HirRelationExpr this will always returns false.
2511        //
2512        // Unfortunately, hitting the default path of the match below also
2513        // causes a lot of tests to fail, so we opted to go with the extra
2514        // `plan.values.clone()` statements when producing the `optimized_mir`
2515        // and re-optimize the values in the `sequence_read_then_write` call.
2516        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2517            // We don't perform any optimizations on an expression that is already
2518            // a constant for writes, as we want to maximize bulk-insert throughput.
2519            let expr = return_if_err!(
2520                plan.values
2521                    .clone()
2522                    .lower(self.catalog().system_config(), None),
2523                ctx
2524            );
2525            OptimizedMirRelationExpr(expr)
2526        } else {
2527            // Collect optimizer parameters.
2528            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2529
2530            // (`optimize::view::Optimizer` has a special case for constant queries.)
2531            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2532
2533            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2534            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2535        };
2536
2537        match optimized_mir.into_inner() {
2538            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2539                let catalog = self.owned_catalog();
2540                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2541                    let result =
2542                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2543                    ctx.retire(result);
2544                });
2545            }
2546            // All non-constant values must be planned as read-then-writes.
2547            _ => {
2548                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2549                    Some(table) => {
2550                        // Inserts always occur at the latest version of the table.
2551                        let desc = table.relation_desc_latest().expect("table has a desc");
2552                        desc.arity()
2553                    }
2554                    None => {
2555                        ctx.retire(Err(AdapterError::Catalog(
2556                            mz_catalog::memory::error::Error {
2557                                kind: ErrorKind::Sql(CatalogError::UnknownItem(
2558                                    plan.id.to_string(),
2559                                )),
2560                            },
2561                        )));
2562                        return;
2563                    }
2564                };
2565
2566                let finishing = RowSetFinishing {
2567                    order_by: vec![],
2568                    limit: None,
2569                    offset: 0,
2570                    project: (0..desc_arity).collect(),
2571                };
2572
2573                let read_then_write_plan = plan::ReadThenWritePlan {
2574                    id: plan.id,
2575                    selection: plan.values,
2576                    finishing,
2577                    assignments: BTreeMap::new(),
2578                    kind: MutationKind::Insert,
2579                    returning: plan.returning,
2580                };
2581
2582                self.sequence_read_then_write(ctx, read_then_write_plan)
2583                    .await;
2584            }
2585        }
2586    }
2587
2588    /// ReadThenWrite is a plan whose writes depend on the results of a
2589    /// read. This works by doing a Peek then queuing a SendDiffs. No writes
2590    /// or read-then-writes can occur between the Peek and SendDiff otherwise a
2591    /// serializability violation could occur.
2592    #[instrument]
2593    pub(super) async fn sequence_read_then_write(
2594        &mut self,
2595        mut ctx: ExecuteContext,
2596        plan: plan::ReadThenWritePlan,
2597    ) {
2598        let mut source_ids: BTreeSet<_> = plan
2599            .selection
2600            .depends_on()
2601            .into_iter()
2602            .map(|gid| self.catalog().resolve_item_id(&gid))
2603            .collect();
2604        source_ids.insert(plan.id);
2605
2606        // If the transaction doesn't already have write locks, acquire them.
2607        if ctx.session().transaction().write_locks().is_none() {
2608            // Pre-define all of the locks we need.
2609            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2610
2611            // Try acquiring all of our locks.
2612            for id in &source_ids {
2613                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2614                    write_locks.insert_lock(*id, lock);
2615                }
2616            }
2617
2618            // See if we acquired all of the neccessary locks.
2619            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2620                Ok(locks) => locks,
2621                Err(missing) => {
2622                    // Defer our write if we couldn't acquire all of the locks.
2623                    let role_metadata = ctx.session().role_metadata().clone();
2624                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2625                    let plan = DeferredPlan {
2626                        ctx,
2627                        plan: Plan::ReadThenWrite(plan),
2628                        validity: PlanValidity::new(
2629                            self.catalog.transient_revision(),
2630                            source_ids.clone(),
2631                            None,
2632                            None,
2633                            role_metadata,
2634                        ),
2635                        requires_locks: source_ids,
2636                    };
2637                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2638                }
2639            };
2640
2641            ctx.session_mut()
2642                .try_grant_write_locks(write_locks)
2643                .expect("session has already been granted write locks");
2644        }
2645
2646        let plan::ReadThenWritePlan {
2647            id,
2648            kind,
2649            selection,
2650            mut assignments,
2651            finishing,
2652            mut returning,
2653        } = plan;
2654
2655        // Read then writes can be queued, so re-verify the id exists.
2656        let desc = match self.catalog().try_get_entry(&id) {
2657            Some(table) => {
2658                // Inserts always occur at the latest version of the table.
2659                table
2660                    .relation_desc_latest()
2661                    .expect("table has a desc")
2662                    .into_owned()
2663            }
2664            None => {
2665                ctx.retire(Err(AdapterError::Catalog(
2666                    mz_catalog::memory::error::Error {
2667                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2668                    },
2669                )));
2670                return;
2671            }
2672        };
2673
2674        // Disallow mz_now in any position because read time and write time differ.
2675        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2676            || assignments.values().any(|e| e.contains_temporal())
2677            || returning.iter().any(|e| e.contains_temporal());
2678        if contains_temporal {
2679            ctx.retire(Err(AdapterError::Unsupported(
2680                "calls to mz_now in write statements",
2681            )));
2682            return;
2683        }
2684
2685        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
2686        //
2687        // - They do not refer to any objects whose notion of time moves differently than that of
2688        // user tables. This limitation is meant to ensure no writes occur between this read and the
2689        // subsequent write.
2690        // - They do not use mz_now(), whose time produced during read will differ from the write
2691        //   timestamp.
2692        fn validate_read_dependencies(
2693            catalog: &Catalog,
2694            id: &CatalogItemId,
2695        ) -> Result<(), AdapterError> {
2696            use CatalogItemType::*;
2697            use mz_catalog::memory::objects;
2698            let mut ids_to_check = Vec::new();
2699            let valid = match catalog.try_get_entry(id) {
2700                Some(entry) => {
2701                    if let CatalogItem::View(objects::View { optimized_expr, .. })
2702                    | CatalogItem::MaterializedView(objects::MaterializedView {
2703                        optimized_expr,
2704                        ..
2705                    }) = entry.item()
2706                    {
2707                        if optimized_expr.contains_temporal() {
2708                            return Err(AdapterError::Unsupported(
2709                                "calls to mz_now in write statements",
2710                            ));
2711                        }
2712                    }
2713                    match entry.item().typ() {
2714                        typ @ (Func | View | MaterializedView | ContinualTask) => {
2715                            ids_to_check.extend(entry.uses());
2716                            let valid_id = id.is_user() || matches!(typ, Func);
2717                            valid_id
2718                        }
2719                        Source | Secret | Connection => false,
2720                        // Cannot select from sinks or indexes.
2721                        Sink | Index => unreachable!(),
2722                        Table => {
2723                            if !id.is_user() {
2724                                // We can't read from non-user tables
2725                                false
2726                            } else {
2727                                // We can't read from tables that are source-exports
2728                                entry.source_export_details().is_none()
2729                            }
2730                        }
2731                        Type => true,
2732                    }
2733                }
2734                None => false,
2735            };
2736            if !valid {
2737                let (object_name, object_type) = match catalog.try_get_entry(id) {
2738                    Some(entry) => {
2739                        let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
2740                        let object_type = match entry.item().typ() {
2741                            // We only need the disallowed types here; the allowed types are handled above.
2742                            Source => "source",
2743                            Secret => "secret",
2744                            Connection => "connection",
2745                            Table => {
2746                                if !id.is_user() {
2747                                    "system table"
2748                                } else {
2749                                    "source-export table"
2750                                }
2751                            }
2752                            View => "system view",
2753                            MaterializedView => "system materialized view",
2754                            ContinualTask => "system task",
2755                            _ => "invalid dependency",
2756                        };
2757                        (object_name, object_type.to_string())
2758                    }
2759                    None => (id.to_string(), "unknown".to_string()),
2760                };
2761                return Err(AdapterError::InvalidTableMutationSelection {
2762                    object_name,
2763                    object_type,
2764                });
2765            }
2766            for id in ids_to_check {
2767                validate_read_dependencies(catalog, &id)?;
2768            }
2769            Ok(())
2770        }
2771
2772        for gid in selection.depends_on() {
2773            let item_id = self.catalog().resolve_item_id(&gid);
2774            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2775                ctx.retire(Err(err));
2776                return;
2777            }
2778        }
2779
2780        let (peek_tx, peek_rx) = oneshot::channel();
2781        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2782        let (tx, _, session, extra) = ctx.into_parts();
2783        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2784        // execution context, because this peek does not directly correspond to an execute,
2785        // and so we don't need to take any action on its retirement.
2786        // TODO[btv]: we might consider extending statement logging to log the inner
2787        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2788        // and mint a new "real" execution context here. We'd also have to add some logic to
2789        // make sure such "sub-statements" are always sampled when the top-level statement is
2790        //
2791        // It's debatable whether this makes sense conceptually,
2792        // because the inner fragment here is not actually a
2793        // "statement" in its own right.
2794        let peek_ctx = ExecuteContext::from_parts(
2795            peek_client_tx,
2796            self.internal_cmd_tx.clone(),
2797            session,
2798            Default::default(),
2799        );
2800
2801        self.sequence_peek(
2802            peek_ctx,
2803            plan::SelectPlan {
2804                select: None,
2805                source: selection,
2806                when: QueryWhen::FreshestTableWrite,
2807                finishing,
2808                copy_to: None,
2809            },
2810            TargetCluster::Active,
2811            None,
2812        )
2813        .await;
2814
2815        let internal_cmd_tx = self.internal_cmd_tx.clone();
2816        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2817        let catalog = self.owned_catalog();
2818        let max_result_size = self.catalog().system_config().max_result_size();
2819
2820        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2821            let (peek_response, session) = match peek_rx.await {
2822                Ok(Response {
2823                    result: Ok(resp),
2824                    session,
2825                    otel_ctx,
2826                }) => {
2827                    otel_ctx.attach_as_parent();
2828                    (resp, session)
2829                }
2830                Ok(Response {
2831                    result: Err(e),
2832                    session,
2833                    otel_ctx,
2834                }) => {
2835                    let ctx =
2836                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2837                    otel_ctx.attach_as_parent();
2838                    ctx.retire(Err(e));
2839                    return;
2840                }
2841                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2842                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2843            };
2844            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2845            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2846
2847            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2848            if timeout_dur == Duration::ZERO {
2849                timeout_dur = Duration::MAX;
2850            }
2851
2852            let style = ExprPrepOneShot {
2853                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2854                session: ctx.session(),
2855                catalog_state: catalog.state(),
2856            };
2857            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2858                return_if_err!(style.prep_scalar_expr(expr), ctx);
2859            }
2860
2861            let make_diffs = move |mut rows: Box<dyn RowIterator>|
2862                  -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2863                    let arena = RowArena::new();
2864                    let mut diffs = Vec::new();
2865                    let mut datum_vec = mz_repr::DatumVec::new();
2866
2867                    while let Some(row) = rows.next() {
2868                        if !assignments.is_empty() {
2869                            assert!(
2870                                matches!(kind, MutationKind::Update),
2871                                "only updates support assignments"
2872                            );
2873                            let mut datums = datum_vec.borrow_with(row);
2874                            let mut updates = vec![];
2875                            for (idx, expr) in &assignments {
2876                                let updated = match expr.eval(&datums, &arena) {
2877                                    Ok(updated) => updated,
2878                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2879                                };
2880                                updates.push((*idx, updated));
2881                            }
2882                            for (idx, new_value) in updates {
2883                                datums[idx] = new_value;
2884                            }
2885                            let updated = Row::pack_slice(&datums);
2886                            diffs.push((updated, Diff::ONE));
2887                        }
2888                        match kind {
2889                            // Updates and deletes always remove the
2890                            // current row. Updates will also add an
2891                            // updated value.
2892                            MutationKind::Update | MutationKind::Delete => {
2893                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2894                            }
2895                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2896                        }
2897                    }
2898
2899                    // Sum of all the rows' byte size, for checking if we go
2900                    // above the max_result_size threshold.
2901                    let mut byte_size: u64 = 0;
2902                    for (row, diff) in &diffs {
2903                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2904                        if diff.is_positive() {
2905                            for (idx, datum) in row.iter().enumerate() {
2906                                desc.constraints_met(idx, &datum)?;
2907                            }
2908                        }
2909                    }
2910                    Ok((diffs, byte_size))
2911                };
2912
2913            let diffs = match peek_response {
2914                ExecuteResponse::SendingRowsStreaming {
2915                    rows: mut rows_stream,
2916                    ..
2917                } => {
2918                    let mut byte_size: u64 = 0;
2919                    let mut diffs = Vec::new();
2920                    let result = loop {
2921                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2922                            Ok(Some(res)) => match res {
2923                                PeekResponseUnary::Rows(new_rows) => {
2924                                    match make_diffs(new_rows) {
2925                                        Ok((mut new_diffs, new_byte_size)) => {
2926                                            byte_size = byte_size.saturating_add(new_byte_size);
2927                                            if byte_size > max_result_size {
2928                                                break Err(AdapterError::ResultSize(format!(
2929                                                    "result exceeds max size of {max_result_size}"
2930                                                )));
2931                                            }
2932                                            diffs.append(&mut new_diffs)
2933                                        }
2934                                        Err(e) => break Err(e),
2935                                    };
2936                                }
2937                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2938                                PeekResponseUnary::Error(e) => {
2939                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2940                                }
2941                            },
2942                            Ok(None) => break Ok(diffs),
2943                            Err(_) => {
2944                                // We timed out, so remove the pending peek. This is
2945                                // best-effort and doesn't guarantee we won't
2946                                // receive a response.
2947                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2948                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2949                                    conn_id: ctx.session().conn_id().clone(),
2950                                });
2951                                if let Err(e) = result {
2952                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2953                                }
2954                                break Err(AdapterError::StatementTimeout);
2955                            }
2956                        }
2957                    };
2958
2959                    result
2960                }
2961                ExecuteResponse::SendingRowsImmediate { rows } => {
2962                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2963                }
2964                resp => Err(AdapterError::Unstructured(anyhow!(
2965                    "unexpected peek response: {resp:?}"
2966                ))),
2967            };
2968
2969            let mut returning_rows = Vec::new();
2970            let mut diff_err: Option<AdapterError> = None;
2971            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2972                let arena = RowArena::new();
2973                for (row, diff) in diffs {
2974                    if !diff.is_positive() {
2975                        continue;
2976                    }
2977                    let mut returning_row = Row::with_capacity(returning.len());
2978                    let mut packer = returning_row.packer();
2979                    for expr in &returning {
2980                        let datums: Vec<_> = row.iter().collect();
2981                        match expr.eval(&datums, &arena) {
2982                            Ok(datum) => {
2983                                packer.push(datum);
2984                            }
2985                            Err(err) => {
2986                                diff_err = Some(err.into());
2987                                break;
2988                            }
2989                        }
2990                    }
2991                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2992                    let diff = match NonZeroUsize::try_from(diff) {
2993                        Ok(diff) => diff,
2994                        Err(err) => {
2995                            diff_err = Some(err.into());
2996                            break;
2997                        }
2998                    };
2999                    returning_rows.push((returning_row, diff));
3000                    if diff_err.is_some() {
3001                        break;
3002                    }
3003                }
3004            }
3005            let diffs = if let Some(err) = diff_err {
3006                Err(err)
3007            } else {
3008                diffs
3009            };
3010
3011            // We need to clear out the timestamp context so the write doesn't fail due to a
3012            // read only transaction.
3013            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3014            // No matter what isolation level the client is using, we must linearize this
3015            // read. The write will be performed right after this, as part of a single
3016            // transaction, so the write must have a timestamp greater than or equal to the
3017            // read.
3018            //
3019            // Note: It's only OK for the write to have a greater timestamp than the read
3020            // because the write lock prevents any other writes from happening in between
3021            // the read and write.
3022            if let Some(timestamp_context) = timestamp_context {
3023                let (tx, rx) = tokio::sync::oneshot::channel();
3024                let conn_id = ctx.session().conn_id().clone();
3025                let pending_read_txn = PendingReadTxn {
3026                    txn: PendingRead::ReadThenWrite { ctx, tx },
3027                    timestamp_context,
3028                    created: Instant::now(),
3029                    num_requeues: 0,
3030                    otel_ctx: OpenTelemetryContext::obtain(),
3031                };
3032                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3033                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3034                if let Err(e) = result {
3035                    warn!(
3036                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3037                        e
3038                    );
3039                    return;
3040                }
3041                let result = rx.await;
3042                // It is not an error for these results to be ready after `tx` has been dropped.
3043                ctx = match result {
3044                    Ok(Some(ctx)) => ctx,
3045                    Ok(None) => {
3046                        // Coordinator took our context and will handle responding to the client.
3047                        // This usually indicates that our transaction was aborted.
3048                        return;
3049                    }
3050                    Err(e) => {
3051                        warn!(
3052                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3053                            e
3054                        );
3055                        return;
3056                    }
3057                };
3058            }
3059
3060            match diffs {
3061                Ok(diffs) => {
3062                    let result = Self::send_diffs(
3063                        ctx.session_mut(),
3064                        plan::SendDiffsPlan {
3065                            id,
3066                            updates: diffs,
3067                            kind,
3068                            returning: returning_rows,
3069                            max_result_size,
3070                        },
3071                    );
3072                    ctx.retire(result);
3073                }
3074                Err(e) => {
3075                    ctx.retire(Err(e));
3076                }
3077            }
3078        });
3079    }
3080
3081    #[instrument]
3082    pub(super) async fn sequence_alter_item_rename(
3083        &mut self,
3084        ctx: &mut ExecuteContext,
3085        plan: plan::AlterItemRenamePlan,
3086    ) -> Result<ExecuteResponse, AdapterError> {
3087        let op = catalog::Op::RenameItem {
3088            id: plan.id,
3089            current_full_name: plan.current_full_name,
3090            to_name: plan.to_name,
3091        };
3092        match self
3093            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3094            .await
3095        {
3096            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3097            Err(err) => Err(err),
3098        }
3099    }
3100
3101    #[instrument]
3102    pub(super) async fn sequence_alter_retain_history(
3103        &mut self,
3104        ctx: &mut ExecuteContext,
3105        plan: plan::AlterRetainHistoryPlan,
3106    ) -> Result<ExecuteResponse, AdapterError> {
3107        let ops = vec![catalog::Op::AlterRetainHistory {
3108            id: plan.id,
3109            value: plan.value,
3110            window: plan.window,
3111        }];
3112        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3113            Box::pin(async move {
3114                let catalog_item = coord.catalog().get_entry(&plan.id).item();
3115                let cluster = match catalog_item {
3116                    CatalogItem::Table(_)
3117                    | CatalogItem::MaterializedView(_)
3118                    | CatalogItem::Source(_)
3119                    | CatalogItem::ContinualTask(_) => None,
3120                    CatalogItem::Index(index) => Some(index.cluster_id),
3121                    CatalogItem::Log(_)
3122                    | CatalogItem::View(_)
3123                    | CatalogItem::Sink(_)
3124                    | CatalogItem::Type(_)
3125                    | CatalogItem::Func(_)
3126                    | CatalogItem::Secret(_)
3127                    | CatalogItem::Connection(_) => unreachable!(),
3128                };
3129                match cluster {
3130                    Some(cluster) => {
3131                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3132                    }
3133                    None => {
3134                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3135                    }
3136                }
3137            })
3138        })
3139        .await?;
3140        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3141    }
3142
3143    #[instrument]
3144    pub(super) async fn sequence_alter_source_timestamp_interval(
3145        &mut self,
3146        ctx: &mut ExecuteContext,
3147        plan: plan::AlterSourceTimestampIntervalPlan,
3148    ) -> Result<ExecuteResponse, AdapterError> {
3149        let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3150            id: plan.id,
3151            value: plan.value,
3152            interval: plan.interval,
3153        }];
3154        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3155            Box::pin(async move {
3156                let source = coord
3157                    .catalog()
3158                    .get_entry(&plan.id)
3159                    .source()
3160                    .expect("known to be source");
3161                let (global_id, desc) = match &source.data_source {
3162                    DataSourceDesc::Ingestion { desc, .. }
3163                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3164                        (source.global_id, desc.clone())
3165                    }
3166                    _ => return,
3167                };
3168                let desc = desc.into_inline_connection(coord.catalog().state());
3169                coord
3170                    .controller
3171                    .storage
3172                    .alter_ingestion_source_desc(BTreeMap::from([(global_id, desc)]))
3173                    .await
3174                    .unwrap_or_terminate("cannot fail to alter ingestion source desc");
3175            })
3176        })
3177        .await?;
3178        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3179    }
3180
3181    #[instrument]
3182    pub(super) async fn sequence_alter_schema_rename(
3183        &mut self,
3184        ctx: &mut ExecuteContext,
3185        plan: plan::AlterSchemaRenamePlan,
3186    ) -> Result<ExecuteResponse, AdapterError> {
3187        let (database_spec, schema_spec) = plan.cur_schema_spec;
3188        let op = catalog::Op::RenameSchema {
3189            database_spec,
3190            schema_spec,
3191            new_name: plan.new_schema_name,
3192            check_reserved_names: true,
3193        };
3194        match self
3195            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3196            .await
3197        {
3198            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3199            Err(err) => Err(err),
3200        }
3201    }
3202
3203    #[instrument]
3204    pub(super) async fn sequence_alter_schema_swap(
3205        &mut self,
3206        ctx: &mut ExecuteContext,
3207        plan: plan::AlterSchemaSwapPlan,
3208    ) -> Result<ExecuteResponse, AdapterError> {
3209        let plan::AlterSchemaSwapPlan {
3210            schema_a_spec: (schema_a_db, schema_a),
3211            schema_a_name,
3212            schema_b_spec: (schema_b_db, schema_b),
3213            schema_b_name,
3214            name_temp,
3215        } = plan;
3216
3217        let op_a = catalog::Op::RenameSchema {
3218            database_spec: schema_a_db,
3219            schema_spec: schema_a,
3220            new_name: name_temp,
3221            check_reserved_names: false,
3222        };
3223        let op_b = catalog::Op::RenameSchema {
3224            database_spec: schema_b_db,
3225            schema_spec: schema_b,
3226            new_name: schema_a_name,
3227            check_reserved_names: false,
3228        };
3229        let op_c = catalog::Op::RenameSchema {
3230            database_spec: schema_a_db,
3231            schema_spec: schema_a,
3232            new_name: schema_b_name,
3233            check_reserved_names: false,
3234        };
3235
3236        match self
3237            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3238                Box::pin(async {})
3239            })
3240            .await
3241        {
3242            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3243            Err(err) => Err(err),
3244        }
3245    }
3246
    /// Sequences an `ALTER ROLE` statement, updating either the role's
    /// attributes (inherit, password, superuser, login, ...) or one of its
    /// default session variables.
    ///
    /// Notices collected along the way are only delivered to the session
    /// after the catalog transaction has succeeded.
    #[instrument]
    pub(super) async fn sequence_alter_role(
        &mut self,
        session: &Session,
        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let role = catalog.get_role(&id);

        // We'll send these notices to the user, if the operation is successful.
        let mut notices = vec![];

        // Get the attributes and variables from the role, as they currently are.
        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
        let mut vars = role.vars().clone();

        // Whether to set the password to NULL. This is a special case since the existing
        // password is not stored in the role attributes.
        let mut nopassword = false;

        // Apply our updates.
        match option {
            PlannedAlterRoleOption::Attributes(attrs) => {
                self.validate_role_attributes(&attrs.clone().into())?;

                if let Some(inherit) = attrs.inherit {
                    attributes.inherit = inherit;
                }

                if let Some(password) = attrs.password {
                    attributes.password = Some(password);
                    // Snapshot the current SCRAM iteration count alongside the
                    // new password so the stored credential reflects the
                    // system setting at the time it was set.
                    attributes.scram_iterations =
                        Some(self.catalog().system_config().scram_iterations())
                }

                if let Some(superuser) = attrs.superuser {
                    attributes.superuser = Some(superuser);
                }

                if let Some(login) = attrs.login {
                    attributes.login = Some(login);
                }

                if attrs.nopassword.unwrap_or(false) {
                    nopassword = true;
                }

                if let Some(notice) = self.should_emit_rbac_notice(session) {
                    notices.push(notice);
                }
            }
            PlannedAlterRoleOption::Variable(variable) => {
                // Get the variable to make sure it's valid and visible.
                let session_var = session.vars().inspect(variable.name())?;
                // Return early if it's not visible.
                session_var.visible(session.user(), catalog.system_vars())?;

                // Emit a warning when deprecated variables are used.
                // TODO(database-issues#8069) remove this after sufficient time has passed
                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if let PlannedRoleVariable::Set {
                    name,
                    value: VariableValue::Values(vals),
                } = &variable
                {
                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
                        notices.push(AdapterNotice::IntrospectionClusterUsage);
                    }
                }

                let var_name = match variable {
                    PlannedRoleVariable::Set { name, value } => {
                        // Update our persisted set.
                        match &value {
                            VariableValue::Default => {
                                // Resetting to default means removing the
                                // persisted override entirely.
                                vars.remove(&name);
                            }
                            VariableValue::Values(vals) => {
                                let var = match &vals[..] {
                                    [val] => OwnedVarInput::Flat(val.clone()),
                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
                                };
                                // Make sure the input is valid.
                                session_var.check(var.borrow())?;

                                vars.insert(name.clone(), var);
                            }
                        };
                        name
                    }
                    PlannedRoleVariable::Reset { name } => {
                        // Remove it from our persisted values.
                        vars.remove(&name);
                        name
                    }
                };

                // Emit a notice that they need to reconnect to see the change take effect.
                notices.push(AdapterNotice::VarDefaultUpdated {
                    role: Some(name.clone()),
                    var_name: Some(var_name),
                });
            }
        }

        let op = catalog::Op::AlterRole {
            id,
            name,
            attributes,
            nopassword,
            vars: RoleVars { map: vars },
        };
        let response = self
            .catalog_transact(Some(session), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredRole)?;

        // Send all of our queued notices.
        session.add_notices(notices);

        Ok(response)
    }
3370
    /// Begins sequencing an `ALTER SINK ... SET FROM` statement.
    ///
    /// Acquires a read hold on the new `from` collection and installs a
    /// storage watch set on the sink's collection at the hold's read
    /// timestamp. Sequencing resumes in `sequence_alter_sink_finish` once the
    /// watch set fires (via `WatchSetResponse::AlterSinkReady`).
    #[instrument]
    pub(super) async fn sequence_alter_sink_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterSinkPlan,
    ) {
        // Put a read hold on the new relation
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from_iter([plan.sink.from]),
            compute_ids: BTreeMap::new(),
        };
        let read_hold = self.acquire_read_holds(&id_bundle);

        // If the hold yields no readable time, the sink cannot be re-pointed
        // at this collection; fail the statement immediately.
        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
            return;
        };

        let otel_ctx = OpenTelemetryContext::obtain();
        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);

        // Record what the plan depends on (the sink, the new `from` relation,
        // and the cluster) so the finish step can detect concurrent changes.
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([plan.item_id, from_item_id]),
            Some(plan.in_cluster),
            None,
            ctx.session().role_metadata().clone(),
        );

        info!(
            "preparing alter sink for {}: frontiers={:?} export={:?}",
            plan.global_id,
            self.controller
                .storage_collections
                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
            self.controller.storage.export(plan.global_id)
        );

        // Now we must wait for the sink to make enough progress such that there is overlap between
        // the new `from` collection's read hold and the sink's write frontier.
        //
        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
        // the watch set never completes and neither does the `ALTER SINK` command.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([plan.global_id]),
            read_ts,
            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
                ctx: Some(ctx),
                otel_ctx,
                plan,
                plan_validity,
                read_hold,
            }),
        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
    }
3427
3428    #[instrument]
3429    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3430        ctx.otel_ctx.attach_as_parent();
3431
3432        let plan::AlterSinkPlan {
3433            item_id,
3434            global_id,
3435            sink: sink_plan,
3436            with_snapshot,
3437            in_cluster,
3438        } = ctx.plan.clone();
3439
3440        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3441        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3442        // arbitrarily changed since we performed planning, and we must re-assert that it still
3443        // matches our requirements.
3444        //
3445        // The `PlanValidity` check ensures that both the sink and the new source relation still
3446        // exist. Apart from that we have to ensure that nobody else altered the sink in the mean
3447        // time, which we do by comparing the catalog sink version to the one in the plan.
3448        match ctx.plan_validity.check(self.catalog()) {
3449            Ok(()) => {}
3450            Err(err) => {
3451                ctx.retire(Err(err));
3452                return;
3453            }
3454        }
3455
3456        let entry = self.catalog().get_entry(&item_id);
3457        let CatalogItem::Sink(old_sink) = entry.item() else {
3458            panic!("invalid item kind for `AlterSinkPlan`");
3459        };
3460
3461        if sink_plan.version != old_sink.version + 1 {
3462            ctx.retire(Err(AdapterError::ChangedPlan(
3463                "sink was altered concurrently".into(),
3464            )));
3465            return;
3466        }
3467
3468        info!(
3469            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3470            self.controller
3471                .storage_collections
3472                .collections_frontiers(vec![global_id, sink_plan.from]),
3473            self.controller.storage.export(global_id),
3474        );
3475
3476        // Assert that we can recover the updates that happened at the timestamps of the write
3477        // frontier. This must be true in this call.
3478        let write_frontier = &self
3479            .controller
3480            .storage
3481            .export(global_id)
3482            .expect("sink known to exist")
3483            .write_frontier;
3484        let as_of = ctx.read_hold.least_valid_read();
3485        assert!(
3486            write_frontier.iter().all(|t| as_of.less_than(t)),
3487            "{:?} should be strictly less than {:?}",
3488            &*as_of,
3489            &**write_frontier
3490        );
3491
3492        // Parse the `create_sql` so we can update it to the new sink definition.
3493        //
3494        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3495        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3496        // names in the `create_sql` may have changed, for example due to a schema swap.
3497        let create_sql = &old_sink.create_sql;
3498        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3499        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3500            unreachable!("invalid statement kind for sink");
3501        };
3502
3503        // Update the sink version.
3504        stmt.with_options
3505            .retain(|o| o.name != CreateSinkOptionName::Version);
3506        stmt.with_options.push(CreateSinkOption {
3507            name: CreateSinkOptionName::Version,
3508            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3509                sink_plan.version.to_string(),
3510            ))),
3511        });
3512
3513        let conn_catalog = self.catalog().for_system_session();
3514        let (mut stmt, resolved_ids) =
3515            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3516
3517        // Update the `from` relation.
3518        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3519        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3520        stmt.from = ResolvedItemName::Item {
3521            id: from_entry.id(),
3522            qualifiers: from_entry.name.qualifiers.clone(),
3523            full_name,
3524            print_id: true,
3525            version: from_entry.version,
3526        };
3527
3528        let new_sink = Sink {
3529            create_sql: stmt.to_ast_string_stable(),
3530            global_id,
3531            from: sink_plan.from,
3532            connection: sink_plan.connection.clone(),
3533            envelope: sink_plan.envelope,
3534            version: sink_plan.version,
3535            with_snapshot,
3536            resolved_ids: resolved_ids.clone(),
3537            cluster_id: in_cluster,
3538            commit_interval: sink_plan.commit_interval,
3539        };
3540
3541        let ops = vec![catalog::Op::UpdateItem {
3542            id: item_id,
3543            name: entry.name().clone(),
3544            to_item: CatalogItem::Sink(new_sink),
3545        }];
3546
3547        match self
3548            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3549            .await
3550        {
3551            Ok(()) => {}
3552            Err(err) => {
3553                ctx.retire(Err(err));
3554                return;
3555            }
3556        }
3557
3558        let storage_sink_desc = StorageSinkDesc {
3559            from: sink_plan.from,
3560            from_desc: from_entry
3561                .relation_desc()
3562                .expect("sinks can only be built on items with descs")
3563                .into_owned(),
3564            connection: sink_plan
3565                .connection
3566                .clone()
3567                .into_inline_connection(self.catalog().state()),
3568            envelope: sink_plan.envelope,
3569            as_of,
3570            with_snapshot,
3571            version: sink_plan.version,
3572            from_storage_metadata: (),
3573            to_storage_metadata: (),
3574            commit_interval: sink_plan.commit_interval,
3575        };
3576
3577        self.controller
3578            .storage
3579            .alter_export(
3580                global_id,
3581                ExportDescription {
3582                    sink: storage_sink_desc,
3583                    instance_id: in_cluster,
3584                },
3585            )
3586            .await
3587            .unwrap_or_terminate("cannot fail to alter source desc");
3588
3589        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3590    }
3591
3592    #[instrument]
3593    pub(super) async fn sequence_alter_connection(
3594        &mut self,
3595        ctx: ExecuteContext,
3596        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3597    ) {
3598        match action {
3599            AlterConnectionAction::RotateKeys => {
3600                self.sequence_rotate_keys(ctx, id).await;
3601            }
3602            AlterConnectionAction::AlterOptions {
3603                set_options,
3604                drop_options,
3605                validate,
3606            } => {
3607                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3608                    .await
3609            }
3610        }
3611    }
3612
    /// Sequences the option-changing path of `ALTER CONNECTION`.
    ///
    /// Rebuilds the connection's persisted `CREATE CONNECTION` SQL with
    /// `set_options` applied and `drop_options` removed, re-plans the amended
    /// statement, and then either validates the new connection asynchronously
    /// on a spawned task before committing it (`validate == true`), or commits
    /// it immediately via `sequence_alter_connection_stage_finish`. All
    /// failures are reported by retiring `ctx`.
    #[instrument]
    async fn sequence_alter_connection_options(
        &mut self,
        mut ctx: ExecuteContext,
        id: CatalogItemId,
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
        drop_options: BTreeSet<ConnectionOptionName>,
        validate: bool,
    ) {
        let cur_entry = self.catalog().get_entry(&id);
        let cur_conn = cur_entry.connection().expect("known to be connection");
        let connection_gid = cur_conn.global_id();

        // Produce the new `Connection` catalog item by editing and re-planning
        // the existing `CREATE CONNECTION` statement.
        let inner = || -> Result<Connection, AdapterError> {
            // Parse statement.
            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateConnection(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = self.catalog().for_system_session();

            // Resolve items in statement
            let (mut create_conn_stmt, resolved_ids) =
                mz_sql::names::resolve(&catalog, create_conn_stmt)
                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;

            // Retain options that are neither set nor dropped.
            create_conn_stmt
                .values
                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));

            // Set new values
            create_conn_stmt.values.extend(
                set_options
                    .into_iter()
                    .map(|(name, value)| ConnectionOption { name, value }),
            );

            // Open a new catalog, which we will use to re-plan our
            // statement with the desired config.
            let mut catalog = self.catalog().for_system_session();
            catalog.mark_id_unresolvable_for_replanning(id);

            // Re-define our source in terms of the amended statement
            let plan = match mz_sql::plan::plan(
                None,
                &catalog,
                Statement::CreateConnection(create_conn_stmt),
                &Params::empty(),
                &resolved_ids,
            )
            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
            {
                Plan::CreateConnection(plan) => plan,
                _ => unreachable!("create source plan is only valid response"),
            };

            // Parse statement.
            // Re-parse the *planned* SQL so dependency resolution below sees
            // the normalized statement, not our hand-edited one.
            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateConnection(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = self.catalog().for_system_session();

            // Resolve items in statement
            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;

            Ok(Connection {
                create_sql: plan.connection.create_sql,
                global_id: cur_conn.global_id,
                details: plan.connection.details,
                resolved_ids: new_deps,
            })
        };

        let conn = match inner() {
            Ok(conn) => conn,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        if validate {
            let connection = conn
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            // Snapshot everything the off-task validation needs before moving
            // into the spawned future.
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();
            let current_storage_parameters = self.controller.storage.config().clone();

            // Validate off the coordinator's main task; the result is routed
            // back via `Message::AlterConnectionValidationReady`.
            task::spawn(
                || format!("validate_alter_connection:{conn_id}"),
                async move {
                    let resolved_ids = conn.resolved_ids.clone();
                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
                    let result = match connection.validate(id, &current_storage_parameters).await {
                        Ok(()) => Ok(conn),
                        Err(err) => Err(err.into()),
                    };

                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
                        AlterConnectionValidationReady {
                            ctx,
                            result,
                            connection_id: id,
                            connection_gid,
                            plan_validity: PlanValidity::new(
                                transient_revision,
                                dependency_ids.clone(),
                                None,
                                None,
                                role_metadata,
                            ),
                            otel_ctx,
                            resolved_ids,
                        },
                    ));
                    if let Err(e) = result {
                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                    }
                },
            );
        } else {
            // No validation requested: commit the new connection right away.
            let result = self
                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
                .await;
            ctx.retire(result);
        }
    }
3759
3760    #[instrument]
3761    pub(crate) async fn sequence_alter_connection_stage_finish(
3762        &mut self,
3763        session: &Session,
3764        id: CatalogItemId,
3765        connection: Connection,
3766    ) -> Result<ExecuteResponse, AdapterError> {
3767        match self.catalog.get_entry(&id).item() {
3768            CatalogItem::Connection(curr_conn) => {
3769                curr_conn
3770                    .details
3771                    .to_connection()
3772                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3773                    .map_err(StorageError::from)?;
3774            }
3775            _ => unreachable!("known to be a connection"),
3776        };
3777
3778        let ops = vec![catalog::Op::UpdateItem {
3779            id,
3780            name: self.catalog.get_entry(&id).name().clone(),
3781            to_item: CatalogItem::Connection(connection.clone()),
3782        }];
3783
3784        self.catalog_transact(Some(session), ops).await?;
3785
3786        // NOTE: The rest of the alter connection logic (updating VPC endpoints
3787        // and propagating connection changes to dependent sources, sinks, and
3788        // tables) is handled in `apply_catalog_implications` via
3789        // `handle_alter_connection`. The catalog transact above triggers that
3790        // code path.
3791
3792        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3793    }
3794
    /// Sequences an `ALTER SOURCE` statement: either adding new subsource
    /// exports to an existing ingestion, or refreshing the source's recorded
    /// set of upstream references.
    ///
    /// For `AddSubsourceExports`, the persisted `CREATE SOURCE` SQL is
    /// re-parsed, its TEXT/EXCLUDE COLUMNS options are merged with the new
    /// ones, the statement is re-planned, storage is asked to approve the new
    /// ingestion description, and finally the source update plus the new
    /// subsources are committed in one catalog transaction.
    #[instrument]
    pub(super) async fn sequence_alter_source(
        &mut self,
        session: &Session,
        plan::AlterSourcePlan {
            item_id,
            ingestion_id,
            action,
        }: plan::AlterSourcePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let cur_entry = self.catalog().get_entry(&item_id);
        let cur_source = cur_entry.source().expect("known to be source");

        // Helper: parse a persisted `CREATE SOURCE` statement and resolve the
        // catalog items it references.
        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
            // Parse statement.
            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateSource(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = coord.catalog().for_system_session();

            // Resolve items in statement
            mz_sql::names::resolve(&catalog, create_source_stmt)
                .map_err(|e| AdapterError::internal(err_cx, e))
        };

        match action {
            plan::AlterSourceAction::AddSubsourceExports {
                subsources,
                options,
            } => {
                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                    text_columns: mut new_text_columns,
                    exclude_columns: mut new_exclude_columns,
                    ..
                } = options.try_into()?;

                // Resolve items in statement
                let (mut create_source_stmt, resolved_ids) =
                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

                // Get all currently referred-to items
                let catalog = self.catalog();
                let curr_references: BTreeSet<_> = catalog
                    .get_entry(&item_id)
                    .used_by()
                    .into_iter()
                    .filter_map(|subsource| {
                        catalog
                            .get_entry(subsource)
                            .subsource_details()
                            .map(|(_id, reference, _details)| reference)
                    })
                    .collect();

                // We are doing a lot of unwrapping, so just make an error to reference; all of
                // these invariants are guaranteed to be true because of how we plan subsources.
                let purification_err =
                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
                // we remove support for implicitly created subsources from a `CREATE SOURCE`
                // statement.
                match &mut create_source_stmt.connection {
                    CreateSourceConnection::Postgres {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::PgConfigOptionExtracted {
                            mut text_columns, ..
                        } = curr_options.clone().try_into()?;

                        // Drop text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));

                        // Drop all text columns that are not currently referred to.
                        text_columns.retain(|column_qualified_reference| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                4,
                                "all TEXT COLUMNS values must be column-qualified references"
                            );
                            // Truncating to 3 parts drops the trailing column
                            // name, leaving the qualified table reference.
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(3);
                            curr_references.contains(&table)
                        });

                        // Merge the current text columns into the new text columns.
                        new_text_columns.extend(text_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(PgConfigOption {
                                name: PgConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::MySql {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::MySqlConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both ignore and text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                MySqlConfigOptionName::TextColumns
                                    | MySqlConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                // Truncating to 2 parts drops the trailing
                                // column name, leaving the table reference.
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::SqlServer {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::SqlServerConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both ignore and text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                SqlServerConfigOptionName::TextColumns
                                    | SqlServerConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                // Truncating to 2 parts drops the trailing
                                // column name, leaving the table reference.
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    _ => return Err(purification_err()),
                };

                let mut catalog = self.catalog().for_system_session();
                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

                // Re-define our source in terms of the amended statement
                let plan = match mz_sql::plan::plan(
                    None,
                    &catalog,
                    Statement::CreateSource(create_source_stmt),
                    &Params::empty(),
                    &resolved_ids,
                )
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
                {
                    Plan::CreateSource(plan) => plan,
                    _ => unreachable!("create source plan is only valid response"),
                };

                // Asserting that we've done the right thing with dependencies
                // here requires mocking out objects in the catalog, which is a
                // large task for an operation we have to cover in tests anyway.
                let source = Source::new(
                    plan,
                    cur_source.global_id,
                    resolved_ids,
                    cur_source.custom_logical_compaction_window,
                    cur_source.is_retained_metrics_object,
                );

                // Get new ingestion description for storage.
                let desc = match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                        desc.clone().into_inline_connection(self.catalog().state())
                    }
                    _ => unreachable!("already verified of type ingestion"),
                };

                // Let storage veto the change before we commit anything.
                self.controller
                    .storage
                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

                // Redefine source. This must be done before we create any new
                // subsources so that it has the right ingestion.
                let mut ops = vec![catalog::Op::UpdateItem {
                    id: item_id,
                    // Look this up again so we don't have to hold an immutable reference to the
                    // entry for so long.
                    name: self.catalog.get_entry(&item_id).name().clone(),
                    to_item: CatalogItem::Source(source),
                }];

                let CreateSourceInner {
                    ops: new_ops,
                    sources: _,
                    if_not_exists_ids,
                } = self.create_source_inner(session, subsources).await?;

                ops.extend(new_ops.into_iter());

                assert!(
                    if_not_exists_ids.is_empty(),
                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
                );

                self.catalog_transact(Some(session), ops).await?;
            }
            plan::AlterSourceAction::RefreshReferences { references } => {
                self.catalog_transact(
                    Some(session),
                    vec![catalog::Op::UpdateSourceReferences {
                        source_id: item_id,
                        references: references.into(),
                    }],
                )
                .await?;
            }
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
    }
4123
4124    #[instrument]
4125    pub(super) async fn sequence_alter_system_set(
4126        &mut self,
4127        session: &Session,
4128        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4129    ) -> Result<ExecuteResponse, AdapterError> {
4130        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4131        // We want to ensure that the network policy we're switching too actually exists.
4132        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4133            self.validate_alter_system_network_policy(session, &value)?;
4134        }
4135
4136        let op = match value {
4137            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4138                name: name.clone(),
4139                value: OwnedVarInput::SqlSet(values),
4140            },
4141            plan::VariableValue::Default => {
4142                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4143            }
4144        };
4145        self.catalog_transact(Some(session), vec![op]).await?;
4146
4147        session.add_notice(AdapterNotice::VarDefaultUpdated {
4148            role: None,
4149            var_name: Some(name),
4150        });
4151        Ok(ExecuteResponse::AlteredSystemConfiguration)
4152    }
4153
4154    #[instrument]
4155    pub(super) async fn sequence_alter_system_reset(
4156        &mut self,
4157        session: &Session,
4158        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4159    ) -> Result<ExecuteResponse, AdapterError> {
4160        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4161        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4162        self.catalog_transact(Some(session), vec![op]).await?;
4163        session.add_notice(AdapterNotice::VarDefaultUpdated {
4164            role: None,
4165            var_name: Some(name),
4166        });
4167        Ok(ExecuteResponse::AlteredSystemConfiguration)
4168    }
4169
4170    #[instrument]
4171    pub(super) async fn sequence_alter_system_reset_all(
4172        &mut self,
4173        session: &Session,
4174        _: plan::AlterSystemResetAllPlan,
4175    ) -> Result<ExecuteResponse, AdapterError> {
4176        self.is_user_allowed_to_alter_system(session, None)?;
4177        let op = catalog::Op::ResetAllSystemConfiguration;
4178        self.catalog_transact(Some(session), vec![op]).await?;
4179        session.add_notice(AdapterNotice::VarDefaultUpdated {
4180            role: None,
4181            var_name: None,
4182        });
4183        Ok(ExecuteResponse::AlteredSystemConfiguration)
4184    }
4185
    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
    /// Returns `Ok(())` if `session`'s user may alter the system configuration
    /// parameter `var_name` — or *all* parameters, when `var_name` is `None`.
    ///
    /// Rules, in the order they are checked:
    /// - Resetting all variables requires an internal superuser.
    /// - A superuser may set a named variable if they are internal or the
    ///   variable is user-modifiable, provided the variable is visible to them.
    /// - A regular user asking for a user-modifiable variable gets a
    ///   superuser-required error (hinting they can still use session vars).
    /// - Anything else is rejected with an "mz_system required" error.
    fn is_user_allowed_to_alter_system(
        &self,
        session: &Session,
        var_name: Option<&str>,
    ) -> Result<(), AdapterError> {
        match (session.user().kind(), var_name) {
            // Only internal superusers can reset all system variables.
            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
            // Whether or not a variable can be modified depends if we're an internal superuser.
            (UserKind::Superuser, Some(name))
                if session.user().is_internal()
                    || self.catalog().system_config().user_modifiable(name) =>
            {
                // In lieu of plumbing the user to all system config functions, just check that
                // the var is visible.
                let var = self.catalog().system_config().get(name)?;
                var.visible(session.user(), self.catalog().system_config())?;
                Ok(())
            }
            // If we're not a superuser, but the variable is user modifiable, indicate they can use
            // session variables.
            (UserKind::Regular, Some(name))
                if self.catalog().system_config().user_modifiable(name) =>
            {
                Err(AdapterError::Unauthorized(
                    rbac::UnauthorizedError::Superuser {
                        action: format!("toggle the '{name}' system configuration parameter"),
                    },
                ))
            }
            _ => Err(AdapterError::Unauthorized(
                rbac::UnauthorizedError::MzSystem {
                    action: "alter system".into(),
                },
            )),
        }
    }
4224
4225    fn validate_alter_system_network_policy(
4226        &self,
4227        session: &Session,
4228        policy_value: &plan::VariableValue,
4229    ) -> Result<(), AdapterError> {
4230        let policy_name = match &policy_value {
4231            // Make sure the compiled in default still exists.
4232            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4233            plan::VariableValue::Values(values) if values.len() == 1 => {
4234                values.iter().next().cloned()
4235            }
4236            plan::VariableValue::Values(values) => {
4237                tracing::warn!(?values, "can't set multiple network policies at once");
4238                None
4239            }
4240        };
4241        let maybe_network_policy = policy_name
4242            .as_ref()
4243            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4244        let Some(network_policy) = maybe_network_policy else {
4245            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4246                VarError::InvalidParameterValue {
4247                    name: NETWORK_POLICY.name(),
4248                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4249                    reason: "no network policy with such name exists".to_string(),
4250                },
4251            )));
4252        };
4253        self.validate_alter_network_policy(session, &network_policy.rules)
4254    }
4255
4256    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4257    ///
4258    /// This helps prevent users from modifying network policies in a way that would lock out their
4259    /// current connection.
4260    fn validate_alter_network_policy(
4261        &self,
4262        session: &Session,
4263        policy_rules: &Vec<NetworkPolicyRule>,
4264    ) -> Result<(), AdapterError> {
4265        // If the user is not an internal user attempt to protect them from
4266        // blocking themselves.
4267        if session.user().is_internal() {
4268            return Ok(());
4269        }
4270        if let Some(ip) = session.meta().client_ip() {
4271            validate_ip_with_policy_rules(ip, policy_rules)
4272                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4273        } else {
4274            // Sessions without IPs are only temporarily constructed for default values
4275            // they should not be permitted here.
4276            return Err(AdapterError::NetworkPolicyDenied(
4277                NetworkPolicyError::MissingIp,
4278            ));
4279        }
4280        Ok(())
4281    }
4282
4283    // Returns the name of the portal to execute.
4284    #[instrument]
4285    pub(super) fn sequence_execute(
4286        &self,
4287        session: &mut Session,
4288        plan: plan::ExecutePlan,
4289    ) -> Result<String, AdapterError> {
4290        // Verify the stmt is still valid.
4291        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4292        let ps = session
4293            .get_prepared_statement_unverified(&plan.name)
4294            .expect("known to exist");
4295        let stmt = ps.stmt().cloned();
4296        let desc = ps.desc().clone();
4297        let state_revision = ps.state_revision;
4298        let logging = Arc::clone(ps.logging());
4299        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4300    }
4301
4302    #[instrument]
4303    pub(super) async fn sequence_grant_privileges(
4304        &mut self,
4305        session: &Session,
4306        plan::GrantPrivilegesPlan {
4307            update_privileges,
4308            grantees,
4309        }: plan::GrantPrivilegesPlan,
4310    ) -> Result<ExecuteResponse, AdapterError> {
4311        self.sequence_update_privileges(
4312            session,
4313            update_privileges,
4314            grantees,
4315            UpdatePrivilegeVariant::Grant,
4316        )
4317        .await
4318    }
4319
4320    #[instrument]
4321    pub(super) async fn sequence_revoke_privileges(
4322        &mut self,
4323        session: &Session,
4324        plan::RevokePrivilegesPlan {
4325            update_privileges,
4326            revokees,
4327        }: plan::RevokePrivilegesPlan,
4328    ) -> Result<ExecuteResponse, AdapterError> {
4329        self.sequence_update_privileges(
4330            session,
4331            update_privileges,
4332            revokees,
4333            UpdatePrivilegeVariant::Revoke,
4334        )
4335        .await
4336    }
4337
    /// Shared implementation for `GRANT` and `REVOKE` of object privileges.
    ///
    /// Builds at most one catalog op per (object, grantee) pair — only for pairs whose
    /// privileges would actually change — and applies them in a single catalog
    /// transaction. Privilege bits that don't apply to a relation's actual object type
    /// produce warnings rather than errors.
    #[instrument]
    async fn sequence_update_privileges(
        &mut self,
        session: &Session,
        update_privileges: Vec<UpdatePrivilege>,
        grantees: Vec<RoleId>,
        variant: UpdatePrivilegeVariant,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Upper bound: one op per (object, grantee) pair.
        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
        let mut warnings = Vec::new();
        let catalog = self.catalog().for_session(session);

        for UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        } in update_privileges
        {
            let actual_object_type = catalog.get_system_object_type(&target_id);
            // For all relations we allow all applicable table privileges, but send a warning if the
            // privilege isn't actually applicable to the object type.
            if actual_object_type.is_relation() {
                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
                if !non_applicable_privileges.is_empty() {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
                        non_applicable_privileges,
                        object_description,
                    })
                }
            }

            // Reserved (system-owned) objects cannot have their privileges changed.
            if let SystemObjectId::Object(object_id) = &target_id {
                self.catalog()
                    .ensure_not_reserved_object(object_id, session.conn_id())?;
            }

            let privileges = self
                .catalog()
                .get_privileges(&target_id, session.conn_id())
                // Should be unreachable since the parser will refuse to parse grant/revoke
                // statements on objects without privileges.
                .ok_or(AdapterError::Unsupported(
                    "GRANTs/REVOKEs on an object type with no privileges",
                ))?;

            for grantee in &grantees {
                self.catalog().ensure_not_system_role(grantee)?;
                self.catalog().ensure_not_predefined_role(grantee)?;
                // If no ACL item exists yet for this (grantee, grantor) pair, diff
                // against an empty one instead of allocating it into the catalog.
                let existing_privilege = privileges
                    .get_acl_item(grantee, &grantor)
                    .map(Cow::Borrowed)
                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));

                match variant {
                    // Only record a grant if it adds at least one privilege bit
                    // not already held.
                    UpdatePrivilegeVariant::Grant
                        if !existing_privilege.acl_mode.contains(acl_mode) =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    // Only record a revoke if it would remove at least one
                    // currently-held privilege bit.
                    UpdatePrivilegeVariant::Revoke
                        if !existing_privilege
                            .acl_mode
                            .intersection(acl_mode)
                            .is_empty() =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    // no-op
                    _ => {}
                }
            }
        }

        // Nothing would change: still a success, and the warnings are surfaced.
        if ops.is_empty() {
            session.add_notices(warnings);
            return Ok(variant.into());
        }

        let res = self
            .catalog_transact(Some(session), ops)
            .await
            .map(|_| match variant {
                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
            });
        // Warnings are only surfaced when the overall command succeeds.
        if res.is_ok() {
            session.add_notices(warnings);
        }
        res
    }
4447
    /// Implements `ALTER DEFAULT PRIVILEGES`.
    ///
    /// Emits one catalog op per (privilege object, ACL item) pair, after verifying that
    /// neither system/predefined roles nor reserved databases/schemas are involved.
    #[instrument]
    pub(super) async fn sequence_alter_default_privileges(
        &mut self,
        session: &Session,
        plan::AlterDefaultPrivilegesPlan {
            privilege_objects,
            privilege_acl_items,
            is_grant,
        }: plan::AlterDefaultPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // One op per (object, acl item) pair.
        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
        let variant = if is_grant {
            UpdatePrivilegeVariant::Grant
        } else {
            UpdatePrivilegeVariant::Revoke
        };
        for privilege_object in &privilege_objects {
            // Default privileges cannot be scoped to system or predefined roles.
            self.catalog()
                .ensure_not_system_role(&privilege_object.role_id)?;
            self.catalog()
                .ensure_not_predefined_role(&privilege_object.role_id)?;
            // Nor to reserved databases or schemas.
            if let Some(database_id) = privilege_object.database_id {
                self.catalog()
                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
            }
            if let Some(schema_id) = privilege_object.schema_id {
                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
                let schema_spec: SchemaSpecifier = schema_id.into();

                self.catalog().ensure_not_reserved_object(
                    &(database_spec, schema_spec).into(),
                    session.conn_id(),
                )?;
            }
            for privilege_acl_item in &privilege_acl_items {
                // The grantee also may not be a system or predefined role.
                self.catalog()
                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
                self.catalog()
                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
                ops.push(catalog::Op::UpdateDefaultPrivilege {
                    privilege_object: privilege_object.clone(),
                    privilege_acl_item: privilege_acl_item.clone(),
                    variant,
                })
            }
        }

        self.catalog_transact(Some(session), ops).await?;
        Ok(ExecuteResponse::AlteredDefaultPrivileges)
    }
4498
    /// Implements `GRANT <role> TO <member>`.
    ///
    /// Memberships that already exist produce a notice rather than an error; only
    /// missing memberships generate catalog ops.
    #[instrument]
    pub(super) async fn sequence_grant_role(
        &mut self,
        session: &Session,
        plan::GrantRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::GrantRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        // At most one op per (role, member) pair.
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // We need this check so we don't accidentally return a success on a reserved role.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::GrantRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        // Everything requested is already granted: succeed without a transaction.
        if ops.is_empty() {
            return Ok(ExecuteResponse::GrantedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::GrantedRole)
    }
4543
    /// Implements `REVOKE <role> FROM <member>`.
    ///
    /// Memberships that don't exist produce a notice rather than an error; only
    /// existing memberships generate catalog ops. Mirrors [`Self::sequence_grant_role`].
    #[instrument]
    pub(super) async fn sequence_revoke_role(
        &mut self,
        session: &Session,
        plan::RevokeRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::RevokeRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        // At most one op per (role, member) pair.
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if !member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // We need this check so we don't accidentally return a success on a reserved role.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::RevokeRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        // Nothing to revoke: succeed without a transaction.
        if ops.is_empty() {
            return Ok(ExecuteResponse::RevokedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::RevokedRole)
    }
4588
    /// Implements `ALTER ... OWNER TO`.
    ///
    /// Ownership changes cascade to objects that cannot be altered directly:
    /// for items, dependent indexes and progress collections; for clusters,
    /// their replicas. Altering an index's owner directly is a no-op with a notice.
    #[instrument]
    pub(super) async fn sequence_alter_owner(
        &mut self,
        session: &Session,
        plan::AlterOwnerPlan {
            id,
            object_type,
            new_owner,
        }: plan::AlterOwnerPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // The op for the object itself; cascaded ops are appended below.
        let mut ops = vec![catalog::Op::UpdateOwner {
            id: id.clone(),
            new_owner,
        }];

        match &id {
            ObjectId::Item(global_id) => {
                let entry = self.catalog().get_entry(global_id);

                // Cannot directly change the owner of an index.
                if entry.is_index() {
                    let name = self
                        .catalog()
                        .resolve_full_name(entry.name(), Some(session.conn_id()))
                        .to_string();
                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
                    return Ok(ExecuteResponse::AlteredObject(object_type));
                }

                // Alter owner cascades down to dependent indexes.
                let dependent_index_ops = entry
                    .used_by()
                    .into_iter()
                    .filter(|id| self.catalog().get_entry(id).is_index())
                    .map(|id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(*id),
                        new_owner,
                    });
                ops.extend(dependent_index_ops);

                // Alter owner cascades down to progress collections.
                let dependent_subsources =
                    entry
                        .progress_id()
                        .into_iter()
                        .map(|item_id| catalog::Op::UpdateOwner {
                            id: ObjectId::Item(item_id),
                            new_owner,
                        });
                ops.extend(dependent_subsources);
            }
            ObjectId::Cluster(cluster_id) => {
                let cluster = self.catalog().get_cluster(*cluster_id);
                // Alter owner cascades down to cluster replicas.
                let managed_cluster_replica_ops =
                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
                        new_owner,
                    });
                ops.extend(managed_cluster_replica_ops);
            }
            // Other object kinds have nothing to cascade to.
            _ => {}
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::AlteredObject(object_type))
    }
4657
4658    #[instrument]
4659    pub(super) async fn sequence_reassign_owned(
4660        &mut self,
4661        session: &Session,
4662        plan::ReassignOwnedPlan {
4663            old_roles,
4664            new_role,
4665            reassign_ids,
4666        }: plan::ReassignOwnedPlan,
4667    ) -> Result<ExecuteResponse, AdapterError> {
4668        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4669            self.catalog().ensure_not_reserved_role(role_id)?;
4670        }
4671
4672        let ops = reassign_ids
4673            .into_iter()
4674            .map(|id| catalog::Op::UpdateOwner {
4675                id,
4676                new_owner: new_role,
4677            })
4678            .collect();
4679
4680        self.catalog_transact(Some(session), ops)
4681            .await
4682            .map(|_| ExecuteResponse::ReassignOwned)
4683    }
4684
4685    #[instrument]
4686    pub(crate) async fn handle_deferred_statement(&mut self) {
4687        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
4688        // was processed, removing the single element from deferred_statements, so it is expected
4689        // that this is sometimes empty.
4690        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4691            return;
4692        };
4693        match ps {
4694            crate::coord::PlanStatement::Statement { stmt, params } => {
4695                self.handle_execute_inner(stmt, params, ctx).await;
4696            }
4697            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4698                self.sequence_plan(ctx, plan, resolved_ids).await;
4699            }
4700        }
4701    }
4702
4703    #[instrument]
4704    // TODO(parkmycar): Remove this once we have an actual implementation.
4705    #[allow(clippy::unused_async)]
4706    pub(super) async fn sequence_alter_table(
4707        &mut self,
4708        ctx: &mut ExecuteContext,
4709        plan: plan::AlterTablePlan,
4710    ) -> Result<ExecuteResponse, AdapterError> {
4711        let plan::AlterTablePlan {
4712            relation_id,
4713            column_name,
4714            column_type,
4715            raw_sql_type,
4716        } = plan;
4717
4718        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4719        let id_ts = self.get_catalog_write_ts().await;
4720        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4721        let ops = vec![catalog::Op::AlterAddColumn {
4722            id: relation_id,
4723            new_global_id,
4724            name: column_name,
4725            typ: column_type,
4726            sql: raw_sql_type,
4727        }];
4728
4729        self.catalog_transact_with_context(None, Some(ctx), ops)
4730            .await?;
4731
4732        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4733    }
4734
    /// Prepares to apply a replacement materialized view.
    ///
    /// Rather than applying the replacement immediately, this installs a watch set that
    /// fires once the target MV's write frontier has caught up to the replacement
    /// dataflow's, at which point
    /// [`Self::sequence_alter_materialized_view_apply_replacement_finish`] completes the
    /// command. `ctx` is retired with an error if the replacement is already sealed.
    #[instrument]
    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: AlterMaterializedViewApplyReplacementPlan,
    ) {
        // To ensure there is no time gap in the output, we can only apply a replacement if the
        // target's write frontier has caught up to the replacement dataflow's write frontier. This
        // might not be the case initially, so we have to wait. To this end, we install a watch set
        // waiting for the target MV's write frontier to advance sufficiently.
        //
        // Note that the replacement's dataflow is not performing any writes, so it can only be
        // ahead of the target initially due to as-of selection. Once the target has caught up, the
        // replacement's write frontier is always <= the target's.

        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();

        // Capture the plan's dependencies and catalog revision so the `_finish` step can
        // re-validate after the asynchronous wait.
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([id, replacement_id]),
            None,
            None,
            ctx.session().role_metadata().clone(),
        );

        let target = self.catalog.get_entry(&id);
        let target_gid = target.latest_global_id();

        let replacement = self.catalog.get_entry(&replacement_id);
        let replacement_gid = replacement.latest_global_id();

        // Target frontier comes from storage, replacement frontier from compute.
        let target_upper = self
            .controller
            .storage_collections
            .collection_frontiers(target_gid)
            .expect("target MV exists")
            .write_frontier;
        let replacement_upper = self
            .controller
            .compute
            .collection_frontiers(replacement_gid, replacement.cluster_id())
            .expect("replacement MV exists")
            .write_frontier;

        info!(
            %id, %replacement_id, ?target_upper, ?replacement_upper,
            "preparing materialized view replacement application",
        );

        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
            // A replacement's write frontier can only become empty if the target's write frontier
            // has advanced to the empty frontier. In this case the MV is sealed for all times and
            // applying the replacement wouldn't have any effect. We use this opportunity to alert
            // the user by returning an error, rather than applying the useless replacement.
            //
            // Note that we can't assert on `target_upper` being empty here, because the reporting
            // of the target's frontier might be delayed. We'd have to fetch the current frontier
            // from persist, which we cannot do without incurring I/O.
            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
                name: target.name().item.clone(),
            }));
            return;
        };

        // A watch set resolves when the watched objects' frontier becomes _greater_ than the
        // specified timestamp. Since we only need to wait until the target frontier is >= the
        // replacement's frontier, we can step back the timestamp.
        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);

        // TODO(database-issues#9820): If the target MV is dropped while we are waiting for
        // progress, the watch set never completes and neither does the `ALTER MATERIALIZED VIEW`
        // command.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([target_gid]),
            replacement_upper_ts,
            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
                ctx: Some(ctx),
                otel_ctx: OpenTelemetryContext::obtain(),
                plan,
                plan_validity,
            }),
        )
        .expect("target collection exists");
    }
4821
    /// Finishes applying a replacement materialized view after the frontier wait completed.
    ///
    /// Invoked when the watch set installed by
    /// [`Self::sequence_alter_materialized_view_apply_replacement_prepare`] resolves.
    /// Re-validates the plan (the catalog may have changed during the wait) and then
    /// applies the replacement in a catalog transaction, retiring `ctx` with the result.
    #[instrument]
    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
        &mut self,
        mut ctx: AlterMaterializedViewReadyContext,
    ) {
        // Re-attach the tracing context captured when the wait was installed.
        ctx.otel_ctx.attach_as_parent();

        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;

        // We avoid taking the DDL lock for `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`
        // commands, see `Coordinator::must_serialize_ddl`. We therefore must assume that the
        // world has arbitrarily changed since we performed planning, and we must re-assert
        // that it still matches our requirements.
        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
            ctx.retire(Err(err));
            return;
        }

        info!(
            %id, %replacement_id,
            "finishing materialized view replacement application",
        );

        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
                ObjectType::MaterializedView,
            ))),
            Err(err) => ctx.retire(Err(err)),
        }
    }
4857
4858    pub(super) async fn statistics_oracle(
4859        &self,
4860        session: &Session,
4861        source_ids: &BTreeSet<GlobalId>,
4862        query_as_of: &Antichain<Timestamp>,
4863        is_oneshot: bool,
4864    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4865        super::statistics_oracle(
4866            session,
4867            source_ids,
4868            query_as_of,
4869            is_oneshot,
4870            self.catalog().system_config(),
4871            self.controller.storage_collections.as_ref(),
4872        )
4873        .await
4874    }
4875}
4876
4877impl Coordinator {
4878    /// Process the metainfo from a newly created non-transient dataflow.
4879    async fn process_dataflow_metainfo(
4880        &mut self,
4881        df_meta: DataflowMetainfo,
4882        export_id: GlobalId,
4883        ctx: Option<&mut ExecuteContext>,
4884        notice_ids: Vec<GlobalId>,
4885    ) -> Option<BuiltinTableAppendNotify> {
4886        // Emit raw notices to the user.
4887        if let Some(ctx) = ctx {
4888            emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4889        }
4890
4891        // Create a metainfo with rendered notices.
4892        let df_meta = self
4893            .catalog()
4894            .render_notices(df_meta, notice_ids, Some(export_id));
4895
4896        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
4897        // in-memory state.
4898        if self.catalog().state().system_config().enable_mz_notices()
4899            && !df_meta.optimizer_notices.is_empty()
4900        {
4901            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4902            self.catalog().state().pack_optimizer_notices(
4903                &mut builtin_table_updates,
4904                df_meta.optimizer_notices.iter(),
4905                Diff::ONE,
4906            );
4907
4908            // Save the metainfo.
4909            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4910
4911            Some(
4912                self.builtin_table_update()
4913                    .execute(builtin_table_updates)
4914                    .await
4915                    .0,
4916            )
4917        } else {
4918            // Save the metainfo.
4919            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4920
4921            None
4922        }
4923    }
4924}