Skip to main content

mz_adapter/coord/sequencer/
inner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29    CollectionPlan, Eval, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::future::OreFutureExt;
34use mz_ore::task::{self, JoinHandle, spawn};
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{assert_none, instrument};
37use mz_repr::adt::jsonb::Jsonb;
38use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
39use mz_repr::explain::ExprHumanizer;
40use mz_repr::explain::json::json_string;
41use mz_repr::role_id::RoleId;
42use mz_repr::{
43    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
44    RowIterator, Timestamp,
45};
46use mz_secrets::SecretsReader;
47use mz_sql::ast::{
48    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
49    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
50    SqlServerConfigOptionName,
51};
52use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
53use mz_sql::catalog::{
54    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
55    CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, CatalogTypeDetails,
56    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
57};
58use mz_sql::names::{
59    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
60    SchemaSpecifier, SystemObjectId,
61};
62use mz_sql::plan::{
63    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
64    StatementContext,
65};
66use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
67use mz_storage_types::sinks::StorageSinkDesc;
68// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
69use mz_sql::plan::{
70    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
71    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
72    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
73};
74use mz_sql::session::metadata::SessionMetadata;
75use mz_sql::session::user::UserKind;
76use mz_sql::session::vars::{
77    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
78    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
79};
80use mz_sql::{plan, rbac};
81use mz_sql_parser::ast::display::AstDisplay;
82use mz_sql_parser::ast::{
83    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
84    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
85    WithOptionValue,
86};
87use mz_ssh_util::keys::SshKeyPairSet;
88use mz_storage_client::controller::ExportDescription;
89use mz_storage_types::AlterCompatible;
90use mz_storage_types::connections::inline::IntoInlineConnection;
91use mz_storage_types::controller::StorageError;
92use mz_transform::dataflow::DataflowMetainfo;
93use mz_transform::notice::{OptimizerNotice, RawOptimizerNotice};
94use smallvec::SmallVec;
95use timely::progress::Antichain;
96use tokio::sync::{oneshot, watch};
97use tracing::{Instrument, Span, info, warn};
98
99use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
100use crate::command::{ExecuteResponse, Response};
101use crate::coord::appends::{
102    BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn, UserWriteResponder,
103};
104use crate::coord::read_then_write::validate_read_then_write_dependencies;
105use crate::coord::sequencer::emit_optimizer_notices;
106use crate::coord::{
107    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
108    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
109    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
110    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
111    WatchSetResponse, validate_ip_with_policy_rules,
112};
113use crate::error::AdapterError;
114use crate::notice::{AdapterNotice, DroppedInUseIndex};
115use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
116use crate::optimize::{self, Optimize};
117use crate::session::{
118    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
119    WriteLocks, WriteOp,
120};
121use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
122use crate::{PeekResponseUnary, ReadHolds};
123
/// A future that resolves to a real-time recency timestamp.
///
/// The future is boxed and `'static`. It yields the [`Timestamp`] on success
/// and a [`StorageError`] on failure.
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError>>;
126
127mod cluster;
128mod copy_from;
129mod create_index;
130mod create_materialized_view;
131mod create_view;
132mod explain_timestamp;
133mod peek;
134mod secret;
135mod subscribe;
136
/// Unwraps a `Result`, evaluating to the `Ok` value.
///
/// On `Err`, the error is converted with `Into`, passed to `$ctx.retire(..)`,
/// and the enclosing function returns the value of that call.
macro_rules! return_if_err {
    ($result:expr, $ctx:expr) => {
        match $result {
            Err(err) => return $ctx.retire(Err(err.into())),
            Ok(value) => value,
        }
    };
}
147
148pub(super) use return_if_err;
149
/// Catalog operations for a drop, bundled with flags describing what the drop
/// affects.
// NOTE(review): the code that builds and consumes this struct is not visible
// here; the field notes below are inferred from names and types — confirm.
struct DropOps {
    /// The catalog operations to apply.
    ops: Vec<catalog::Op>,
    /// Presumably set when the session's active database is being dropped — TODO confirm.
    dropped_active_db: bool,
    /// Presumably set when the session's active cluster is being dropped — TODO confirm.
    dropped_active_cluster: bool,
    /// Presumably the indexes being dropped that are still in use, kept for
    /// reporting via notices — TODO confirm.
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
156
/// A bundle of values returned from `create_source_inner`.
struct CreateSourceInner {
    /// Catalog operations to transact: one `CreateItem` per source, each
    /// followed by an `UpdateSourceReferences` op when the plan carried
    /// available source references.
    ops: Vec<catalog::Op>,
    /// Every source built from the plans, paired with its catalog item ID.
    sources: Vec<(CatalogItemId, Source)>,
    /// Item IDs and names of the plans that were marked `if_not_exists`.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
163
164impl Coordinator {
165    /// Sequences a [Staged] plan.
166    ///
167    /// This is designed for plans that execute both on and off the coordinator
168    /// thread. Stages can either produce another stage to execute or a final
169    /// response. Maintains the connection-scoped cancel watch in
170    /// `connection_cancel_watches` while a stage is cancelable.
171    pub(crate) async fn sequence_staged<S>(
172        &mut self,
173        mut ctx: S::Ctx,
174        parent_span: Span,
175        mut stage: S,
176    ) where
177        S: Staged + 'static,
178        S::Ctx: Send + 'static,
179    {
180        return_if_err!(stage.validity().check(self.catalog()), ctx);
181        loop {
182            let mut cancel_enabled = stage.cancel_enabled();
183            if let Some(session) = ctx.session() {
184                if cancel_enabled {
185                    // Channel to await cancellation. Insert a new channel, but check if the previous one
186                    // was already canceled.
187                    if let Some((_prev_tx, prev_rx)) = self
188                        .connection_cancel_watches
189                        .insert(session.conn_id().clone(), watch::channel(false))
190                    {
191                        let was_canceled = *prev_rx.borrow();
192                        if was_canceled {
193                            ctx.retire(Err(AdapterError::Canceled));
194                            return;
195                        }
196                    }
197                } else {
198                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
199                    // when cancel_enabled may have been true on an earlier stage.
200                    self.connection_cancel_watches.remove(session.conn_id());
201                }
202            } else {
203                cancel_enabled = false
204            };
205            let next = stage
206                .stage(self, &mut ctx)
207                .instrument(parent_span.clone())
208                .await;
209            let res = return_if_err!(next, ctx);
210            stage = match res {
211                StageResult::Handle(handle) => {
212                    let internal_cmd_tx = self.internal_cmd_tx.clone();
213                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
214                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
215                    });
216                    return;
217                }
218                StageResult::HandleRetire(handle) => {
219                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
220                        ctx.retire(Ok(resp));
221                    });
222                    return;
223                }
224                StageResult::Response(resp) => {
225                    ctx.retire(Ok(resp));
226                    return;
227                }
228                StageResult::Immediate(stage) => *stage,
229            }
230        }
231    }
232
233    /// Waits for either the spawned stage work to complete or cancellation to
234    /// be signaled through the connection-scoped cancel watch.
235    fn handle_spawn<C, T, F>(
236        &self,
237        ctx: C,
238        handle: JoinHandle<Result<T, AdapterError>>,
239        cancel_enabled: bool,
240        f: F,
241    ) where
242        C: StagedContext + Send + 'static,
243        T: Send + 'static,
244        F: FnOnce(C, T) + Send + 'static,
245    {
246        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
247            .session()
248            .and_then(|session| self.connection_cancel_watches.get(session.conn_id()))
249        {
250            let mut rx = rx.clone();
251            Box::pin(async move {
252                // Wait for true or dropped sender.
253                let _ = rx.wait_for(|v| *v).await;
254                ()
255            })
256        } else {
257            Box::pin(future::pending())
258        };
259        spawn(|| "sequence_staged", async move {
260            tokio::select! {
261                res = handle => {
262                    let next = return_if_err!(res, ctx);
263                    f(ctx, next);
264                }
265                _ = rx, if cancel_enabled => {
266                    ctx.retire(Err(AdapterError::Canceled));
267                }
268            }
269        });
270    }
271
272    async fn create_source_inner(
273        &self,
274        session: &Session,
275        plans: Vec<plan::CreateSourcePlanBundle>,
276    ) -> Result<CreateSourceInner, AdapterError> {
277        let mut ops = vec![];
278        let mut sources = vec![];
279
280        let if_not_exists_ids = plans
281            .iter()
282            .filter_map(
283                |plan::CreateSourcePlanBundle {
284                     item_id,
285                     global_id: _,
286                     plan,
287                     resolved_ids: _,
288                     available_source_references: _,
289                 }| {
290                    if plan.if_not_exists {
291                        Some((*item_id, plan.name.clone()))
292                    } else {
293                        None
294                    }
295                },
296            )
297            .collect::<BTreeMap<_, _>>();
298
299        for plan::CreateSourcePlanBundle {
300            item_id,
301            global_id,
302            mut plan,
303            resolved_ids,
304            available_source_references,
305        } in plans
306        {
307            let name = plan.name.clone();
308
309            // Attempt to reduce the `CHECK` expression, we timeout if this takes too long.
310            if let mz_sql::plan::DataSourceDesc::Webhook {
311                validate_using: Some(validate),
312                ..
313            } = &mut plan.source.data_source
314            {
315                if let Err(reason) = validate.reduce_expression().await {
316                    self.metrics
317                        .webhook_validation_reduce_failures
318                        .with_label_values(&[reason])
319                        .inc();
320                    return Err(AdapterError::Internal(format!(
321                        "failed to reduce check expression, {reason}"
322                    )));
323                }
324            }
325
326            // If this source contained a set of available source references, update the
327            // source references catalog table.
328            let mut reference_ops = vec![];
329            if let Some(references) = &available_source_references {
330                reference_ops.push(catalog::Op::UpdateSourceReferences {
331                    source_id: item_id,
332                    references: references.clone().into(),
333                });
334            }
335
336            let source = Source::new(plan, global_id, resolved_ids, None, false);
337            ops.push(catalog::Op::CreateItem {
338                id: item_id,
339                name,
340                item: CatalogItem::Source(source.clone()),
341                owner_id: *session.current_role_id(),
342            });
343            sources.push((item_id, source));
344            // These operations must be executed after the source is added to the catalog.
345            ops.extend(reference_ops);
346        }
347
348        Ok(CreateSourceInner {
349            ops,
350            sources,
351            if_not_exists_ids,
352        })
353    }
354
355    /// Subsources are planned differently from other statements because they
356    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
357    /// Because of this, we have usually "missed" the opportunity to plan them
358    /// through the normal statement execution life cycle (the exception being
359    /// during bootstrapping).
360    ///
361    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
362    pub(crate) fn plan_subsource(
363        &self,
364        session: &Session,
365        params: &mz_sql::plan::Params,
366        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
367        item_id: CatalogItemId,
368        global_id: GlobalId,
369    ) -> Result<CreateSourcePlanBundle, AdapterError> {
370        let catalog = self.catalog().for_session(session);
371        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
372
373        let (plan, _sql_impl_ids) = self.plan_statement(
374            session,
375            Statement::CreateSubsource(subsource_stmt),
376            params,
377            &resolved_ids,
378        )?;
379        let plan = match plan {
380            Plan::CreateSource(plan) => plan,
381            _ => unreachable!(),
382        };
383        Ok(CreateSourcePlanBundle {
384            item_id,
385            global_id,
386            plan,
387            resolved_ids,
388            available_source_references: None,
389        })
390    }
391
392    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
393    pub(crate) async fn plan_purified_alter_source_add_subsource(
394        &mut self,
395        session: &Session,
396        params: Params,
397        source_name: ResolvedItemName,
398        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
399        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
400    ) -> Result<(Plan, ResolvedIds), AdapterError> {
401        let mut subsource_plans = Vec::with_capacity(subsources.len());
402
403        // Generate subsource statements
404        let conn_catalog = self.catalog().for_system_session();
405        let pcx = plan::PlanContext::zero();
406        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
407
408        let entry = self.catalog().get_entry(source_name.item_id());
409        let source = entry.source().ok_or_else(|| {
410            AdapterError::internal(
411                "plan alter source",
412                format!("expected Source found {entry:?}"),
413            )
414        })?;
415
416        let item_id = entry.id();
417        let ingestion_id = source.global_id();
418        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
419
420        let ids = self
421            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
422            .await?;
423        for (subsource_stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
424            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
425            subsource_plans.push(s);
426        }
427
428        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
429            subsources: subsource_plans,
430            options,
431        };
432
433        Ok((
434            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
435                item_id,
436                ingestion_id,
437                action,
438            }),
439            ResolvedIds::empty(),
440        ))
441    }
442
443    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
444    pub(crate) fn plan_purified_alter_source_refresh_references(
445        &self,
446        _session: &Session,
447        _params: Params,
448        source_name: ResolvedItemName,
449        available_source_references: plan::SourceReferences,
450    ) -> Result<(Plan, ResolvedIds), AdapterError> {
451        let entry = self.catalog().get_entry(source_name.item_id());
452        let source = entry.source().ok_or_else(|| {
453            AdapterError::internal(
454                "plan alter source",
455                format!("expected Source found {entry:?}"),
456            )
457        })?;
458        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
459            references: available_source_references,
460        };
461
462        Ok((
463            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
464                item_id: entry.id(),
465                ingestion_id: source.global_id(),
466                action,
467            }),
468            ResolvedIds::empty(),
469        ))
470    }
471
472    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
473    /// the primary source, and any ingestion export subsources (e.g. PG
474    /// tables).
475    pub(crate) async fn plan_purified_create_source(
476        &mut self,
477        ctx: &ExecuteContext,
478        params: Params,
479        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
480        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
481        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
482        available_source_references: plan::SourceReferences,
483    ) -> Result<(Plan, ResolvedIds), AdapterError> {
484        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
485
486        // 1. First plan the progress subsource, if any.
487        if let Some(progress_stmt) = progress_stmt {
488            // The primary source depends on this subsource because the primary
489            // source needs to know its shard ID, and the easiest way of
490            // guaranteeing that the shard ID is discoverable is to create this
491            // collection first.
492            assert_none!(progress_stmt.of_source);
493            let (item_id, global_id) = self.allocate_user_id().await?;
494            let progress_plan =
495                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
496            let progress_full_name = self
497                .catalog()
498                .resolve_full_name(&progress_plan.plan.name, None);
499            let progress_subsource = ResolvedItemName::Item {
500                id: progress_plan.item_id,
501                qualifiers: progress_plan.plan.name.qualifiers.clone(),
502                full_name: progress_full_name,
503                print_id: true,
504                version: RelationVersionSelector::Latest,
505            };
506
507            create_source_plans.push(progress_plan);
508
509            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
510        }
511
512        let catalog = self.catalog().for_session(ctx.session());
513        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
514
515        let propagated_with_options: Vec<_> = source_stmt
516            .with_options
517            .iter()
518            .filter_map(|opt| match opt.name {
519                CreateSourceOptionName::TimestampInterval => None,
520                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
521                    name: CreateSubsourceOptionName::RetainHistory,
522                    value: opt.value.clone(),
523                }),
524            })
525            .collect();
526
527        // 2. Then plan the main source.
528        let source_plan = match self.plan_statement(
529            ctx.session(),
530            Statement::CreateSource(source_stmt),
531            &params,
532            &resolved_ids,
533        )? {
534            (Plan::CreateSource(plan), _sql_impl_ids) => plan,
535            (p, _) => unreachable!("s must be CreateSourcePlan but got {:?}", p),
536        };
537
538        let (item_id, global_id) = self.allocate_user_id().await?;
539
540        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
541        let of_source = ResolvedItemName::Item {
542            id: item_id,
543            qualifiers: source_plan.name.qualifiers.clone(),
544            full_name: source_full_name,
545            print_id: true,
546            version: RelationVersionSelector::Latest,
547        };
548
549        // Generate subsource statements
550        let conn_catalog = self.catalog().for_system_session();
551        let pcx = plan::PlanContext::zero();
552        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
553
554        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
555
556        for subsource_stmt in subsource_stmts.iter_mut() {
557            subsource_stmt
558                .with_options
559                .extend(propagated_with_options.iter().cloned())
560        }
561
562        create_source_plans.push(CreateSourcePlanBundle {
563            item_id,
564            global_id,
565            plan: source_plan,
566            resolved_ids: resolved_ids.clone(),
567            available_source_references: Some(available_source_references),
568        });
569
570        // 3. Finally, plan all the subsources
571        let ids = self
572            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
573            .await?;
574        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
575            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
576            create_source_plans.push(plan);
577        }
578
579        Ok((
580            Plan::CreateSources(create_source_plans),
581            ResolvedIds::empty(),
582        ))
583    }
584
585    #[instrument]
586    pub(super) async fn sequence_create_source(
587        &mut self,
588        ctx: &mut ExecuteContext,
589        plans: Vec<plan::CreateSourcePlanBundle>,
590    ) -> Result<ExecuteResponse, AdapterError> {
591        let CreateSourceInner {
592            ops,
593            sources,
594            if_not_exists_ids,
595        } = self.create_source_inner(ctx.session(), plans).await?;
596
597        let transact_result = self
598            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
599            .await;
600
601        // Check if any sources are webhook sources and report them as created.
602        for (item_id, source) in &sources {
603            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
604                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
605                    ctx.session()
606                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
607                }
608            }
609        }
610
611        match transact_result {
612            Ok(()) => Ok(ExecuteResponse::CreatedSource),
613            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
614                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
615            })) if if_not_exists_ids.contains_key(&id) => {
616                ctx.session()
617                    .add_notice(AdapterNotice::ObjectAlreadyExists {
618                        name: if_not_exists_ids[&id].item.clone(),
619                        ty: "source",
620                    });
621                Ok(ExecuteResponse::CreatedSource)
622            }
623            Err(err) => Err(err),
624        }
625    }
626
627    /// Applies `details.secret_content_guards()` by reading each guarded
628    /// secret's current contents. Must be called whenever a connection is
629    /// created or its options altered, before the change takes effect.
630    async fn check_connection_secret_content_guards(
631        &self,
632        details: &ConnectionDetails,
633    ) -> Result<(), AdapterError> {
634        for (secret_id, guard) in details.secret_content_guards() {
635            let contents = self.caching_secrets_reader.read_string(secret_id).await?;
636            guard(&contents)?;
637        }
638        Ok(())
639    }
640
641    /// Applies the secret-content guards of every connection that uses
642    /// `secret_id` against proposed new `contents` for that secret. Must be
643    /// called whenever a secret's contents change, before the new value is
644    /// persisted.
645    pub(super) fn check_secret_content_guards_of_dependents(
646        &self,
647        secret_id: CatalogItemId,
648        contents: &str,
649    ) -> Result<(), AdapterError> {
650        for dependent_id in self.catalog().get_entry(&secret_id).used_by() {
651            if let CatalogItem::Connection(conn) = self.catalog().get_entry(dependent_id).item() {
652                for (guarded_id, guard) in conn.details.secret_content_guards() {
653                    if guarded_id == secret_id {
654                        guard(contents)?;
655                    }
656                }
657            }
658        }
659        Ok(())
660    }
661
662    #[instrument]
663    pub(super) async fn sequence_create_connection(
664        &mut self,
665        mut ctx: ExecuteContext,
666        plan: plan::CreateConnectionPlan,
667        resolved_ids: ResolvedIds,
668    ) {
669        let (connection_id, connection_gid) = match self.allocate_user_id().await {
670            Ok(item_id) => item_id,
671            Err(err) => return ctx.retire(Err(err)),
672        };
673
674        match &plan.connection.details {
675            ConnectionDetails::Ssh { key_1, key_2, .. } => {
676                let key_1 = match key_1.as_key_pair() {
677                    Some(key_1) => key_1.clone(),
678                    None => {
679                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
680                            "the PUBLIC KEY 1 option cannot be explicitly specified"
681                        ))));
682                    }
683                };
684
685                let key_2 = match key_2.as_key_pair() {
686                    Some(key_2) => key_2.clone(),
687                    None => {
688                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
689                            "the PUBLIC KEY 2 option cannot be explicitly specified"
690                        ))));
691                    }
692                };
693
694                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
695                let secret = key_set.to_bytes();
696                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
697                    return ctx.retire(Err(err.into()));
698                }
699                self.caching_secrets_reader.invalidate(connection_id);
700            }
701            _ => (),
702        };
703
704        // Inspect guarded secrets as early as we can, before the connection is
705        // installed in the catalog.
706        if let Err(err) = self
707            .check_connection_secret_content_guards(&plan.connection.details)
708            .await
709        {
710            return ctx.retire(Err(err));
711        }
712
713        if plan.validate {
714            let internal_cmd_tx = self.internal_cmd_tx.clone();
715            let transient_revision = self.catalog().transient_revision();
716            let conn_id = ctx.session().conn_id().clone();
717            let otel_ctx = OpenTelemetryContext::obtain();
718            let role_metadata = ctx.session().role_metadata().clone();
719
720            let connection = plan
721                .connection
722                .details
723                .to_connection()
724                .into_inline_connection(self.catalog().state());
725
726            let current_storage_parameters = self.controller.storage.config().clone();
727            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
728                let result = match std::panic::AssertUnwindSafe(
729                    connection.validate(connection_id, &current_storage_parameters),
730                )
731                .ore_catch_unwind()
732                .await
733                {
734                    Ok(Ok(())) => Ok(plan),
735                    Ok(Err(err)) => Err(err.into()),
736                    Err(_panic) => {
737                        tracing::error!("connection validation panicked");
738                        Err(AdapterError::Internal(
739                            "connection validation panicked".into(),
740                        ))
741                    }
742                };
743
744                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
745                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
746                    CreateConnectionValidationReady {
747                        ctx,
748                        result,
749                        connection_id,
750                        connection_gid,
751                        plan_validity: PlanValidity::new(
752                            transient_revision,
753                            resolved_ids.items().copied().collect(),
754                            None,
755                            None,
756                            role_metadata,
757                        ),
758                        otel_ctx,
759                        resolved_ids: resolved_ids.clone(),
760                    },
761                ));
762                if let Err(e) = result {
763                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
764                }
765            });
766        } else {
767            let result = self
768                .sequence_create_connection_stage_finish(
769                    &mut ctx,
770                    connection_id,
771                    connection_gid,
772                    plan,
773                    resolved_ids,
774                )
775                .await;
776            ctx.retire(result);
777        }
778    }
779
780    #[instrument]
781    pub(crate) async fn sequence_create_connection_stage_finish(
782        &mut self,
783        ctx: &mut ExecuteContext,
784        connection_id: CatalogItemId,
785        connection_gid: GlobalId,
786        plan: plan::CreateConnectionPlan,
787        resolved_ids: ResolvedIds,
788    ) -> Result<ExecuteResponse, AdapterError> {
789        let ops = vec![catalog::Op::CreateItem {
790            id: connection_id,
791            name: plan.name.clone(),
792            item: CatalogItem::Connection(Connection {
793                create_sql: plan.connection.create_sql,
794                global_id: connection_gid,
795                details: plan.connection.details.clone(),
796                resolved_ids,
797            }),
798            owner_id: *ctx.session().current_role_id(),
799        }];
800
801        // VPC endpoint creation for AWS PrivateLink connections is now handled
802        // in apply_catalog_implications.
803        let conn_id = ctx.session().conn_id().clone();
804        let transact_result = self
805            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
806            .await;
807
808        match transact_result {
809            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
810            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
811                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
812            })) if plan.if_not_exists => {
813                // Clean up SSH key material if it was persisted, since the
814                // catalog item was not created.
815                if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
816                    if let Err(e) = self.secrets_controller.delete(connection_id).await {
817                        tracing::warn!(
818                            "Dropping SSH secret for existing connection has encountered an error: {}",
819                            e
820                        );
821                    } else {
822                        self.caching_secrets_reader.invalidate(connection_id);
823                    }
824                }
825                ctx.session()
826                    .add_notice(AdapterNotice::ObjectAlreadyExists {
827                        name: plan.name.item,
828                        ty: "connection",
829                    });
830                Ok(ExecuteResponse::CreatedConnection)
831            }
832            Err(err) => {
833                // Clean up SSH key material if it was persisted, since the
834                // catalog item was not created.
835                if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
836                    if let Err(e) = self.secrets_controller.delete(connection_id).await {
837                        tracing::warn!(
838                            "Dropping SSH secret for failed connection has encountered an error: {}",
839                            e
840                        );
841                    } else {
842                        self.caching_secrets_reader.invalidate(connection_id);
843                    }
844                }
845                Err(err)
846            }
847        }
848    }
849
850    #[instrument]
851    pub(super) async fn sequence_create_database(
852        &mut self,
853        session: &Session,
854        plan: plan::CreateDatabasePlan,
855    ) -> Result<ExecuteResponse, AdapterError> {
856        let ops = vec![catalog::Op::CreateDatabase {
857            name: plan.name.clone(),
858            owner_id: *session.current_role_id(),
859        }];
860        match self.catalog_transact(Some(session), ops).await {
861            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
862            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
863                kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
864            })) if plan.if_not_exists => {
865                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
866                Ok(ExecuteResponse::CreatedDatabase)
867            }
868            Err(err) => Err(err),
869        }
870    }
871
872    #[instrument]
873    pub(super) async fn sequence_create_schema(
874        &mut self,
875        session: &Session,
876        plan: plan::CreateSchemaPlan,
877    ) -> Result<ExecuteResponse, AdapterError> {
878        let op = catalog::Op::CreateSchema {
879            database_id: plan.database_spec,
880            schema_name: plan.schema_name.clone(),
881            owner_id: *session.current_role_id(),
882        };
883        match self.catalog_transact(Some(session), vec![op]).await {
884            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
885            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
886                kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
887            })) if plan.if_not_exists => {
888                session.add_notice(AdapterNotice::SchemaAlreadyExists {
889                    name: plan.schema_name,
890                });
891                Ok(ExecuteResponse::CreatedSchema)
892            }
893            Err(err) => Err(err),
894        }
895    }
896
897    /// Validates the role attributes for a `CREATE ROLE` statement.
898    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
899        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
900            if attributes.superuser.is_some() || attributes.password.is_some() {
901                return Err(AdapterError::UnavailableFeature {
902                    feature: "SUPERUSER and PASSWORD attributes".to_string(),
903                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
904                });
905            }
906        }
907        Ok(())
908    }
909
910    #[instrument]
911    pub(super) async fn sequence_create_role(
912        &mut self,
913        conn_id: Option<&ConnectionId>,
914        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
915    ) -> Result<ExecuteResponse, AdapterError> {
916        self.validate_role_attributes(&attributes.clone())?;
917        let op = catalog::Op::CreateRole { name, attributes };
918        self.catalog_transact_with_context(conn_id, None, vec![op])
919            .await
920            .map(|_| ExecuteResponse::CreatedRole)
921    }
922
923    #[instrument]
924    pub(super) async fn sequence_create_network_policy(
925        &mut self,
926        session: &Session,
927        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
928    ) -> Result<ExecuteResponse, AdapterError> {
929        let op = catalog::Op::CreateNetworkPolicy {
930            rules,
931            name,
932            owner_id: *session.current_role_id(),
933        };
934        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
935            .await
936            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
937    }
938
939    #[instrument]
940    pub(super) async fn sequence_alter_network_policy(
941        &mut self,
942        session: &Session,
943        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
944    ) -> Result<ExecuteResponse, AdapterError> {
945        // TODO(network_policy): Consider role based network policies here.
946        let current_network_policy_name =
947            self.catalog().system_config().default_network_policy_name();
948        // Check if the way we're alerting the policy is still valid for the current connection.
949        if current_network_policy_name == name {
950            self.validate_alter_network_policy(session, &rules)?;
951        }
952
953        let op = catalog::Op::AlterNetworkPolicy {
954            id,
955            rules,
956            name,
957            owner_id: *session.current_role_id(),
958        };
959        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
960            .await
961            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
962    }
963
964    #[instrument]
965    pub(super) async fn sequence_create_table(
966        &mut self,
967        ctx: &mut ExecuteContext,
968        plan: plan::CreateTablePlan,
969        resolved_ids: ResolvedIds,
970    ) -> Result<ExecuteResponse, AdapterError> {
971        let plan::CreateTablePlan {
972            name,
973            table,
974            if_not_exists,
975        } = plan;
976
977        let conn_id = if table.temporary {
978            Some(ctx.session().conn_id())
979        } else {
980            None
981        };
982        let (table_id, global_id) = self.allocate_user_id().await?;
983        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
984
985        let data_source = match table.data_source {
986            plan::TableDataSource::TableWrites { defaults } => {
987                TableDataSource::TableWrites { defaults }
988            }
989            plan::TableDataSource::DataSource {
990                desc: data_source_plan,
991                timeline,
992            } => match data_source_plan {
993                plan::DataSourceDesc::IngestionExport {
994                    ingestion_id,
995                    external_reference,
996                    details,
997                    data_config,
998                } => TableDataSource::DataSource {
999                    desc: DataSourceDesc::IngestionExport {
1000                        ingestion_id,
1001                        external_reference,
1002                        details,
1003                        data_config,
1004                    },
1005                    timeline,
1006                },
1007                plan::DataSourceDesc::Webhook {
1008                    validate_using,
1009                    body_format,
1010                    headers,
1011                    cluster_id,
1012                } => TableDataSource::DataSource {
1013                    desc: DataSourceDesc::Webhook {
1014                        validate_using,
1015                        body_format,
1016                        headers,
1017                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
1018                    },
1019                    timeline,
1020                },
1021                o => {
1022                    unreachable!("CREATE TABLE data source got {:?}", o)
1023                }
1024            },
1025        };
1026
1027        let is_webhook = if let TableDataSource::DataSource {
1028            desc: DataSourceDesc::Webhook { .. },
1029            timeline: _,
1030        } = &data_source
1031        {
1032            true
1033        } else {
1034            false
1035        };
1036
1037        let table = Table {
1038            create_sql: Some(table.create_sql),
1039            desc: table.desc,
1040            collections,
1041            conn_id: conn_id.cloned(),
1042            resolved_ids,
1043            custom_logical_compaction_window: table.compaction_window,
1044            is_retained_metrics_object: false,
1045            data_source,
1046        };
1047        let ops = vec![catalog::Op::CreateItem {
1048            id: table_id,
1049            name: name.clone(),
1050            item: CatalogItem::Table(table.clone()),
1051            owner_id: *ctx.session().current_role_id(),
1052        }];
1053
1054        let catalog_result = self
1055            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1056            .await;
1057
1058        if is_webhook {
1059            // try_get_webhook_url will make up a URL for things that are not
1060            // webhooks, so we guard against that here.
1061            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1062                ctx.session()
1063                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
1064            }
1065        }
1066
1067        match catalog_result {
1068            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1069            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1070                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1071            })) if if_not_exists => {
1072                ctx.session_mut()
1073                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1074                        name: name.item,
1075                        ty: "table",
1076                    });
1077                Ok(ExecuteResponse::CreatedTable)
1078            }
1079            Err(err) => Err(err),
1080        }
1081    }
1082
1083    #[instrument]
1084    pub(super) async fn sequence_create_sink(
1085        &mut self,
1086        ctx: ExecuteContext,
1087        plan: plan::CreateSinkPlan,
1088        resolved_ids: ResolvedIds,
1089    ) {
1090        let plan::CreateSinkPlan {
1091            name,
1092            sink,
1093            with_snapshot,
1094            if_not_exists,
1095            in_cluster,
1096        } = plan;
1097
1098        // First try to allocate an ID and an OID. If either fails, we're done.
1099        let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
1100
1101        let catalog_sink = Sink {
1102            create_sql: sink.create_sql,
1103            global_id,
1104            from: sink.from,
1105            connection: sink.connection,
1106            envelope: sink.envelope,
1107            version: sink.version,
1108            with_snapshot,
1109            resolved_ids,
1110            cluster_id: in_cluster,
1111            commit_interval: sink.commit_interval,
1112        };
1113
1114        let ops = vec![catalog::Op::CreateItem {
1115            id: item_id,
1116            name: name.clone(),
1117            item: CatalogItem::Sink(catalog_sink.clone()),
1118            owner_id: *ctx.session().current_role_id(),
1119        }];
1120
1121        let result = self.catalog_transact(Some(ctx.session()), ops).await;
1122
1123        match result {
1124            Ok(()) => {}
1125            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1126                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1127            })) if if_not_exists => {
1128                ctx.session()
1129                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1130                        name: name.item,
1131                        ty: "sink",
1132                    });
1133                ctx.retire(Ok(ExecuteResponse::CreatedSink));
1134                return;
1135            }
1136            Err(e) => {
1137                ctx.retire(Err(e));
1138                return;
1139            }
1140        };
1141
1142        self.create_storage_export(global_id, &catalog_sink)
1143            .await
1144            .unwrap_or_terminate("cannot fail to create exports");
1145
1146        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1147            .await;
1148
1149        ctx.retire(Ok(ExecuteResponse::CreatedSink))
1150    }
1151
1152    /// Validates that a view definition does not contain any expressions that may lead to
1153    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1154    ///
1155    /// We prevent these expressions so that we can add columns to system tables without
1156    /// changing the definition of the view.
1157    ///
1158    /// Here is a bit of a hand wavy proof as to why we only need to check the
1159    /// immediate view definition for system objects and ambiguous column
1160    /// references, and not the entire dependency tree:
1161    ///
1162    ///   - A view with no object references cannot have any ambiguous column
1163    ///   references to a system object, because it has no system objects.
1164    ///   - A view with a direct reference to a system object and a * or
1165    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1166    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1167    ///   any ambiguous column references to a system object, because all column
1168    ///   references are explicitly named.
1169    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1170    ///   system object cannot have any ambiguous column references to a system
1171    ///   object, because there are no system objects in the top level view and
1172    ///   all sub-views are guaranteed to have no ambiguous column references to
1173    ///   system objects.
1174    pub(super) fn validate_system_column_references(
1175        &self,
1176        uses_ambiguous_columns: bool,
1177        depends_on: &BTreeSet<GlobalId>,
1178    ) -> Result<(), AdapterError> {
1179        if uses_ambiguous_columns
1180            && depends_on
1181                .iter()
1182                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1183        {
1184            Err(AdapterError::AmbiguousSystemColumnReference)
1185        } else {
1186            Ok(())
1187        }
1188    }
1189
1190    #[instrument]
1191    pub(super) async fn sequence_create_type(
1192        &mut self,
1193        session: &Session,
1194        plan: plan::CreateTypePlan,
1195        resolved_ids: ResolvedIds,
1196    ) -> Result<ExecuteResponse, AdapterError> {
1197        let (item_id, global_id) = self.allocate_user_id().await?;
1198        // Validate the type definition (e.g., composite columns) before storing.
1199        plan.typ
1200            .inner
1201            .desc(&self.catalog().for_session(session))
1202            .map_err(AdapterError::from)?;
1203        let typ = Type {
1204            create_sql: Some(plan.typ.create_sql),
1205            global_id,
1206            details: CatalogTypeDetails {
1207                array_id: None,
1208                typ: plan.typ.inner,
1209                pg_metadata: None,
1210            },
1211            resolved_ids,
1212        };
1213        let op = catalog::Op::CreateItem {
1214            id: item_id,
1215            name: plan.name,
1216            item: CatalogItem::Type(typ),
1217            owner_id: *session.current_role_id(),
1218        };
1219        match self.catalog_transact(Some(session), vec![op]).await {
1220            Ok(()) => Ok(ExecuteResponse::CreatedType),
1221            Err(err) => Err(err),
1222        }
1223    }
1224
1225    #[instrument]
1226    pub(super) async fn sequence_comment_on(
1227        &mut self,
1228        session: &Session,
1229        plan: plan::CommentPlan,
1230    ) -> Result<ExecuteResponse, AdapterError> {
1231        let op = catalog::Op::Comment {
1232            object_id: plan.object_id,
1233            sub_component: plan.sub_component,
1234            comment: plan.comment,
1235        };
1236        self.catalog_transact(Some(session), vec![op]).await?;
1237        Ok(ExecuteResponse::Comment)
1238    }
1239
1240    #[instrument]
1241    pub(super) async fn sequence_drop_objects(
1242        &mut self,
1243        ctx: &mut ExecuteContext,
1244        plan::DropObjectsPlan {
1245            drop_ids,
1246            object_type,
1247            referenced_ids,
1248        }: plan::DropObjectsPlan,
1249    ) -> Result<ExecuteResponse, AdapterError> {
1250        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1251        let mut objects = Vec::new();
1252        for obj_id in &drop_ids {
1253            if !referenced_ids_hashset.contains(obj_id) {
1254                let object_info = ErrorMessageObjectDescription::from_id(
1255                    obj_id,
1256                    &self.catalog().for_session(ctx.session()),
1257                )
1258                .to_string();
1259                objects.push(object_info);
1260            }
1261        }
1262
1263        if !objects.is_empty() {
1264            ctx.session()
1265                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1266        }
1267
1268        // Collect GlobalIds for expression cache invalidation.
1269        let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
1270            .iter()
1271            .filter_map(|id| match id {
1272                ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
1273                _ => None,
1274            })
1275            .flatten()
1276            .collect();
1277
1278        let DropOps {
1279            ops,
1280            dropped_active_db,
1281            dropped_active_cluster,
1282            dropped_in_use_indexes,
1283        } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1284
1285        self.catalog_transact_with_context(None, Some(ctx), ops)
1286            .await?;
1287
1288        // Invalidate expression cache entries for dropped objects.
1289        if !expr_cache_invalidate_ids.is_empty() {
1290            let _fut = self.catalog().update_expression_cache(
1291                Default::default(),
1292                Default::default(),
1293                expr_cache_invalidate_ids,
1294            );
1295        }
1296
1297        fail::fail_point!("after_sequencer_drop_replica");
1298
1299        if dropped_active_db {
1300            ctx.session()
1301                .add_notice(AdapterNotice::DroppedActiveDatabase {
1302                    name: ctx.session().vars().database().to_string(),
1303                });
1304        }
1305        if dropped_active_cluster {
1306            ctx.session()
1307                .add_notice(AdapterNotice::DroppedActiveCluster {
1308                    name: ctx.session().vars().cluster().to_string(),
1309                });
1310        }
1311        for dropped_in_use_index in dropped_in_use_indexes {
1312            ctx.session()
1313                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1314            self.metrics
1315                .optimization_notices
1316                .with_label_values(&["DroppedInUseIndex"])
1317                .inc_by(1);
1318        }
1319        Ok(ExecuteResponse::DroppedObject(object_type))
1320    }
1321
1322    fn validate_dropped_role_ownership(
1323        &self,
1324        session: &Session,
1325        dropped_roles: &BTreeMap<RoleId, &str>,
1326    ) -> Result<(), AdapterError> {
1327        fn privilege_check(
1328            privileges: &PrivilegeMap,
1329            dropped_roles: &BTreeMap<RoleId, &str>,
1330            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1331            object_id: &SystemObjectId,
1332            catalog: &ConnCatalog,
1333        ) {
1334            for privilege in privileges.all_values() {
1335                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1336                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1337                    let object_description =
1338                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1339                    dependent_objects
1340                        .entry(role_name.to_string())
1341                        .or_default()
1342                        .push(format!(
1343                            "privileges on {object_description} granted by {grantor_name}",
1344                        ));
1345                }
1346                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1347                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1348                    let object_description =
1349                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1350                    dependent_objects
1351                        .entry(role_name.to_string())
1352                        .or_default()
1353                        .push(format!(
1354                            "privileges granted on {object_description} to {grantee_name}"
1355                        ));
1356                }
1357            }
1358        }
1359
1360        let catalog = self.catalog().for_session(session);
1361        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1362        for entry in self.catalog.entries() {
1363            let id = SystemObjectId::Object(entry.id().into());
1364            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1365                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1366                dependent_objects
1367                    .entry(role_name.to_string())
1368                    .or_default()
1369                    .push(format!("owner of {object_description}"));
1370            }
1371            privilege_check(
1372                entry.privileges(),
1373                dropped_roles,
1374                &mut dependent_objects,
1375                &id,
1376                &catalog,
1377            );
1378        }
1379        for database in self.catalog.databases() {
1380            let database_id = SystemObjectId::Object(database.id().into());
1381            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1382                let object_description =
1383                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1384                dependent_objects
1385                    .entry(role_name.to_string())
1386                    .or_default()
1387                    .push(format!("owner of {object_description}"));
1388            }
1389            privilege_check(
1390                &database.privileges,
1391                dropped_roles,
1392                &mut dependent_objects,
1393                &database_id,
1394                &catalog,
1395            );
1396            for schema in database.schemas_by_id.values() {
1397                let schema_id = SystemObjectId::Object(
1398                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1399                );
1400                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1401                    let object_description =
1402                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1403                    dependent_objects
1404                        .entry(role_name.to_string())
1405                        .or_default()
1406                        .push(format!("owner of {object_description}"));
1407                }
1408                privilege_check(
1409                    &schema.privileges,
1410                    dropped_roles,
1411                    &mut dependent_objects,
1412                    &schema_id,
1413                    &catalog,
1414                );
1415            }
1416        }
1417        for cluster in self.catalog.clusters() {
1418            let cluster_id = SystemObjectId::Object(cluster.id().into());
1419            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1420                let object_description =
1421                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1422                dependent_objects
1423                    .entry(role_name.to_string())
1424                    .or_default()
1425                    .push(format!("owner of {object_description}"));
1426            }
1427            privilege_check(
1428                &cluster.privileges,
1429                dropped_roles,
1430                &mut dependent_objects,
1431                &cluster_id,
1432                &catalog,
1433            );
1434            for replica in cluster.replicas() {
1435                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1436                    let replica_id =
1437                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1438                    let object_description =
1439                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1440                    dependent_objects
1441                        .entry(role_name.to_string())
1442                        .or_default()
1443                        .push(format!("owner of {object_description}"));
1444                }
1445            }
1446        }
1447        privilege_check(
1448            self.catalog().system_privileges(),
1449            dropped_roles,
1450            &mut dependent_objects,
1451            &SystemObjectId::System,
1452            &catalog,
1453        );
1454        for (default_privilege_object, default_privilege_acl_items) in
1455            self.catalog.default_privileges()
1456        {
1457            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1458                dependent_objects
1459                    .entry(role_name.to_string())
1460                    .or_default()
1461                    .push(format!(
1462                        "default privileges on {}S created by {}",
1463                        default_privilege_object.object_type, role_name
1464                    ));
1465            }
1466            for default_privilege_acl_item in default_privilege_acl_items {
1467                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1468                    dependent_objects
1469                        .entry(role_name.to_string())
1470                        .or_default()
1471                        .push(format!(
1472                            "default privileges on {}S granted to {}",
1473                            default_privilege_object.object_type, role_name
1474                        ));
1475                }
1476            }
1477        }
1478
1479        if !dependent_objects.is_empty() {
1480            Err(AdapterError::DependentObject(dependent_objects))
1481        } else {
1482            Ok(())
1483        }
1484    }
1485
1486    #[instrument]
1487    pub(super) async fn sequence_drop_owned(
1488        &mut self,
1489        session: &Session,
1490        plan: plan::DropOwnedPlan,
1491    ) -> Result<ExecuteResponse, AdapterError> {
1492        for role_id in &plan.role_ids {
1493            self.catalog().ensure_not_reserved_role(role_id)?;
1494        }
1495
1496        let mut privilege_revokes = plan.privilege_revokes;
1497
1498        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1499        let session_catalog = self.catalog().for_session(session);
1500        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1501            && !session.is_superuser()
1502        {
1503            // Obtain all roles that the current session is a member of.
1504            let role_membership =
1505                session_catalog.collect_role_membership(session.current_role_id());
1506            let invalid_revokes: BTreeSet<_> = privilege_revokes
1507                .extract_if(.., |(_, privilege)| {
1508                    !role_membership.contains(&privilege.grantor)
1509                })
1510                .map(|(object_id, _)| object_id)
1511                .collect();
1512            for invalid_revoke in invalid_revokes {
1513                let object_description =
1514                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1515                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1516            }
1517        }
1518
1519        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1520            catalog::Op::UpdatePrivilege {
1521                target_id: object_id,
1522                privilege,
1523                variant: UpdatePrivilegeVariant::Revoke,
1524            }
1525        });
1526        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1527            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1528                privilege_object,
1529                privilege_acl_item,
1530                variant: UpdatePrivilegeVariant::Revoke,
1531            },
1532        );
1533        let DropOps {
1534            ops: drop_ops,
1535            dropped_active_db,
1536            dropped_active_cluster,
1537            dropped_in_use_indexes,
1538        } = self.sequence_drop_common(session, plan.drop_ids)?;
1539
1540        let ops = privilege_revoke_ops
1541            .chain(default_privilege_revoke_ops)
1542            .chain(drop_ops.into_iter())
1543            .collect();
1544
1545        self.catalog_transact(Some(session), ops).await?;
1546
1547        if dropped_active_db {
1548            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1549                name: session.vars().database().to_string(),
1550            });
1551        }
1552        if dropped_active_cluster {
1553            session.add_notice(AdapterNotice::DroppedActiveCluster {
1554                name: session.vars().cluster().to_string(),
1555            });
1556        }
1557        for dropped_in_use_index in dropped_in_use_indexes {
1558            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1559        }
1560        Ok(ExecuteResponse::DroppedOwned)
1561    }
1562
1563    fn sequence_drop_common(
1564        &self,
1565        session: &Session,
1566        ids: Vec<ObjectId>,
1567    ) -> Result<DropOps, AdapterError> {
1568        let mut dropped_active_db = false;
1569        let mut dropped_active_cluster = false;
1570        let mut dropped_in_use_indexes = Vec::new();
1571        let mut dropped_roles = BTreeMap::new();
1572        let mut dropped_databases = BTreeSet::new();
1573        let mut dropped_schemas = BTreeSet::new();
1574        // Dropping either the group role or the member role of a role membership will trigger a
1575        // revoke role. We use a Set for the revokes to avoid trying to attempt to revoke the same
1576        // role membership twice.
1577        let mut role_revokes = BTreeSet::new();
1578        // Dropping a database or a schema will revoke all default roles associated with that
1579        // database or schema.
1580        let mut default_privilege_revokes = BTreeSet::new();
1581
1582        // Clusters we're dropping
1583        let mut clusters_to_drop = BTreeSet::new();
1584
1585        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1586        for id in &ids {
1587            match id {
1588                ObjectId::Database(id) => {
1589                    let name = self.catalog().get_database(id).name();
1590                    if name == session.vars().database() {
1591                        dropped_active_db = true;
1592                    }
1593                    dropped_databases.insert(id);
1594                }
1595                ObjectId::Schema((_, spec)) => {
1596                    if let SchemaSpecifier::Id(id) = spec {
1597                        dropped_schemas.insert(id);
1598                    }
1599                }
1600                ObjectId::Cluster(id) => {
1601                    clusters_to_drop.insert(*id);
1602                    if let Some(active_id) = self
1603                        .catalog()
1604                        .active_cluster(session)
1605                        .ok()
1606                        .map(|cluster| cluster.id())
1607                    {
1608                        if id == &active_id {
1609                            dropped_active_cluster = true;
1610                        }
1611                    }
1612                }
1613                ObjectId::Role(id) => {
1614                    let role = self.catalog().get_role(id);
1615                    let name = role.name();
1616                    dropped_roles.insert(*id, name);
1617                    // We must revoke all role memberships that the dropped roles belongs to.
1618                    for (group_id, grantor_id) in &role.membership.map {
1619                        role_revokes.insert((*group_id, *id, *grantor_id));
1620                    }
1621                }
1622                ObjectId::Item(id) => {
1623                    if let Some(index) = self.catalog().get_entry(id).index() {
1624                        let humanizer = self.catalog().for_session(session);
1625                        let dependants = self
1626                            .controller
1627                            .compute
1628                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1629                            .ok()
1630                            .into_iter()
1631                            .flatten()
1632                            .filter(|dependant_id| {
1633                                // Transient Ids belong to Peeks. We are not interested for now in
1634                                // peeks depending on a dropped index.
1635                                // TODO: show a different notice in this case. Something like
1636                                // "There is an in-progress ad hoc SELECT that uses the dropped
1637                                // index. The resources used by the index will be freed when all
1638                                // such SELECTs complete."
1639                                if dependant_id.is_transient() {
1640                                    return false;
1641                                }
1642                                // The item should exist, but don't panic if it doesn't.
1643                                let Some(dependent_id) = humanizer
1644                                    .try_get_item_by_global_id(dependant_id)
1645                                    .map(|item| item.id())
1646                                else {
1647                                    return false;
1648                                };
1649                                // If the dependent object is also being dropped, then there is no
1650                                // problem, so we don't want a notice.
1651                                !ids_set.contains(&ObjectId::Item(dependent_id))
1652                            })
1653                            .flat_map(|dependant_id| {
1654                                // If we are not able to find a name for this ID it probably means
1655                                // we have already dropped the compute collection, in which case we
1656                                // can ignore it.
1657                                humanizer.humanize_id(dependant_id)
1658                            })
1659                            .collect_vec();
1660                        if !dependants.is_empty() {
1661                            dropped_in_use_indexes.push(DroppedInUseIndex {
1662                                index_name: humanizer
1663                                    .humanize_id(index.global_id())
1664                                    .unwrap_or_else(|| id.to_string()),
1665                                dependant_objects: dependants,
1666                            });
1667                        }
1668                    }
1669                }
1670                _ => {}
1671            }
1672        }
1673
1674        for id in &ids {
1675            match id {
1676                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1677                // unless they are internal replicas, which exist outside the scope
1678                // of managed clusters.
1679                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1680                    if !clusters_to_drop.contains(cluster_id) {
1681                        let cluster = self.catalog.get_cluster(*cluster_id);
1682                        if cluster.is_managed() {
1683                            let replica =
1684                                cluster.replica(*replica_id).expect("Catalog out of sync");
1685                            if !replica.config.location.internal() {
1686                                coord_bail!("cannot drop replica of managed cluster");
1687                            }
1688                        }
1689                    }
1690                }
1691                _ => {}
1692            }
1693        }
1694
1695        for role_id in dropped_roles.keys() {
1696            self.catalog().ensure_not_reserved_role(role_id)?;
1697        }
1698        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1699        // If any role is a member of a dropped role, then we must revoke that membership.
1700        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1701        for role in self.catalog().user_roles() {
1702            for dropped_role_id in
1703                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1704            {
1705                role_revokes.insert((
1706                    **dropped_role_id,
1707                    role.id(),
1708                    *role
1709                        .membership
1710                        .map
1711                        .get(*dropped_role_id)
1712                        .expect("included in keys above"),
1713                ));
1714            }
1715        }
1716
1717        for (default_privilege_object, default_privilege_acls) in
1718            self.catalog().default_privileges()
1719        {
1720            if matches!(
1721                &default_privilege_object.database_id,
1722                Some(database_id) if dropped_databases.contains(database_id),
1723            ) || matches!(
1724                &default_privilege_object.schema_id,
1725                Some(schema_id) if dropped_schemas.contains(schema_id),
1726            ) {
1727                for default_privilege_acl in default_privilege_acls {
1728                    default_privilege_revokes.insert((
1729                        default_privilege_object.clone(),
1730                        default_privilege_acl.clone(),
1731                    ));
1732                }
1733            }
1734        }
1735
1736        let ops = role_revokes
1737            .into_iter()
1738            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1739                role_id,
1740                member_id,
1741                grantor_id,
1742            })
1743            .chain(default_privilege_revokes.into_iter().map(
1744                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1745                    privilege_object,
1746                    privilege_acl_item,
1747                    variant: UpdatePrivilegeVariant::Revoke,
1748                },
1749            ))
1750            .chain(iter::once(catalog::Op::DropObjects(
1751                ids.into_iter()
1752                    .map(DropObjectInfo::manual_drop_from_object_id)
1753                    .collect(),
1754            )))
1755            .collect();
1756
1757        Ok(DropOps {
1758            ops,
1759            dropped_active_db,
1760            dropped_active_cluster,
1761            dropped_in_use_indexes,
1762        })
1763    }
1764
1765    pub(super) fn sequence_explain_schema(
1766        &self,
1767        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1768    ) -> Result<ExecuteResponse, AdapterError> {
1769        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1770            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1771        })?;
1772
1773        let json_string = json_string(&json_value);
1774        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1775        Ok(Self::send_immediate_rows(row))
1776    }
1777
1778    pub(super) fn sequence_show_all_variables(
1779        &self,
1780        session: &Session,
1781    ) -> Result<ExecuteResponse, AdapterError> {
1782        let mut rows = viewable_variables(self.catalog().state(), session)
1783            .map(|v| (v.name(), v.value(), v.description()))
1784            .collect::<Vec<_>>();
1785        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1786
1787        // TODO(parkmycar): Pack all of these into a single RowCollection.
1788        let rows: Vec<_> = rows
1789            .into_iter()
1790            .map(|(name, val, desc)| {
1791                Row::pack_slice(&[
1792                    Datum::String(name),
1793                    Datum::String(&val),
1794                    Datum::String(desc),
1795                ])
1796            })
1797            .collect();
1798        Ok(Self::send_immediate_rows(rows))
1799    }
1800
1801    pub(super) fn sequence_show_variable(
1802        &self,
1803        session: &Session,
1804        plan: plan::ShowVariablePlan,
1805    ) -> Result<ExecuteResponse, AdapterError> {
1806        if &plan.name == SCHEMA_ALIAS {
1807            let schemas = self.catalog.resolve_search_path(session);
1808            let schema = schemas.first();
1809            return match schema {
1810                Some((database_spec, schema_spec)) => {
1811                    let schema_name = &self
1812                        .catalog
1813                        .get_schema(database_spec, schema_spec, session.conn_id())
1814                        .name()
1815                        .schema;
1816                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
1817                    Ok(Self::send_immediate_rows(row))
1818                }
1819                None => {
1820                    if session.vars().current_object_missing_warnings() {
1821                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1822                            search_path: session
1823                                .vars()
1824                                .search_path()
1825                                .into_iter()
1826                                .map(|schema| schema.to_string())
1827                                .collect(),
1828                        });
1829                    }
1830                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1831                }
1832            };
1833        }
1834
1835        let variable = session
1836            .vars()
1837            .get(self.catalog().system_config(), &plan.name)
1838            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1839
1840        // In lieu of plumbing the user to all system config functions, just check that the var is
1841        // visible.
1842        variable.visible(session.user(), self.catalog().system_config())?;
1843
1844        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1845        if variable.name() == vars::DATABASE.name()
1846            && matches!(
1847                self.catalog().resolve_database(&variable.value()),
1848                Err(CatalogError::UnknownDatabase(_))
1849            )
1850            && session.vars().current_object_missing_warnings()
1851        {
1852            let name = variable.value();
1853            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1854        } else if variable.name() == vars::CLUSTER.name()
1855            && matches!(
1856                self.catalog().resolve_cluster(&variable.value()),
1857                Err(CatalogError::UnknownCluster(_))
1858            )
1859            && session.vars().current_object_missing_warnings()
1860        {
1861            let name = variable.value();
1862            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1863        }
1864        Ok(Self::send_immediate_rows(row))
1865    }
1866
1867    #[instrument]
1868    pub(super) async fn sequence_inspect_shard(
1869        &self,
1870        session: &Session,
1871        plan: plan::InspectShardPlan,
1872    ) -> Result<ExecuteResponse, AdapterError> {
1873        // TODO: Not thrilled about this rbac special case here, but probably
1874        // sufficient for now.
1875        if !session.user().is_internal() {
1876            return Err(AdapterError::Unauthorized(
1877                rbac::UnauthorizedError::MzSystem {
1878                    action: "inspect".into(),
1879                },
1880            ));
1881        }
1882        let state = self
1883            .controller
1884            .storage
1885            .inspect_persist_state(plan.id)
1886            .await?;
1887        let jsonb = Jsonb::from_serde_json(state)?;
1888        Ok(Self::send_immediate_rows(jsonb.into_row()))
1889    }
1890
1891    #[instrument]
1892    pub(super) fn sequence_set_variable(
1893        &self,
1894        session: &mut Session,
1895        plan: plan::SetVariablePlan,
1896    ) -> Result<ExecuteResponse, AdapterError> {
1897        let (name, local) = (plan.name, plan.local);
1898        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1899            self.validate_set_isolation_level(session)?;
1900        }
1901        if &name == vars::CLUSTER.name() {
1902            self.validate_set_cluster(session)?;
1903        }
1904
1905        let vars = session.vars_mut();
1906        let values = match plan.value {
1907            plan::VariableValue::Default => None,
1908            plan::VariableValue::Values(values) => Some(values),
1909        };
1910
1911        match values {
1912            Some(values) => {
1913                vars.set(
1914                    self.catalog().system_config(),
1915                    &name,
1916                    VarInput::SqlSet(&values),
1917                    local,
1918                )?;
1919
1920                let vars = session.vars();
1921
1922                // Emit a warning when deprecated variables are used.
1923                // TODO(database-issues#8069) remove this after sufficient time has passed
1924                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1925                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1926                } else if name == vars::CLUSTER.name()
1927                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1928                {
1929                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1930                }
1931
1932                // Database or cluster value does not correspond to a catalog item.
1933                if name.as_str() == vars::DATABASE.name()
1934                    && matches!(
1935                        self.catalog().resolve_database(vars.database()),
1936                        Err(CatalogError::UnknownDatabase(_))
1937                    )
1938                    && session.vars().current_object_missing_warnings()
1939                {
1940                    let name = vars.database().to_string();
1941                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1942                } else if name.as_str() == vars::CLUSTER.name()
1943                    && matches!(
1944                        self.catalog().resolve_cluster(vars.cluster()),
1945                        Err(CatalogError::UnknownCluster(_))
1946                    )
1947                    && session.vars().current_object_missing_warnings()
1948                {
1949                    let name = vars.cluster().to_string();
1950                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1951                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1952                    let v = values.into_first().to_lowercase();
1953                    if v == IsolationLevel::ReadUncommitted.as_variant_str()
1954                        || v == IsolationLevel::ReadCommitted.as_variant_str()
1955                        || v == IsolationLevel::RepeatableRead.as_variant_str()
1956                    {
1957                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1958                            isolation_level: v,
1959                        });
1960                    } else if v == IsolationLevel::StrongSessionSerializable.as_variant_str() {
1961                        session.add_notice(AdapterNotice::StrongSessionSerializable);
1962                    }
1963                }
1964
1965                // Reject incompatible combinations of bounded staleness and
1966                // `real_time_recency` after the variable has been applied. Either
1967                // SET name can introduce the conflict, so check both.
1968                if (name.as_str() == TRANSACTION_ISOLATION_VAR_NAME
1969                    || name.as_str() == vars::REAL_TIME_RECENCY.name())
1970                    && session
1971                        .vars()
1972                        .transaction_isolation()
1973                        .is_bounded_staleness()
1974                    && session.vars().real_time_recency()
1975                {
1976                    return Err(AdapterError::BoundedStalenessRealTimeRecencyConflict);
1977                }
1978            }
1979            None => vars.reset(self.catalog().system_config(), &name, local)?,
1980        }
1981
1982        Ok(ExecuteResponse::SetVariable { name, reset: false })
1983    }
1984
1985    pub(super) fn sequence_reset_variable(
1986        &self,
1987        session: &mut Session,
1988        plan: plan::ResetVariablePlan,
1989    ) -> Result<ExecuteResponse, AdapterError> {
1990        let name = plan.name;
1991        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1992            self.validate_set_isolation_level(session)?;
1993        }
1994        if &name == vars::CLUSTER.name() {
1995            self.validate_set_cluster(session)?;
1996        }
1997        session
1998            .vars_mut()
1999            .reset(self.catalog().system_config(), &name, false)?;
2000        Ok(ExecuteResponse::SetVariable { name, reset: true })
2001    }
2002
2003    pub(super) fn sequence_set_transaction(
2004        &self,
2005        session: &mut Session,
2006        plan: plan::SetTransactionPlan,
2007    ) -> Result<ExecuteResponse, AdapterError> {
2008        // TODO(jkosh44) Only supports isolation levels for now.
2009        for mode in plan.modes {
2010            match mode {
2011                TransactionMode::AccessMode(_) => {
2012                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
2013                }
2014                TransactionMode::IsolationLevel(isolation_level) => {
2015                    self.validate_set_isolation_level(session)?;
2016
2017                    session.vars_mut().set(
2018                        self.catalog().system_config(),
2019                        TRANSACTION_ISOLATION_VAR_NAME,
2020                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
2021                        plan.local,
2022                    )?
2023                }
2024            }
2025        }
2026        Ok(ExecuteResponse::SetVariable {
2027            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2028            reset: false,
2029        })
2030    }
2031
2032    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2033        if session.transaction().contains_ops() {
2034            Err(AdapterError::InvalidSetIsolationLevel)
2035        } else {
2036            Ok(())
2037        }
2038    }
2039
2040    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2041        if session.transaction().contains_ops() {
2042            Err(AdapterError::InvalidSetCluster)
2043        } else {
2044            Ok(())
2045        }
2046    }
2047
2048    #[instrument]
2049    pub(super) async fn sequence_end_transaction(
2050        &mut self,
2051        mut ctx: ExecuteContext,
2052        mut action: EndTransactionAction,
2053    ) {
2054        // If the transaction has failed, we can only rollback.
2055        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2056            (&action, ctx.session().transaction())
2057        {
2058            action = EndTransactionAction::Rollback;
2059        }
2060        let response = match action {
2061            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2062                params: BTreeMap::new(),
2063            }),
2064            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2065                params: BTreeMap::new(),
2066            }),
2067        };
2068
2069        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2070
2071        let (response, action) = match result {
2072            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2073                (response, action)
2074            }
2075            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2076                // Make sure we have the correct set of write locks for this transaction.
2077                // Aggressively dropping partial sets of locks to prevent deadlocking separate
2078                // transactions.
2079                let validated_locks = match write_lock_guards {
2080                    None => None,
2081                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2082                        Ok(locks) => Some(locks),
2083                        Err(missing) => {
2084                            tracing::error!(?missing, "programming error, missing write locks");
2085                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2086                        }
2087                    },
2088                };
2089
2090                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2091                for WriteOp { id, rows } in writes {
2092                    let total_rows = collected_writes.entry(id).or_default();
2093                    total_rows.push(rows);
2094                }
2095
2096                self.submit_write(PendingWriteTxn::User {
2097                    span: Span::current(),
2098                    writes: collected_writes,
2099                    write_locks: validated_locks,
2100                    responder: UserWriteResponder::Session(PendingTxn {
2101                        ctx,
2102                        response,
2103                        action,
2104                    }),
2105                });
2106                return;
2107            }
2108            Ok((
2109                Some(TransactionOps::Peeks {
2110                    determination,
2111                    requires_linearization: RequireLinearization::Required,
2112                    ..
2113                }),
2114                _,
2115            )) if ctx.session().vars().transaction_isolation()
2116                == &IsolationLevel::StrictSerializable =>
2117            {
2118                let conn_id = ctx.session().conn_id().clone();
2119                let pending_read_txn = PendingReadTxn {
2120                    txn: PendingRead::Read {
2121                        txn: PendingTxn {
2122                            ctx,
2123                            response,
2124                            action,
2125                        },
2126                    },
2127                    timestamp_context: determination.timestamp_context,
2128                    created: Instant::now(),
2129                    num_requeues: 0,
2130                    otel_ctx: OpenTelemetryContext::obtain(),
2131                };
2132                self.strict_serializable_reads_tx
2133                    .send((conn_id, pending_read_txn))
2134                    .expect("sending to strict_serializable_reads_tx cannot fail");
2135                return;
2136            }
2137            Ok((
2138                Some(TransactionOps::Peeks {
2139                    determination,
2140                    requires_linearization: RequireLinearization::Required,
2141                    ..
2142                }),
2143                _,
2144            )) if ctx.session().vars().transaction_isolation()
2145                == &IsolationLevel::StrongSessionSerializable =>
2146            {
2147                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2148                    ctx.session_mut()
2149                        .ensure_timestamp_oracle(timeline.clone())
2150                        .apply_write(*ts);
2151                }
2152                (response, action)
2153            }
2154            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2155                self.internal_cmd_tx
2156                    .send(Message::ExecuteSingleStatementTransaction {
2157                        ctx,
2158                        otel_ctx: OpenTelemetryContext::obtain(),
2159                        stmt,
2160                        params,
2161                    })
2162                    .expect("must send");
2163                return;
2164            }
2165            Ok((_, _)) => (response, action),
2166            Err(err) => (Err(err), EndTransactionAction::Rollback),
2167        };
2168        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2169        // Append any parameters that changed to the response.
2170        let response = response.map(|mut r| {
2171            r.extend_params(changed);
2172            ExecuteResponse::from(r)
2173        });
2174
2175        ctx.retire(response);
2176    }
2177
2178    #[instrument]
2179    async fn sequence_end_transaction_inner(
2180        &mut self,
2181        ctx: &mut ExecuteContext,
2182        action: EndTransactionAction,
2183    ) -> Result<(Option<TransactionOps>, Option<WriteLocks>), AdapterError> {
2184        let txn = self.clear_transaction(ctx.session_mut()).await;
2185
2186        if let EndTransactionAction::Commit = action {
2187            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2188                match &mut ops {
2189                    TransactionOps::Writes(writes) => {
2190                        for WriteOp { id, .. } in &mut writes.iter() {
2191                            // Re-verify this id exists.
2192                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2193                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2194                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2195                                })
2196                            })?;
2197                        }
2198
2199                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2200                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2201                    }
2202                    TransactionOps::DDL {
2203                        ops,
2204                        state: _,
2205                        side_effects,
2206                        revision,
2207                        snapshot: _,
2208                    } => {
2209                        // Make sure our catalog hasn't changed.
2210                        if *revision != self.catalog().transient_revision() {
2211                            return Err(AdapterError::DDLTransactionRace);
2212                        }
2213                        // Commit all of our queued ops.
2214                        let ops = std::mem::take(ops);
2215                        let side_effects = std::mem::take(side_effects);
2216                        self.catalog_transact_with_side_effects(
2217                            Some(ctx),
2218                            ops,
2219                            move |a, mut ctx| {
2220                                Box::pin(async move {
2221                                    for side_effect in side_effects {
2222                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2223                                    }
2224                                })
2225                            },
2226                        )
2227                        .await?;
2228                    }
2229                    _ => (),
2230                }
2231                return Ok((Some(ops), write_lock_guards));
2232            }
2233        }
2234
2235        Ok((None, None))
2236    }
2237
2238    pub(super) async fn sequence_side_effecting_func(
2239        &mut self,
2240        ctx: ExecuteContext,
2241        plan: SideEffectingFunc,
2242    ) {
2243        match plan {
2244            SideEffectingFunc::PgCancelBackend { connection_id } => {
2245                let Some(connection_id) = connection_id else {
2246                    // The argument was `NULL`, so, like in PostgreSQL, the
2247                    // function returns `NULL`.
2248                    ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[
2249                        Datum::Null,
2250                    ]))));
2251                    return;
2252                };
2253
2254                if ctx.session().conn_id().unhandled() == connection_id {
2255                    // As a special case, if we're canceling ourselves, we send
2256                    // back a canceled resposne to the client issuing the query,
2257                    // and so we need to do no further processing of the cancel.
2258                    ctx.retire(Err(AdapterError::Canceled));
2259                    return;
2260                }
2261
2262                let res = if let Some((id_handle, _conn_meta)) =
2263                    self.active_conns.get_key_value(&connection_id)
2264                {
2265                    // check_plan already verified role membership.
2266                    self.handle_privileged_cancel(id_handle.clone()).await;
2267                    Datum::True
2268                } else {
2269                    Datum::False
2270                };
2271                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2272            }
2273        }
2274    }
2275
2276    /// Execute a side-effecting function from the frontend peek path.
2277    /// This is separate from `sequence_side_effecting_func` because
2278    /// - It doesn't have an ExecuteContext.
2279    /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
2280    ///   peek sequencing can't look at `active_conns`.
2281    ///
2282    /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
2283    /// sequencing.
2284    pub(crate) async fn execute_side_effecting_func(
2285        &mut self,
2286        plan: SideEffectingFunc,
2287        conn_id: ConnectionId,
2288        current_role: RoleId,
2289    ) -> Result<ExecuteResponse, AdapterError> {
2290        match plan {
2291            SideEffectingFunc::PgCancelBackend { connection_id } => {
2292                let Some(connection_id) = connection_id else {
2293                    // The argument was `NULL`, so, like in PostgreSQL, the
2294                    // function returns `NULL`.
2295                    return Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])));
2296                };
2297
2298                if conn_id.unhandled() == connection_id {
2299                    // As a special case, if we're canceling ourselves, we return
2300                    // a canceled response to the client issuing the query,
2301                    // and so we need to do no further processing of the cancel.
2302                    return Err(AdapterError::Canceled);
2303                }
2304
2305                // Perform RBAC check: the current user must be a member of the role
2306                // that owns the connection being cancelled.
2307                if let Some((_id_handle, conn_meta)) =
2308                    self.active_conns.get_key_value(&connection_id)
2309                {
2310                    let target_role = *conn_meta.authenticated_role_id();
2311                    let role_membership = self
2312                        .catalog()
2313                        .state()
2314                        .collect_role_membership(&current_role);
2315                    if !role_membership.contains(&target_role) {
2316                        let target_role_name = self
2317                            .catalog()
2318                            .try_get_role(&target_role)
2319                            .map(|role| role.name().to_string())
2320                            .unwrap_or_else(|| target_role.to_string());
2321                        return Err(AdapterError::Unauthorized(
2322                            rbac::UnauthorizedError::RoleMembership {
2323                                role_names: vec![target_role_name],
2324                            },
2325                        ));
2326                    }
2327
2328                    // RBAC check passed, proceed with cancellation.
2329                    let id_handle = self
2330                        .active_conns
2331                        .get_key_value(&connection_id)
2332                        .map(|(id, _)| id.clone())
2333                        .expect("checked above");
2334                    self.handle_privileged_cancel(id_handle).await;
2335                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2336                } else {
2337                    // Connection not found, return false.
2338                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2339                }
2340            }
2341        }
2342    }
2343
2344    /// Inner method that performs the actual real-time recency timestamp determination.
2345    /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`)
2346    /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
2347    pub(crate) async fn determine_real_time_recent_timestamp(
2348        &self,
2349        source_ids: impl Iterator<Item = GlobalId>,
2350        real_time_recency_timeout: Duration,
2351    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2352        let item_ids = source_ids
2353            .map(|gid| {
2354                self.catalog
2355                    .try_resolve_item_id(&gid)
2356                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2357            })
2358            .collect::<Result<Vec<_>, _>>()?;
2359
2360        // Find all dependencies transitively because we need to ensure that
2361        // RTR queries determine the timestamp from the sources' (i.e.
2362        // storage objects that ingest data from external systems) remap
2363        // data. We "cheat" a little bit and filter out any IDs that aren't
2364        // user objects because we know they are not a RTR source.
2365        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2366        // If none of the sources are user objects, we don't need to provide
2367        // a RTR timestamp.
2368        if to_visit.is_empty() {
2369            return Ok(None);
2370        }
2371
2372        let mut timestamp_objects = BTreeSet::new();
2373
2374        while let Some(id) = to_visit.pop_front() {
2375            timestamp_objects.insert(id);
2376            to_visit.extend(
2377                self.catalog()
2378                    .get_entry(&id)
2379                    .uses()
2380                    .into_iter()
2381                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2382            );
2383        }
2384        let timestamp_objects = timestamp_objects
2385            .into_iter()
2386            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2387            .collect();
2388
2389        let r = self
2390            .controller
2391            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2392            .await?;
2393
2394        Ok(Some(r))
2395    }
2396
2397    pub(crate) async fn await_real_time_recent_timestamp<F>(
2398        catalog: Arc<Catalog>,
2399        fut: F,
2400    ) -> Result<Timestamp, AdapterError>
2401    where
2402        F: Future<Output = Result<Timestamp, StorageError>>,
2403    {
2404        fut.await
2405            .map_err(|error| Self::real_time_recent_timestamp_error(&catalog, error))
2406    }
2407
2408    fn real_time_recent_timestamp_error(catalog: &Catalog, error: StorageError) -> AdapterError {
2409        let rtr_name = |id: &GlobalId| {
2410            catalog
2411                .try_get_entry_by_global_id(id)
2412                .map(|e| e.name().item.clone())
2413                .unwrap_or_else(|| id.to_string())
2414        };
2415
2416        match error {
2417            StorageError::RtrTimeout(id) => AdapterError::RtrTimeout(rtr_name(&id)),
2418            StorageError::RtrDropFailure(id) => AdapterError::RtrDropFailure(rtr_name(&id)),
2419            error => error.into(),
2420        }
2421    }
2422
2423    /// Checks to see if the session needs a real time recency timestamp and if so returns
2424    /// a future that will return the timestamp.
2425    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2426        &self,
2427        session: &Session,
2428        source_ids: impl Iterator<Item = GlobalId>,
2429    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2430        let vars = session.vars();
2431
2432        if vars.real_time_recency()
2433            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2434            && !session.contains_read_timestamp()
2435        {
2436            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2437                .await
2438        } else {
2439            Ok(None)
2440        }
2441    }
2442
2443    #[instrument]
2444    pub(super) async fn sequence_explain_plan(
2445        &mut self,
2446        ctx: ExecuteContext,
2447        plan: plan::ExplainPlanPlan,
2448        target_cluster: TargetCluster,
2449    ) {
2450        match &plan.explainee {
2451            plan::Explainee::Statement(stmt) => match stmt {
2452                plan::ExplaineeStatement::CreateView { .. } => {
2453                    self.explain_create_view(ctx, plan).await;
2454                }
2455                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2456                    self.explain_create_materialized_view(ctx, plan).await;
2457                }
2458                plan::ExplaineeStatement::CreateIndex { .. } => {
2459                    self.explain_create_index(ctx, plan).await;
2460                }
2461                plan::ExplaineeStatement::Select { .. } => {
2462                    self.explain_peek(ctx, plan, target_cluster).await;
2463                }
2464                plan::ExplaineeStatement::Subscribe { .. } => {
2465                    self.explain_subscribe(ctx, plan, target_cluster).await;
2466                }
2467            },
2468            plan::Explainee::View(_) => {
2469                let result = self.explain_view(&ctx, plan);
2470                ctx.retire(result);
2471            }
2472            plan::Explainee::MaterializedView(_) => {
2473                let result = self.explain_materialized_view(&ctx, plan);
2474                ctx.retire(result);
2475            }
2476            plan::Explainee::Index(_) => {
2477                let result = self.explain_index(&ctx, plan);
2478                ctx.retire(result);
2479            }
2480            plan::Explainee::ReplanView(_) => {
2481                self.explain_replan_view(ctx, plan).await;
2482            }
2483            plan::Explainee::ReplanMaterializedView(_) => {
2484                self.explain_replan_materialized_view(ctx, plan).await;
2485            }
2486            plan::Explainee::ReplanIndex(_) => {
2487                self.explain_replan_index(ctx, plan).await;
2488            }
2489        };
2490    }
2491
2492    pub(super) async fn sequence_explain_pushdown(
2493        &mut self,
2494        ctx: ExecuteContext,
2495        plan: plan::ExplainPushdownPlan,
2496        target_cluster: TargetCluster,
2497    ) {
2498        match plan.explainee {
2499            Explainee::Statement(ExplaineeStatement::Select {
2500                broken: false,
2501                plan,
2502                desc: _,
2503            }) => {
2504                let stage = return_if_err!(
2505                    self.peek_validate(
2506                        ctx.session(),
2507                        plan,
2508                        target_cluster,
2509                        None,
2510                        ExplainContext::Pushdown,
2511                        Some(ctx.session().vars().max_query_result_size()),
2512                    ),
2513                    ctx
2514                );
2515                self.sequence_staged(ctx, Span::current(), stage).await;
2516            }
2517            Explainee::MaterializedView(item_id) => {
2518                self.explain_pushdown_materialized_view(ctx, item_id).await;
2519            }
2520            _ => {
2521                ctx.retire(Err(AdapterError::Unsupported(
2522                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2523                )));
2524            }
2525        };
2526    }
2527
2528    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2529    async fn execute_explain_pushdown_with_read_holds(
2530        &self,
2531        ctx: ExecuteContext,
2532        as_of: Antichain<Timestamp>,
2533        mz_now: ResultSpec<'static>,
2534        read_holds: Option<ReadHolds>,
2535        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2536    ) {
2537        let fut = self
2538            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2539            .await;
2540        task::spawn(|| "render explain pushdown", async move {
2541            // Transfer the necessary read holds over to the background task
2542            let _read_holds = read_holds;
2543            let res = fut.await;
2544            ctx.retire(res);
2545        });
2546    }
2547
2548    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
2549    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2550        &self,
2551        session: &Session,
2552        as_of: Antichain<Timestamp>,
2553        mz_now: ResultSpec<'static>,
2554        imports: I,
2555    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2556        // Get the needed Coordinator stuff and call the freestanding, shared helper.
2557        super::explain_pushdown_future_inner(
2558            session,
2559            &self.catalog,
2560            &self.controller.storage_collections,
2561            as_of,
2562            mz_now,
2563            imports,
2564        )
2565        .await
2566    }
2567
2568    #[instrument]
2569    pub(super) async fn sequence_insert(
2570        &mut self,
2571        mut ctx: ExecuteContext,
2572        plan: plan::InsertPlan,
2573    ) {
2574        // Normally, this would get checked when trying to add "write ops" to
2575        // the transaction but we go down diverging paths below, based on
2576        // whether the INSERT is only constant values or not.
2577        //
2578        // For the non-constant case we sequence an implicit read-then-write,
2579        // which messes with the transaction ops and would allow an implicit
2580        // read-then-write to sneak into a read-only transaction.
2581        if !ctx.session_mut().transaction().allows_writes() {
2582            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2583            return;
2584        }
2585        if ctx
2586            .session()
2587            .vars()
2588            .transaction_isolation()
2589            .is_bounded_staleness()
2590        {
2591            ctx.retire(Err(AdapterError::BoundedStalenessReadOnly));
2592            return;
2593        }
2594
2595        // The structure of this code originates from a time where
2596        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2597        // optimized `MirRelationExpr`.
2598        //
2599        // Ideally, we would like to make the `selection.as_const().is_some()`
2600        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2601        // are planned as a Wrap($n, $vals) call, so until we can reduce
2602        // HirRelationExpr this will always returns false.
2603        //
2604        // Unfortunately, hitting the default path of the match below also
2605        // causes a lot of tests to fail, so we opted to go with the extra
2606        // `plan.values.clone()` statements when producing the `optimized_mir`
2607        // and re-optimize the values in the `sequence_read_then_write` call.
2608        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2609            // We don't perform any optimizations on an expression that is already
2610            // a constant for writes, as we want to maximize bulk-insert throughput.
2611            let expr = return_if_err!(
2612                plan.values
2613                    .clone()
2614                    .lower(self.catalog().system_config(), None),
2615                ctx
2616            );
2617            OptimizedMirRelationExpr(expr)
2618        } else {
2619            // Collect optimizer parameters.
2620            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2621
2622            // (`optimize::view::Optimizer` has a special case for constant queries.)
2623            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2624
2625            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2626            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2627        };
2628
2629        match optimized_mir.into_inner() {
2630            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2631                let catalog = self.owned_catalog();
2632                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2633                    let result =
2634                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2635                    ctx.retire(result);
2636                });
2637            }
2638            // All non-constant values must be planned as read-then-writes.
2639            _ => {
2640                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2641                    Some(table) => {
2642                        // Inserts always occur at the latest version of the table.
2643                        let desc = table.relation_desc_latest().expect("table has a desc");
2644                        desc.arity()
2645                    }
2646                    None => {
2647                        ctx.retire(Err(AdapterError::Catalog(
2648                            mz_catalog::memory::error::Error {
2649                                kind: ErrorKind::Sql(CatalogError::UnknownItem(
2650                                    plan.id.to_string(),
2651                                )),
2652                            },
2653                        )));
2654                        return;
2655                    }
2656                };
2657
2658                let finishing = RowSetFinishing {
2659                    order_by: vec![],
2660                    limit: None,
2661                    offset: 0,
2662                    project: (0..desc_arity).collect(),
2663                };
2664
2665                let read_then_write_plan = plan::ReadThenWritePlan {
2666                    id: plan.id,
2667                    selection: plan.values,
2668                    finishing,
2669                    assignments: BTreeMap::new(),
2670                    kind: MutationKind::Insert,
2671                    returning: plan.returning,
2672                };
2673
2674                self.sequence_read_then_write(ctx, read_then_write_plan)
2675                    .await;
2676            }
2677        }
2678    }
2679
2680    /// ReadThenWrite is a plan whose writes depend on the results of a
2681    /// read. This works by doing a Peek then queuing a SendDiffs. No writes
2682    /// or read-then-writes can occur between the Peek and SendDiff otherwise a
2683    /// serializability violation could occur.
2684    #[instrument]
2685    pub(super) async fn sequence_read_then_write(
2686        &mut self,
2687        mut ctx: ExecuteContext,
2688        plan: plan::ReadThenWritePlan,
2689    ) {
2690        if ctx
2691            .session()
2692            .vars()
2693            .transaction_isolation()
2694            .is_bounded_staleness()
2695        {
2696            ctx.retire(Err(AdapterError::BoundedStalenessReadOnly));
2697            return;
2698        }
2699
2700        let mut source_ids: BTreeSet<_> = plan
2701            .selection
2702            .depends_on()
2703            .into_iter()
2704            .map(|gid| self.catalog().resolve_item_id(&gid))
2705            .collect();
2706        source_ids.insert(plan.id);
2707
2708        // If the transaction doesn't already have write locks, acquire them.
2709        if ctx.session().transaction().write_locks().is_none() {
2710            // Pre-define all of the locks we need.
2711            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2712
2713            // Try acquiring all of our locks.
2714            for id in &source_ids {
2715                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2716                    write_locks.insert_lock(*id, lock);
2717                }
2718            }
2719
2720            // See if we acquired all of the neccessary locks.
2721            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2722                Ok(locks) => locks,
2723                Err(missing) => {
2724                    // Defer our write if we couldn't acquire all of the locks.
2725                    let role_metadata = ctx.session().role_metadata().clone();
2726                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2727                    let plan = DeferredPlan {
2728                        ctx,
2729                        plan: Plan::ReadThenWrite(plan),
2730                        validity: PlanValidity::new(
2731                            self.catalog.transient_revision(),
2732                            source_ids.clone(),
2733                            None,
2734                            None,
2735                            role_metadata,
2736                        ),
2737                        requires_locks: source_ids,
2738                        // Writes don't track resolved IDs.
2739                        resolved_ids: ResolvedIds::empty(),
2740                        sql_impl_resolved_ids: ResolvedIds::empty(),
2741                    };
2742                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2743                }
2744            };
2745
2746            ctx.session_mut()
2747                .try_grant_write_locks(write_locks)
2748                .expect("session has already been granted write locks");
2749        }
2750
2751        let plan::ReadThenWritePlan {
2752            id,
2753            kind,
2754            selection,
2755            mut assignments,
2756            finishing,
2757            mut returning,
2758        } = plan;
2759
2760        // Read then writes can be queued, so re-verify the id exists.
2761        let desc = match self.catalog().try_get_entry(&id) {
2762            Some(table) => {
2763                // Inserts always occur at the latest version of the table.
2764                table
2765                    .relation_desc_latest()
2766                    .expect("table has a desc")
2767                    .into_owned()
2768            }
2769            None => {
2770                ctx.retire(Err(AdapterError::Catalog(
2771                    mz_catalog::memory::error::Error {
2772                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2773                    },
2774                )));
2775                return;
2776            }
2777        };
2778
2779        // Disallow mz_now in any position because read time and write time differ.
2780        let contains_temporal = selection.contains_temporal()
2781            || assignments.values().any(|e| e.contains_temporal())
2782            || returning.iter().any(|e| e.contains_temporal());
2783        if contains_temporal {
2784            ctx.retire(Err(AdapterError::Unsupported(
2785                "calls to mz_now in write statements",
2786            )));
2787            return;
2788        }
2789
2790        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations.
2791        for gid in selection.depends_on() {
2792            let item_id = self.catalog().resolve_item_id(&gid);
2793            if let Err(err) = validate_read_then_write_dependencies(self.catalog(), &item_id) {
2794                ctx.retire(Err(err));
2795                return;
2796            }
2797        }
2798
2799        let (peek_tx, peek_rx) = oneshot::channel();
2800        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2801        let (tx, _, session, extra) = ctx.into_parts();
2802        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2803        // execution context, because this peek does not directly correspond to an execute,
2804        // and so we don't need to take any action on its retirement.
2805        // TODO[btv]: we might consider extending statement logging to log the inner
2806        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2807        // and mint a new "real" execution context here. We'd also have to add some logic to
2808        // make sure such "sub-statements" are always sampled when the top-level statement is
2809        //
2810        // It's debatable whether this makes sense conceptually,
2811        // because the inner fragment here is not actually a
2812        // "statement" in its own right.
2813        let peek_ctx = ExecuteContext::from_parts(
2814            peek_client_tx,
2815            self.internal_cmd_tx.clone(),
2816            session,
2817            Default::default(),
2818        );
2819
2820        self.sequence_peek(
2821            peek_ctx,
2822            plan::SelectPlan {
2823                select: None,
2824                source: selection,
2825                when: QueryWhen::FreshestTableWrite,
2826                finishing,
2827                copy_to: None,
2828            },
2829            TargetCluster::Active,
2830            None,
2831        )
2832        .await;
2833
2834        let internal_cmd_tx = self.internal_cmd_tx.clone();
2835        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2836        let catalog = self.owned_catalog();
2837        let max_result_size = self.catalog().system_config().max_result_size();
2838
2839        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2840            let (peek_response, session) = match peek_rx.await {
2841                Ok(Response {
2842                    result: Ok(resp),
2843                    session,
2844                    otel_ctx,
2845                }) => {
2846                    otel_ctx.attach_as_parent();
2847                    (resp, session)
2848                }
2849                Ok(Response {
2850                    result: Err(e),
2851                    session,
2852                    otel_ctx,
2853                }) => {
2854                    let ctx =
2855                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2856                    otel_ctx.attach_as_parent();
2857                    ctx.retire(Err(e));
2858                    return;
2859                }
2860                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2861                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2862            };
2863            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2864            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2865
2866            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2867            if timeout_dur == Duration::ZERO {
2868                timeout_dur = Duration::MAX;
2869            }
2870
2871            let style = ExprPrepOneShot {
2872                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2873                session: ctx.session(),
2874                catalog_state: catalog.state(),
2875            };
2876            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2877                return_if_err!(style.prep_scalar_expr(expr), ctx);
2878            }
2879
2880            let make_diffs = move |mut rows: Box<dyn RowIterator>|
2881                  -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2882                    let arena = RowArena::new();
2883                    let mut diffs = Vec::new();
2884                    let mut datum_vec = mz_repr::DatumVec::new();
2885
2886                    while let Some(row) = rows.next() {
2887                        if !assignments.is_empty() {
2888                            assert!(
2889                                matches!(kind, MutationKind::Update),
2890                                "only updates support assignments"
2891                            );
2892                            let mut datums = datum_vec.borrow_with(row);
2893                            let mut updates = vec![];
2894                            for (idx, expr) in &assignments {
2895                                let updated = match expr.eval(&datums, &arena) {
2896                                    Ok(updated) => updated,
2897                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2898                                };
2899                                updates.push((*idx, updated));
2900                            }
2901                            for (idx, new_value) in updates {
2902                                datums[idx] = new_value;
2903                            }
2904                            let updated = Row::pack_slice(&datums);
2905                            diffs.push((updated, Diff::ONE));
2906                        }
2907                        match kind {
2908                            // Updates and deletes always remove the
2909                            // current row. Updates will also add an
2910                            // updated value.
2911                            MutationKind::Update | MutationKind::Delete => {
2912                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2913                            }
2914                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2915                        }
2916                    }
2917
2918                    // Sum of all the rows' byte size, for checking if we go
2919                    // above the max_result_size threshold.
2920                    let mut byte_size: u64 = 0;
2921                    for (row, diff) in &diffs {
2922                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2923                        if diff.is_positive() {
2924                            for (idx, datum) in row.iter().enumerate() {
2925                                desc.constraints_met(idx, &datum)?;
2926                            }
2927                        }
2928                    }
2929                    Ok((diffs, byte_size))
2930                };
2931
2932            let diffs = match peek_response {
2933                ExecuteResponse::SendingRowsStreaming {
2934                    rows: mut rows_stream,
2935                    ..
2936                } => {
2937                    let mut byte_size: u64 = 0;
2938                    let mut diffs = Vec::new();
2939                    let result = loop {
2940                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2941                            Ok(Some(res)) => match res {
2942                                PeekResponseUnary::Rows(new_rows) => {
2943                                    match make_diffs(new_rows) {
2944                                        Ok((mut new_diffs, new_byte_size)) => {
2945                                            byte_size = byte_size.saturating_add(new_byte_size);
2946                                            if byte_size > max_result_size {
2947                                                break Err(AdapterError::ResultSize(format!(
2948                                                    "result exceeds max size of {max_result_size}"
2949                                                )));
2950                                            }
2951                                            diffs.append(&mut new_diffs)
2952                                        }
2953                                        Err(e) => break Err(e),
2954                                    };
2955                                }
2956                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2957                                PeekResponseUnary::Error(e) => {
2958                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2959                                }
2960                                PeekResponseUnary::DependencyDropped(dep) => {
2961                                    break Err(dep.to_concurrent_dependency_drop());
2962                                }
2963                            },
2964                            Ok(None) => break Ok(diffs),
2965                            Err(_) => {
2966                                // We timed out, so remove the pending peek. This is
2967                                // best-effort and doesn't guarantee we won't
2968                                // receive a response.
2969                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2970                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2971                                    conn_id: ctx.session().conn_id().clone(),
2972                                });
2973                                if let Err(e) = result {
2974                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2975                                }
2976                                break Err(AdapterError::StatementTimeout);
2977                            }
2978                        }
2979                    };
2980
2981                    result
2982                }
2983                ExecuteResponse::SendingRowsImmediate { rows } => {
2984                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2985                }
2986                resp => Err(AdapterError::Unstructured(anyhow!(
2987                    "unexpected peek response: {resp:?}"
2988                ))),
2989            };
2990
2991            let mut returning_rows = Vec::new();
2992            let mut diff_err: Option<AdapterError> = None;
2993            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2994                let arena = RowArena::new();
2995                for (row, diff) in diffs {
2996                    if !diff.is_positive() {
2997                        continue;
2998                    }
2999                    let mut returning_row = Row::with_capacity(returning.len());
3000                    let mut packer = returning_row.packer();
3001                    for expr in &returning {
3002                        let datums: Vec<_> = row.iter().collect();
3003                        match expr.eval(&datums, &arena) {
3004                            Ok(datum) => {
3005                                packer.push(datum);
3006                            }
3007                            Err(err) => {
3008                                diff_err = Some(err.into());
3009                                break;
3010                            }
3011                        }
3012                    }
3013                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3014                    let diff = match NonZeroUsize::try_from(diff) {
3015                        Ok(diff) => diff,
3016                        Err(err) => {
3017                            diff_err = Some(err.into());
3018                            break;
3019                        }
3020                    };
3021                    returning_rows.push((returning_row, diff));
3022                    if diff_err.is_some() {
3023                        break;
3024                    }
3025                }
3026            }
3027            let diffs = if let Some(err) = diff_err {
3028                Err(err)
3029            } else {
3030                diffs
3031            };
3032
3033            // We need to clear out the timestamp context so the write doesn't fail due to a
3034            // read only transaction.
3035            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3036            // No matter what isolation level the client is using, we must linearize this
3037            // read. The write will be performed right after this, as part of a single
3038            // transaction, so the write must have a timestamp greater than or equal to the
3039            // read.
3040            //
3041            // Note: It's only OK for the write to have a greater timestamp than the read
3042            // because the write lock prevents any other writes from happening in between
3043            // the read and write.
3044            if let Some(timestamp_context) = timestamp_context {
3045                let (tx, rx) = tokio::sync::oneshot::channel();
3046                let conn_id = ctx.session().conn_id().clone();
3047                let pending_read_txn = PendingReadTxn {
3048                    txn: PendingRead::ReadThenWrite { ctx, tx },
3049                    timestamp_context,
3050                    created: Instant::now(),
3051                    num_requeues: 0,
3052                    otel_ctx: OpenTelemetryContext::obtain(),
3053                };
3054                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3055                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3056                if let Err(e) = result {
3057                    warn!(
3058                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3059                        e
3060                    );
3061                    return;
3062                }
3063                let result = rx.await;
3064                // It is not an error for these results to be ready after `tx` has been dropped.
3065                ctx = match result {
3066                    Ok(Some(ctx)) => ctx,
3067                    Ok(None) => {
3068                        // Coordinator took our context and will handle responding to the client.
3069                        // This usually indicates that our transaction was aborted.
3070                        return;
3071                    }
3072                    Err(e) => {
3073                        warn!(
3074                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3075                            e
3076                        );
3077                        return;
3078                    }
3079                };
3080            }
3081
3082            match diffs {
3083                Ok(diffs) => {
3084                    let result = Self::send_diffs(
3085                        ctx.session_mut(),
3086                        plan::SendDiffsPlan {
3087                            id,
3088                            updates: diffs,
3089                            kind,
3090                            returning: returning_rows,
3091                            max_result_size,
3092                        },
3093                    );
3094                    ctx.retire(result);
3095                }
3096                Err(e) => {
3097                    ctx.retire(Err(e));
3098                }
3099            }
3100        });
3101    }
3102
3103    #[instrument]
3104    pub(super) async fn sequence_alter_item_rename(
3105        &mut self,
3106        ctx: &mut ExecuteContext,
3107        plan: plan::AlterItemRenamePlan,
3108    ) -> Result<ExecuteResponse, AdapterError> {
3109        let op = catalog::Op::RenameItem {
3110            id: plan.id,
3111            current_full_name: plan.current_full_name,
3112            to_name: plan.to_name,
3113        };
3114        match self
3115            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3116            .await
3117        {
3118            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3119            Err(err) => Err(err),
3120        }
3121    }
3122
3123    #[instrument]
3124    pub(super) async fn sequence_alter_retain_history(
3125        &mut self,
3126        ctx: &mut ExecuteContext,
3127        plan: plan::AlterRetainHistoryPlan,
3128    ) -> Result<ExecuteResponse, AdapterError> {
3129        let ops = vec![catalog::Op::AlterRetainHistory {
3130            id: plan.id,
3131            value: plan.value,
3132            window: plan.window,
3133        }];
3134        self.catalog_transact_with_context(None, Some(ctx), ops)
3135            .await?;
3136        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3137    }
3138
3139    #[instrument]
3140    pub(super) async fn sequence_alter_source_timestamp_interval(
3141        &mut self,
3142        ctx: &mut ExecuteContext,
3143        plan: plan::AlterSourceTimestampIntervalPlan,
3144    ) -> Result<ExecuteResponse, AdapterError> {
3145        let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3146            id: plan.id,
3147            value: plan.value,
3148            interval: plan.interval,
3149        }];
3150        self.catalog_transact_with_context(None, Some(ctx), ops)
3151            .await?;
3152        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3153    }
3154
3155    #[instrument]
3156    pub(super) async fn sequence_alter_schema_rename(
3157        &mut self,
3158        ctx: &mut ExecuteContext,
3159        plan: plan::AlterSchemaRenamePlan,
3160    ) -> Result<ExecuteResponse, AdapterError> {
3161        let (database_spec, schema_spec) = plan.cur_schema_spec;
3162        let op = catalog::Op::RenameSchema {
3163            database_spec,
3164            schema_spec,
3165            new_name: plan.new_schema_name,
3166            check_reserved_names: true,
3167        };
3168        match self
3169            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3170            .await
3171        {
3172            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3173            Err(err) => Err(err),
3174        }
3175    }
3176
3177    #[instrument]
3178    pub(super) async fn sequence_alter_schema_swap(
3179        &mut self,
3180        ctx: &mut ExecuteContext,
3181        plan: plan::AlterSchemaSwapPlan,
3182    ) -> Result<ExecuteResponse, AdapterError> {
3183        let plan::AlterSchemaSwapPlan {
3184            schema_a_spec: (schema_a_db, schema_a),
3185            schema_a_name,
3186            schema_b_spec: (schema_b_db, schema_b),
3187            schema_b_name,
3188            name_temp,
3189        } = plan;
3190
3191        let op_a = catalog::Op::RenameSchema {
3192            database_spec: schema_a_db,
3193            schema_spec: schema_a,
3194            new_name: name_temp,
3195            check_reserved_names: false,
3196        };
3197        let op_b = catalog::Op::RenameSchema {
3198            database_spec: schema_b_db,
3199            schema_spec: schema_b,
3200            new_name: schema_a_name,
3201            check_reserved_names: false,
3202        };
3203        let op_c = catalog::Op::RenameSchema {
3204            database_spec: schema_a_db,
3205            schema_spec: schema_a,
3206            new_name: schema_b_name,
3207            check_reserved_names: false,
3208        };
3209
3210        match self
3211            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3212                Box::pin(async {})
3213            })
3214            .await
3215        {
3216            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3217            Err(err) => Err(err),
3218        }
3219    }
3220
3221    #[instrument]
3222    pub(super) async fn sequence_alter_role(
3223        &mut self,
3224        session: &Session,
3225        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3226    ) -> Result<ExecuteResponse, AdapterError> {
3227        let catalog = self.catalog().for_session(session);
3228        let role = catalog.get_role(&id);
3229
3230        // We'll send these notices to the user, if the operation is successful.
3231        let mut notices = vec![];
3232
3233        // Get the attributes and variables from the role, as they currently are.
3234        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3235        let mut vars = role.vars().clone();
3236
3237        // Whether to set the password to NULL. This is a special case since the existing
3238        // password is not stored in the role attributes.
3239        let mut nopassword = false;
3240
3241        // Apply our updates.
3242        match option {
3243            PlannedAlterRoleOption::Attributes(attrs) => {
3244                self.validate_role_attributes(&attrs.clone().into())?;
3245
3246                if let Some(inherit) = attrs.inherit {
3247                    attributes.inherit = inherit;
3248                }
3249
3250                if let Some(password) = attrs.password {
3251                    attributes.password = Some(password);
3252                    attributes.scram_iterations =
3253                        Some(self.catalog().system_config().scram_iterations())
3254                }
3255
3256                if let Some(superuser) = attrs.superuser {
3257                    attributes.superuser = Some(superuser);
3258                }
3259
3260                if let Some(login) = attrs.login {
3261                    attributes.login = Some(login);
3262                }
3263
3264                if attrs.nopassword.unwrap_or(false) {
3265                    nopassword = true;
3266                }
3267
3268                if let Some(notice) = self.should_emit_rbac_notice(session) {
3269                    notices.push(notice);
3270                }
3271            }
3272            PlannedAlterRoleOption::Variable(variable) => {
3273                // Get the variable to make sure it's valid and visible.
3274                let session_var = session.vars().inspect(variable.name())?;
3275                // Return early if it's not visible.
3276                session_var.visible(session.user(), catalog.system_vars())?;
3277
3278                // Emit a warning when deprecated variables are used.
3279                // TODO(database-issues#8069) remove this after sufficient time has passed
3280                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3281                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3282                } else if let PlannedRoleVariable::Set {
3283                    name,
3284                    value: VariableValue::Values(vals),
3285                } = &variable
3286                {
3287                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3288                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3289                    }
3290                }
3291
3292                let var_name = match variable {
3293                    PlannedRoleVariable::Set { name, value } => {
3294                        // Update our persisted set.
3295                        match &value {
3296                            VariableValue::Default => {
3297                                vars.remove(&name);
3298                            }
3299                            VariableValue::Values(vals) => {
3300                                let var = match &vals[..] {
3301                                    [val] => OwnedVarInput::Flat(val.clone()),
3302                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3303                                };
3304                                // Make sure the input is valid.
3305                                session_var.check(var.borrow())?;
3306
3307                                vars.insert(name.clone(), var);
3308                            }
3309                        };
3310                        name
3311                    }
3312                    PlannedRoleVariable::Reset { name } => {
3313                        // Remove it from our persisted values.
3314                        vars.remove(&name);
3315                        name
3316                    }
3317                };
3318
3319                // Emit a notice that they need to reconnect to see the change take effect.
3320                notices.push(AdapterNotice::VarDefaultUpdated {
3321                    role: Some(name.clone()),
3322                    var_name: Some(var_name),
3323                });
3324            }
3325        }
3326
3327        let op = catalog::Op::AlterRole {
3328            id,
3329            name,
3330            attributes,
3331            nopassword,
3332            vars: RoleVars { map: vars },
3333        };
3334        let response = self
3335            .catalog_transact(Some(session), vec![op])
3336            .await
3337            .map(|_| ExecuteResponse::AlteredRole)?;
3338
3339        // Send all of our queued notices.
3340        session.add_notices(notices);
3341
3342        Ok(response)
3343    }
3344
3345    #[instrument]
3346    pub(super) async fn sequence_alter_sink_prepare(
3347        &mut self,
3348        ctx: ExecuteContext,
3349        plan: plan::AlterSinkPlan,
3350    ) {
3351        // Put a read hold on the new relation
3352        let id_bundle = crate::CollectionIdBundle {
3353            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3354            compute_ids: BTreeMap::new(),
3355        };
3356        let read_hold = self.acquire_read_holds(&id_bundle);
3357
3358        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3359            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3360            return;
3361        };
3362
3363        let otel_ctx = OpenTelemetryContext::obtain();
3364        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3365
3366        let plan_validity = PlanValidity::new(
3367            self.catalog().transient_revision(),
3368            BTreeSet::from_iter([plan.item_id, from_item_id]),
3369            Some(plan.in_cluster),
3370            None,
3371            ctx.session().role_metadata().clone(),
3372        );
3373
3374        info!(
3375            "preparing alter sink for {}: frontiers={:?} export={:?}",
3376            plan.global_id,
3377            self.controller
3378                .storage_collections
3379                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3380            self.controller.storage.export(plan.global_id)
3381        );
3382
3383        // Now we must wait for the sink to make enough progress such that there is overlap between
3384        // the new `from` collection's read hold and the sink's write frontier.
3385        //
3386        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3387        // the watch set never completes and neither does the `ALTER SINK` command.
3388        self.install_storage_watch_set(
3389            ctx.session().conn_id().clone(),
3390            BTreeSet::from_iter([plan.global_id]),
3391            read_ts,
3392            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3393                ctx: Some(ctx),
3394                otel_ctx,
3395                plan,
3396                plan_validity,
3397                read_hold,
3398            }),
3399        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3400    }
3401
3402    #[instrument]
3403    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3404        ctx.otel_ctx.attach_as_parent();
3405
3406        let plan::AlterSinkPlan {
3407            item_id,
3408            global_id,
3409            sink: sink_plan,
3410            with_snapshot,
3411            in_cluster,
3412        } = ctx.plan.clone();
3413
3414        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3415        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3416        // arbitrarily changed since we performed planning, and we must re-assert that it still
3417        // matches our requirements.
3418        //
3419        // The `PlanValidity` check ensures that both the sink and the new source relation still
3420        // exist. Apart from that we have to ensure that nobody else altered the sink in the mean
3421        // time, which we do by comparing the catalog sink version to the one in the plan.
3422        match ctx.plan_validity.check(self.catalog()) {
3423            Ok(()) => {}
3424            Err(err) => {
3425                ctx.retire(Err(err));
3426                return;
3427            }
3428        }
3429
3430        let entry = self.catalog().get_entry(&item_id);
3431        let CatalogItem::Sink(old_sink) = entry.item() else {
3432            panic!("invalid item kind for `AlterSinkPlan`");
3433        };
3434
3435        if sink_plan.version != old_sink.version + 1 {
3436            ctx.retire(Err(AdapterError::ChangedPlan(
3437                "sink was altered concurrently".into(),
3438            )));
3439            return;
3440        }
3441
3442        info!(
3443            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3444            self.controller
3445                .storage_collections
3446                .collections_frontiers(vec![global_id, sink_plan.from]),
3447            self.controller.storage.export(global_id),
3448        );
3449
3450        // Assert that we can recover the updates that happened at the timestamps of the write
3451        // frontier. This must be true in this call.
3452        let write_frontier = &self
3453            .controller
3454            .storage
3455            .export(global_id)
3456            .expect("sink known to exist")
3457            .write_frontier;
3458        let as_of = ctx.read_hold.least_valid_read();
3459        assert!(
3460            write_frontier.iter().all(|t| as_of.less_than(t)),
3461            "{:?} should be strictly less than {:?}",
3462            &*as_of,
3463            &**write_frontier
3464        );
3465
3466        // Parse the `create_sql` so we can update it to the new sink definition.
3467        //
3468        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3469        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3470        // names in the `create_sql` may have changed, for example due to a schema swap.
3471        let create_sql = &old_sink.create_sql;
3472        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3473        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3474            unreachable!("invalid statement kind for sink");
3475        };
3476
3477        // Update the sink version.
3478        stmt.with_options
3479            .retain(|o| o.name != CreateSinkOptionName::Version);
3480        stmt.with_options.push(CreateSinkOption {
3481            name: CreateSinkOptionName::Version,
3482            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3483                sink_plan.version.to_string(),
3484            ))),
3485        });
3486
3487        let conn_catalog = self.catalog().for_system_session();
3488        let (mut stmt, resolved_ids) =
3489            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3490
3491        // Update the `from` relation.
3492        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3493        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3494        stmt.from = ResolvedItemName::Item {
3495            id: from_entry.id(),
3496            qualifiers: from_entry.name.qualifiers.clone(),
3497            full_name,
3498            print_id: true,
3499            version: from_entry.version,
3500        };
3501
3502        let new_sink = Sink {
3503            create_sql: stmt.to_ast_string_stable(),
3504            global_id,
3505            from: sink_plan.from,
3506            connection: sink_plan.connection.clone(),
3507            envelope: sink_plan.envelope,
3508            version: sink_plan.version,
3509            with_snapshot,
3510            resolved_ids: resolved_ids.clone(),
3511            cluster_id: in_cluster,
3512            commit_interval: sink_plan.commit_interval,
3513        };
3514
3515        let ops = vec![catalog::Op::UpdateItem {
3516            id: item_id,
3517            name: entry.name().clone(),
3518            to_item: CatalogItem::Sink(new_sink),
3519        }];
3520
3521        match self
3522            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3523            .await
3524        {
3525            Ok(()) => {}
3526            Err(err) => {
3527                ctx.retire(Err(err));
3528                return;
3529            }
3530        }
3531
3532        let storage_sink_desc = StorageSinkDesc {
3533            from: sink_plan.from,
3534            from_desc: from_entry
3535                .relation_desc()
3536                .expect("sinks can only be built on items with descs")
3537                .into_owned(),
3538            connection: sink_plan
3539                .connection
3540                .clone()
3541                .into_inline_connection(self.catalog().state()),
3542            envelope: sink_plan.envelope,
3543            as_of,
3544            with_snapshot,
3545            version: sink_plan.version,
3546            from_storage_metadata: (),
3547            to_storage_metadata: (),
3548            commit_interval: sink_plan.commit_interval,
3549        };
3550
3551        self.controller
3552            .storage
3553            .alter_export(
3554                global_id,
3555                ExportDescription {
3556                    sink: storage_sink_desc,
3557                    instance_id: in_cluster,
3558                },
3559            )
3560            .await
3561            .unwrap_or_terminate("cannot fail to alter source desc");
3562
3563        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3564    }
3565
3566    #[instrument]
3567    pub(super) async fn sequence_alter_connection(
3568        &mut self,
3569        ctx: ExecuteContext,
3570        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3571    ) {
3572        match action {
3573            AlterConnectionAction::RotateKeys => {
3574                self.sequence_rotate_keys(ctx, id).await;
3575            }
3576            AlterConnectionAction::AlterOptions {
3577                set_options,
3578                drop_options,
3579                validate,
3580            } => {
3581                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3582                    .await
3583            }
3584        }
3585    }
3586
3587    #[instrument]
3588    async fn sequence_alter_connection_options(
3589        &mut self,
3590        mut ctx: ExecuteContext,
3591        id: CatalogItemId,
3592        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3593        drop_options: BTreeSet<ConnectionOptionName>,
3594        validate: bool,
3595    ) {
3596        let cur_entry = self.catalog().get_entry(&id);
3597        let cur_conn = cur_entry.connection().expect("known to be connection");
3598        let connection_gid = cur_conn.global_id();
3599
3600        let inner = || -> Result<Connection, AdapterError> {
3601            // Parse statement.
3602            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3603                .expect("invalid create sql persisted to catalog")
3604                .into_element()
3605                .ast
3606            {
3607                Statement::CreateConnection(stmt) => stmt,
3608                _ => unreachable!("proved type is source"),
3609            };
3610
3611            let catalog = self.catalog().for_system_session();
3612
3613            // Resolve items in statement
3614            let (mut create_conn_stmt, resolved_ids) =
3615                mz_sql::names::resolve(&catalog, create_conn_stmt)
3616                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3617
3618            // Retain options that are neither set nor dropped.
3619            create_conn_stmt
3620                .values
3621                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3622
3623            // Set new values
3624            create_conn_stmt.values.extend(
3625                set_options
3626                    .into_iter()
3627                    .map(|(name, value)| ConnectionOption { name, value }),
3628            );
3629
3630            // Open a new catalog, which we will use to re-plan our
3631            // statement with the desired config.
3632            let mut catalog = self.catalog().for_system_session();
3633            catalog.mark_id_unresolvable_for_replanning(id);
3634
3635            // Re-define our source in terms of the amended statement
3636            let plan = match mz_sql::plan::plan(
3637                None,
3638                &catalog,
3639                Statement::CreateConnection(create_conn_stmt),
3640                &Params::empty(),
3641                &resolved_ids,
3642            )
3643            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3644            {
3645                (Plan::CreateConnection(plan), _sql_impl_ids) => plan,
3646                (p, _) => {
3647                    unreachable!("create connection plan is only valid response, got {:?}", p)
3648                }
3649            };
3650
3651            // Parse statement.
3652            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3653                .expect("invalid create sql persisted to catalog")
3654                .into_element()
3655                .ast
3656            {
3657                Statement::CreateConnection(stmt) => stmt,
3658                _ => unreachable!("proved type is source"),
3659            };
3660
3661            let catalog = self.catalog().for_system_session();
3662
3663            // Resolve items in statement
3664            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3665                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3666
3667            Ok(Connection {
3668                create_sql: plan.connection.create_sql,
3669                global_id: cur_conn.global_id,
3670                details: plan.connection.details,
3671                resolved_ids: new_deps,
3672            })
3673        };
3674
3675        let conn = match inner() {
3676            Ok(conn) => conn,
3677            Err(e) => {
3678                return ctx.retire(Err(e));
3679            }
3680        };
3681
3682        // Inspect guarded secrets whether or not validation was requested,
3683        // before the altered connection is installed in the catalog.
3684        if let Err(err) = self
3685            .check_connection_secret_content_guards(&conn.details)
3686            .await
3687        {
3688            return ctx.retire(Err(err));
3689        }
3690
3691        if validate {
3692            let connection = conn
3693                .details
3694                .to_connection()
3695                .into_inline_connection(self.catalog().state());
3696
3697            let internal_cmd_tx = self.internal_cmd_tx.clone();
3698            let transient_revision = self.catalog().transient_revision();
3699            let conn_id = ctx.session().conn_id().clone();
3700            let otel_ctx = OpenTelemetryContext::obtain();
3701            let role_metadata = ctx.session().role_metadata().clone();
3702            let current_storage_parameters = self.controller.storage.config().clone();
3703
3704            task::spawn(
3705                || format!("validate_alter_connection:{conn_id}"),
3706                async move {
3707                    let resolved_ids = conn.resolved_ids.clone();
3708                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3709                    let result = match std::panic::AssertUnwindSafe(
3710                        connection.validate(id, &current_storage_parameters),
3711                    )
3712                    .ore_catch_unwind()
3713                    .await
3714                    {
3715                        Ok(Ok(())) => Ok(conn),
3716                        Ok(Err(err)) => Err(err.into()),
3717                        Err(_panic) => {
3718                            tracing::error!("alter connection validation panicked");
3719                            Err(AdapterError::Internal(
3720                                "connection validation panicked".into(),
3721                            ))
3722                        }
3723                    };
3724
3725                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3726                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3727                        AlterConnectionValidationReady {
3728                            ctx,
3729                            result,
3730                            connection_id: id,
3731                            connection_gid,
3732                            plan_validity: PlanValidity::new(
3733                                transient_revision,
3734                                dependency_ids.clone(),
3735                                None,
3736                                None,
3737                                role_metadata,
3738                            ),
3739                            otel_ctx,
3740                            resolved_ids,
3741                        },
3742                    ));
3743                    if let Err(e) = result {
3744                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3745                    }
3746                },
3747            );
3748        } else {
3749            let result = self
3750                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3751                .await;
3752            ctx.retire(result);
3753        }
3754    }
3755
3756    #[instrument]
3757    pub(crate) async fn sequence_alter_connection_stage_finish(
3758        &mut self,
3759        session: &Session,
3760        id: CatalogItemId,
3761        connection: Connection,
3762    ) -> Result<ExecuteResponse, AdapterError> {
3763        match self.catalog.get_entry(&id).item() {
3764            CatalogItem::Connection(curr_conn) => {
3765                curr_conn
3766                    .details
3767                    .to_connection()
3768                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3769                    .map_err(StorageError::from)?;
3770            }
3771            _ => unreachable!("known to be a connection"),
3772        };
3773
3774        let ops = vec![catalog::Op::UpdateItem {
3775            id,
3776            name: self.catalog.get_entry(&id).name().clone(),
3777            to_item: CatalogItem::Connection(connection.clone()),
3778        }];
3779
3780        self.catalog_transact(Some(session), ops).await?;
3781
3782        // NOTE: The rest of the alter connection logic (updating VPC endpoints
3783        // and propagating connection changes to dependent sources, sinks, and
3784        // tables) is handled in `apply_catalog_implications` via
3785        // `handle_alter_connection`. The catalog transact above triggers that
3786        // code path.
3787
3788        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3789    }
3790
3791    #[instrument]
3792    pub(super) async fn sequence_alter_source(
3793        &mut self,
3794        session: &Session,
3795        plan::AlterSourcePlan {
3796            item_id,
3797            ingestion_id,
3798            action,
3799        }: plan::AlterSourcePlan,
3800    ) -> Result<ExecuteResponse, AdapterError> {
3801        let cur_entry = self.catalog().get_entry(&item_id);
3802        let cur_source = cur_entry.source().expect("known to be source");
3803
3804        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3805            // Parse statement.
3806            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3807                .expect("invalid create sql persisted to catalog")
3808                .into_element()
3809                .ast
3810            {
3811                Statement::CreateSource(stmt) => stmt,
3812                _ => unreachable!("proved type is source"),
3813            };
3814
3815            let catalog = coord.catalog().for_system_session();
3816
3817            // Resolve items in statement
3818            mz_sql::names::resolve(&catalog, create_source_stmt)
3819                .map_err(|e| AdapterError::internal(err_cx, e))
3820        };
3821
3822        match action {
3823            plan::AlterSourceAction::AddSubsourceExports {
3824                subsources,
3825                options,
3826            } => {
3827                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3828
3829                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3830                    text_columns: mut new_text_columns,
3831                    exclude_columns: mut new_exclude_columns,
3832                    ..
3833                } = options.try_into()?;
3834
3835                // Resolve items in statement
3836                let (mut create_source_stmt, resolved_ids) =
3837                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3838
3839                // Get all currently referred-to items
3840                let catalog = self.catalog();
3841                let curr_references: BTreeSet<_> = catalog
3842                    .get_entry(&item_id)
3843                    .used_by()
3844                    .into_iter()
3845                    .filter_map(|subsource| {
3846                        catalog
3847                            .get_entry(subsource)
3848                            .subsource_details()
3849                            .map(|(_id, reference, _details)| reference)
3850                    })
3851                    .collect();
3852
3853                // We are doing a lot of unwrapping, so just make an error to reference; all of
3854                // these invariants are guaranteed to be true because of how we plan subsources.
3855                let purification_err =
3856                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3857
3858                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
3859                // we remove support for implicitly created subsources from a `CREATE SOURCE`
3860                // statement.
3861                match &mut create_source_stmt.connection {
3862                    CreateSourceConnection::Postgres {
3863                        options: curr_options,
3864                        ..
3865                    } => {
3866                        let mz_sql::plan::PgConfigOptionExtracted {
3867                            mut text_columns, ..
3868                        } = curr_options.clone().try_into()?;
3869
3870                        // Drop text columns; we will add them back in
3871                        // as appropriate below.
3872                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3873
3874                        // Drop all text columns that are not currently referred to.
3875                        text_columns.retain(|column_qualified_reference| {
3876                            mz_ore::soft_assert_eq_or_log!(
3877                                column_qualified_reference.0.len(),
3878                                4,
3879                                "all TEXT COLUMNS values must be column-qualified references"
3880                            );
3881                            let mut table = column_qualified_reference.clone();
3882                            table.0.truncate(3);
3883                            curr_references.contains(&table)
3884                        });
3885
3886                        // Merge the current text columns into the new text columns.
3887                        new_text_columns.extend(text_columns);
3888
3889                        // If we have text columns, add them to the options.
3890                        if !new_text_columns.is_empty() {
3891                            new_text_columns.sort();
3892                            let new_text_columns = new_text_columns
3893                                .into_iter()
3894                                .map(WithOptionValue::UnresolvedItemName)
3895                                .collect();
3896
3897                            curr_options.push(PgConfigOption {
3898                                name: PgConfigOptionName::TextColumns,
3899                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3900                            });
3901                        }
3902                    }
3903                    CreateSourceConnection::MySql {
3904                        options: curr_options,
3905                        ..
3906                    } => {
3907                        let mz_sql::plan::MySqlConfigOptionExtracted {
3908                            mut text_columns,
3909                            mut exclude_columns,
3910                            ..
3911                        } = curr_options.clone().try_into()?;
3912
3913                        // Drop both ignore and text columns; we will add them back in
3914                        // as appropriate below.
3915                        curr_options.retain(|o| {
3916                            !matches!(
3917                                o.name,
3918                                MySqlConfigOptionName::TextColumns
3919                                    | MySqlConfigOptionName::ExcludeColumns
3920                            )
3921                        });
3922
3923                        // Drop all text / exclude columns that are not currently referred to.
3924                        let column_referenced =
3925                            |column_qualified_reference: &UnresolvedItemName| {
3926                                mz_ore::soft_assert_eq_or_log!(
3927                                    column_qualified_reference.0.len(),
3928                                    3,
3929                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3930                                );
3931                                let mut table = column_qualified_reference.clone();
3932                                table.0.truncate(2);
3933                                curr_references.contains(&table)
3934                            };
3935                        text_columns.retain(column_referenced);
3936                        exclude_columns.retain(column_referenced);
3937
3938                        // Merge the current text / exclude columns into the new text / exclude columns.
3939                        new_text_columns.extend(text_columns);
3940                        new_exclude_columns.extend(exclude_columns);
3941
3942                        // If we have text columns, add them to the options.
3943                        if !new_text_columns.is_empty() {
3944                            new_text_columns.sort();
3945                            let new_text_columns = new_text_columns
3946                                .into_iter()
3947                                .map(WithOptionValue::UnresolvedItemName)
3948                                .collect();
3949
3950                            curr_options.push(MySqlConfigOption {
3951                                name: MySqlConfigOptionName::TextColumns,
3952                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3953                            });
3954                        }
3955                        // If we have exclude columns, add them to the options.
3956                        if !new_exclude_columns.is_empty() {
3957                            new_exclude_columns.sort();
3958                            let new_exclude_columns = new_exclude_columns
3959                                .into_iter()
3960                                .map(WithOptionValue::UnresolvedItemName)
3961                                .collect();
3962
3963                            curr_options.push(MySqlConfigOption {
3964                                name: MySqlConfigOptionName::ExcludeColumns,
3965                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3966                            });
3967                        }
3968                    }
3969                    CreateSourceConnection::SqlServer {
3970                        options: curr_options,
3971                        ..
3972                    } => {
3973                        let mz_sql::plan::SqlServerConfigOptionExtracted {
3974                            mut text_columns,
3975                            mut exclude_columns,
3976                            ..
3977                        } = curr_options.clone().try_into()?;
3978
3979                        // Drop both ignore and text columns; we will add them back in
3980                        // as appropriate below.
3981                        curr_options.retain(|o| {
3982                            !matches!(
3983                                o.name,
3984                                SqlServerConfigOptionName::TextColumns
3985                                    | SqlServerConfigOptionName::ExcludeColumns
3986                            )
3987                        });
3988
3989                        // Drop all text / exclude columns that are not currently referred to.
3990                        // SQL Server text/exclude column refs are 3-part (schema.table.col),
3991                        // which truncate to 2-part (schema.table). But external references
3992                        // are 3-part (database.schema.table). Use suffix matching since
3993                        // a SQL Server source connects to a single database.
3994                        let column_referenced =
3995                            |column_qualified_reference: &UnresolvedItemName| {
3996                                mz_ore::soft_assert_eq_or_log!(
3997                                    column_qualified_reference.0.len(),
3998                                    3,
3999                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4000                                );
4001                                let mut table = column_qualified_reference.clone();
4002                                table.0.truncate(2);
4003                                curr_references.iter().any(|r| r.0.ends_with(&table.0))
4004                            };
4005                        text_columns.retain(column_referenced);
4006                        exclude_columns.retain(column_referenced);
4007
4008                        // Merge the current text / exclude columns into the new text / exclude columns.
4009                        new_text_columns.extend(text_columns);
4010                        new_exclude_columns.extend(exclude_columns);
4011
4012                        // If we have text columns, add them to the options.
4013                        if !new_text_columns.is_empty() {
4014                            new_text_columns.sort();
4015                            let new_text_columns = new_text_columns
4016                                .into_iter()
4017                                .map(WithOptionValue::UnresolvedItemName)
4018                                .collect();
4019
4020                            curr_options.push(SqlServerConfigOption {
4021                                name: SqlServerConfigOptionName::TextColumns,
4022                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4023                            });
4024                        }
4025                        // If we have exclude columns, add them to the options.
4026                        if !new_exclude_columns.is_empty() {
4027                            new_exclude_columns.sort();
4028                            let new_exclude_columns = new_exclude_columns
4029                                .into_iter()
4030                                .map(WithOptionValue::UnresolvedItemName)
4031                                .collect();
4032
4033                            curr_options.push(SqlServerConfigOption {
4034                                name: SqlServerConfigOptionName::ExcludeColumns,
4035                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4036                            });
4037                        }
4038                    }
4039                    _ => return Err(purification_err()),
4040                };
4041
4042                let mut catalog = self.catalog().for_system_session();
4043                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4044
4045                // Re-define our source in terms of the amended statement
4046                let planned = mz_sql::plan::plan(
4047                    None,
4048                    &catalog,
4049                    Statement::CreateSource(create_source_stmt),
4050                    &Params::empty(),
4051                    &resolved_ids,
4052                )
4053                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4054                let plan = match planned {
4055                    (Plan::CreateSource(plan), _sql_impl_ids) => plan,
4056                    (p, _) => {
4057                        unreachable!("create source plan is only valid response, got {:?}", p)
4058                    }
4059                };
4060
4061                // Asserting that we've done the right thing with dependencies
4062                // here requires mocking out objects in the catalog, which is a
4063                // large task for an operation we have to cover in tests anyway.
4064                let source = Source::new(
4065                    plan,
4066                    cur_source.global_id,
4067                    resolved_ids,
4068                    cur_source.custom_logical_compaction_window,
4069                    cur_source.is_retained_metrics_object,
4070                );
4071
4072                // Get new ingestion description for storage.
4073                let desc = match &source.data_source {
4074                    DataSourceDesc::Ingestion { desc, .. }
4075                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4076                        desc.clone().into_inline_connection(self.catalog().state())
4077                    }
4078                    _ => unreachable!("already verified of type ingestion"),
4079                };
4080
4081                self.controller
4082                    .storage
4083                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
4084                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4085
4086                // Redefine source. This must be done before we create any new
4087                // subsources so that it has the right ingestion.
4088                let mut ops = vec![catalog::Op::UpdateItem {
4089                    id: item_id,
4090                    // Look this up again so we don't have to hold an immutable reference to the
4091                    // entry for so long.
4092                    name: self.catalog.get_entry(&item_id).name().clone(),
4093                    to_item: CatalogItem::Source(source),
4094                }];
4095
4096                let CreateSourceInner {
4097                    ops: new_ops,
4098                    sources: _,
4099                    if_not_exists_ids,
4100                } = self.create_source_inner(session, subsources).await?;
4101
4102                ops.extend(new_ops.into_iter());
4103
4104                assert!(
4105                    if_not_exists_ids.is_empty(),
4106                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4107                );
4108
4109                self.catalog_transact(Some(session), ops).await?;
4110            }
4111            plan::AlterSourceAction::RefreshReferences { references } => {
4112                self.catalog_transact(
4113                    Some(session),
4114                    vec![catalog::Op::UpdateSourceReferences {
4115                        source_id: item_id,
4116                        references: references.into(),
4117                    }],
4118                )
4119                .await?;
4120            }
4121        }
4122
4123        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4124    }
4125
4126    #[instrument]
4127    pub(super) async fn sequence_alter_system_set(
4128        &mut self,
4129        session: &Session,
4130        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4131    ) -> Result<ExecuteResponse, AdapterError> {
4132        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4133        // We want to ensure that the network policy we're switching too actually exists.
4134        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4135            self.validate_alter_system_network_policy(session, &value)?;
4136        }
4137
4138        let op = match value {
4139            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4140                name: name.clone(),
4141                value: OwnedVarInput::SqlSet(values),
4142            },
4143            plan::VariableValue::Default => {
4144                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4145            }
4146        };
4147        self.catalog_transact(Some(session), vec![op]).await?;
4148
4149        session.add_notice(AdapterNotice::VarDefaultUpdated {
4150            role: None,
4151            var_name: Some(name),
4152        });
4153        Ok(ExecuteResponse::AlteredSystemConfiguration)
4154    }
4155
4156    #[instrument]
4157    pub(super) async fn sequence_alter_system_reset(
4158        &mut self,
4159        session: &Session,
4160        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4161    ) -> Result<ExecuteResponse, AdapterError> {
4162        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4163        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4164        self.catalog_transact(Some(session), vec![op]).await?;
4165        session.add_notice(AdapterNotice::VarDefaultUpdated {
4166            role: None,
4167            var_name: Some(name),
4168        });
4169        Ok(ExecuteResponse::AlteredSystemConfiguration)
4170    }
4171
4172    #[instrument]
4173    pub(super) async fn sequence_alter_system_reset_all(
4174        &mut self,
4175        session: &Session,
4176        _: plan::AlterSystemResetAllPlan,
4177    ) -> Result<ExecuteResponse, AdapterError> {
4178        self.is_user_allowed_to_alter_system(session, None)?;
4179        let op = catalog::Op::ResetAllSystemConfiguration;
4180        self.catalog_transact(Some(session), vec![op]).await?;
4181        session.add_notice(AdapterNotice::VarDefaultUpdated {
4182            role: None,
4183            var_name: None,
4184        });
4185        Ok(ExecuteResponse::AlteredSystemConfiguration)
4186    }
4187
4188    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
4189    fn is_user_allowed_to_alter_system(
4190        &self,
4191        session: &Session,
4192        var_name: Option<&str>,
4193    ) -> Result<(), AdapterError> {
4194        match (session.user().kind(), var_name) {
4195            // Only internal superusers can reset all system variables.
4196            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4197            // Whether or not a variable can be modified depends if we're an internal superuser.
4198            (UserKind::Superuser, Some(name))
4199                if session.user().is_internal()
4200                    || self.catalog().system_config().user_modifiable(name) =>
4201            {
4202                // In lieu of plumbing the user to all system config functions, just check that
4203                // the var is visible.
4204                let var = self.catalog().system_config().get(name)?;
4205                var.visible(session.user(), self.catalog().system_config())?;
4206                Ok(())
4207            }
4208            // If we're not a superuser, but the variable is user modifiable, indicate they can use
4209            // session variables.
4210            (UserKind::Regular, Some(name))
4211                if self.catalog().system_config().user_modifiable(name) =>
4212            {
4213                Err(AdapterError::Unauthorized(
4214                    rbac::UnauthorizedError::Superuser {
4215                        action: format!("toggle the '{name}' system configuration parameter"),
4216                    },
4217                ))
4218            }
4219            _ => Err(AdapterError::Unauthorized(
4220                rbac::UnauthorizedError::MzSystem {
4221                    action: "alter system".into(),
4222                },
4223            )),
4224        }
4225    }
4226
4227    fn validate_alter_system_network_policy(
4228        &self,
4229        session: &Session,
4230        policy_value: &plan::VariableValue,
4231    ) -> Result<(), AdapterError> {
4232        let policy_name = match &policy_value {
4233            // Make sure the compiled in default still exists.
4234            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4235            plan::VariableValue::Values(values) if values.len() == 1 => {
4236                values.iter().next().cloned()
4237            }
4238            plan::VariableValue::Values(values) => {
4239                tracing::warn!(?values, "can't set multiple network policies at once");
4240                None
4241            }
4242        };
4243        let maybe_network_policy = policy_name
4244            .as_ref()
4245            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4246        let Some(network_policy) = maybe_network_policy else {
4247            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4248                VarError::InvalidParameterValue {
4249                    name: NETWORK_POLICY.name(),
4250                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4251                    reason: "no network policy with such name exists".to_string(),
4252                },
4253            )));
4254        };
4255        self.validate_alter_network_policy(session, &network_policy.rules)
4256    }
4257
4258    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4259    ///
4260    /// This helps prevent users from modifying network policies in a way that would lock out their
4261    /// current connection.
4262    fn validate_alter_network_policy(
4263        &self,
4264        session: &Session,
4265        policy_rules: &Vec<NetworkPolicyRule>,
4266    ) -> Result<(), AdapterError> {
4267        // If the user is not an internal user attempt to protect them from
4268        // blocking themselves.
4269        if session.user().is_internal() {
4270            return Ok(());
4271        }
4272        if let Some(ip) = session.meta().client_ip() {
4273            validate_ip_with_policy_rules(ip, policy_rules)
4274                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4275        } else {
4276            // Sessions without IPs are only temporarily constructed for default values
4277            // they should not be permitted here.
4278            return Err(AdapterError::NetworkPolicyDenied(
4279                NetworkPolicyError::MissingIp,
4280            ));
4281        }
4282        Ok(())
4283    }
4284
4285    // Returns the name of the portal to execute.
4286    #[instrument]
4287    pub(super) fn sequence_execute(
4288        &self,
4289        session: &mut Session,
4290        plan: plan::ExecutePlan,
4291    ) -> Result<String, AdapterError> {
4292        // Verify the stmt is still valid.
4293        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4294        let ps = session
4295            .get_prepared_statement_unverified(&plan.name)
4296            .expect("known to exist");
4297        let stmt = ps.stmt().cloned();
4298        let desc = ps.desc().clone();
4299        let state_revision = ps.state_revision;
4300        let logging = Arc::clone(ps.logging());
4301        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4302    }
4303
4304    #[instrument]
4305    pub(super) async fn sequence_grant_privileges(
4306        &mut self,
4307        session: &Session,
4308        plan::GrantPrivilegesPlan {
4309            update_privileges,
4310            grantees,
4311        }: plan::GrantPrivilegesPlan,
4312    ) -> Result<ExecuteResponse, AdapterError> {
4313        self.sequence_update_privileges(
4314            session,
4315            update_privileges,
4316            grantees,
4317            UpdatePrivilegeVariant::Grant,
4318        )
4319        .await
4320    }
4321
4322    #[instrument]
4323    pub(super) async fn sequence_revoke_privileges(
4324        &mut self,
4325        session: &Session,
4326        plan::RevokePrivilegesPlan {
4327            update_privileges,
4328            revokees,
4329        }: plan::RevokePrivilegesPlan,
4330    ) -> Result<ExecuteResponse, AdapterError> {
4331        self.sequence_update_privileges(
4332            session,
4333            update_privileges,
4334            revokees,
4335            UpdatePrivilegeVariant::Revoke,
4336        )
4337        .await
4338    }
4339
4340    #[instrument]
4341    async fn sequence_update_privileges(
4342        &mut self,
4343        session: &Session,
4344        update_privileges: Vec<UpdatePrivilege>,
4345        grantees: Vec<RoleId>,
4346        variant: UpdatePrivilegeVariant,
4347    ) -> Result<ExecuteResponse, AdapterError> {
4348        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4349        let mut warnings = Vec::new();
4350        let catalog = self.catalog().for_session(session);
4351
4352        for UpdatePrivilege {
4353            acl_mode,
4354            target_id,
4355            grantor,
4356            acl_from_all,
4357        } in update_privileges
4358        {
4359            let actual_object_type = catalog.get_system_object_type(&target_id);
4360            // For all relations we allow all applicable table privileges, but send a warning if the
4361            // privilege isn't actually applicable to the object type. We skip the warning when the
4362            // user used the `ALL [PRIVILEGES]` shorthand: the user did not explicitly name a
4363            // non-applicable privilege, and via PostgreSQL-compatible `ON TABLE <view>` syntax
4364            // `ALL` deliberately expands to the full table set.
4365            if actual_object_type.is_relation() && !acl_from_all {
4366                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4367                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4368                if !non_applicable_privileges.is_empty() {
4369                    let object_description =
4370                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4371                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4372                        non_applicable_privileges,
4373                        object_description,
4374                    })
4375                }
4376            }
4377
4378            if let SystemObjectId::Object(object_id) = &target_id {
4379                self.catalog()
4380                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4381            }
4382
4383            let privileges = self
4384                .catalog()
4385                .get_privileges(&target_id, session.conn_id())
4386                // Should be unreachable since the parser will refuse to parse grant/revoke
4387                // statements on objects without privileges.
4388                .ok_or(AdapterError::Unsupported(
4389                    "GRANTs/REVOKEs on an object type with no privileges",
4390                ))?;
4391
4392            for grantee in &grantees {
4393                self.catalog().ensure_not_system_role(grantee)?;
4394                self.catalog().ensure_not_predefined_role(grantee)?;
4395                let existing_privilege = privileges
4396                    .get_acl_item(grantee, &grantor)
4397                    .map(Cow::Borrowed)
4398                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4399
4400                match variant {
4401                    UpdatePrivilegeVariant::Grant
4402                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4403                    {
4404                        ops.push(catalog::Op::UpdatePrivilege {
4405                            target_id: target_id.clone(),
4406                            privilege: MzAclItem {
4407                                grantee: *grantee,
4408                                grantor,
4409                                acl_mode,
4410                            },
4411                            variant,
4412                        });
4413                    }
4414                    UpdatePrivilegeVariant::Revoke
4415                        if !existing_privilege
4416                            .acl_mode
4417                            .intersection(acl_mode)
4418                            .is_empty() =>
4419                    {
4420                        ops.push(catalog::Op::UpdatePrivilege {
4421                            target_id: target_id.clone(),
4422                            privilege: MzAclItem {
4423                                grantee: *grantee,
4424                                grantor,
4425                                acl_mode,
4426                            },
4427                            variant,
4428                        });
4429                    }
4430                    // no-op
4431                    _ => {}
4432                }
4433            }
4434        }
4435
4436        if ops.is_empty() {
4437            session.add_notices(warnings);
4438            return Ok(variant.into());
4439        }
4440
4441        let res = self
4442            .catalog_transact(Some(session), ops)
4443            .await
4444            .map(|_| match variant {
4445                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4446                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4447            });
4448        if res.is_ok() {
4449            session.add_notices(warnings);
4450        }
4451        res
4452    }
4453
4454    #[instrument]
4455    pub(super) async fn sequence_alter_default_privileges(
4456        &mut self,
4457        session: &Session,
4458        plan::AlterDefaultPrivilegesPlan {
4459            privilege_objects,
4460            privilege_acl_items,
4461            is_grant,
4462        }: plan::AlterDefaultPrivilegesPlan,
4463    ) -> Result<ExecuteResponse, AdapterError> {
4464        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4465        let variant = if is_grant {
4466            UpdatePrivilegeVariant::Grant
4467        } else {
4468            UpdatePrivilegeVariant::Revoke
4469        };
4470        for privilege_object in &privilege_objects {
4471            self.catalog()
4472                .ensure_not_system_role(&privilege_object.role_id)?;
4473            self.catalog()
4474                .ensure_not_predefined_role(&privilege_object.role_id)?;
4475            if let Some(database_id) = privilege_object.database_id {
4476                self.catalog()
4477                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4478            }
4479            if let Some(schema_id) = privilege_object.schema_id {
4480                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4481                let schema_spec: SchemaSpecifier = schema_id.into();
4482
4483                self.catalog().ensure_not_reserved_object(
4484                    &(database_spec, schema_spec).into(),
4485                    session.conn_id(),
4486                )?;
4487            }
4488            for privilege_acl_item in &privilege_acl_items {
4489                self.catalog()
4490                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4491                self.catalog()
4492                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4493                ops.push(catalog::Op::UpdateDefaultPrivilege {
4494                    privilege_object: privilege_object.clone(),
4495                    privilege_acl_item: privilege_acl_item.clone(),
4496                    variant,
4497                })
4498            }
4499        }
4500
4501        self.catalog_transact(Some(session), ops).await?;
4502        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4503    }
4504
4505    #[instrument]
4506    pub(super) async fn sequence_grant_role(
4507        &mut self,
4508        session: &Session,
4509        plan::GrantRolePlan {
4510            role_ids,
4511            member_ids,
4512            grantor_id,
4513        }: plan::GrantRolePlan,
4514    ) -> Result<ExecuteResponse, AdapterError> {
4515        let catalog = self.catalog();
4516        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4517        for role_id in role_ids {
4518            for member_id in &member_ids {
4519                let member_membership: BTreeSet<_> =
4520                    catalog.get_role(member_id).membership().keys().collect();
4521                if member_membership.contains(&role_id) {
4522                    let role_name = catalog.get_role(&role_id).name().to_string();
4523                    let member_name = catalog.get_role(member_id).name().to_string();
4524                    // We need this check so we don't accidentally return a success on a reserved role.
4525                    catalog.ensure_not_reserved_role(member_id)?;
4526                    catalog.ensure_grantable_role(&role_id)?;
4527                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4528                        role_name,
4529                        member_name,
4530                    });
4531                } else {
4532                    ops.push(catalog::Op::GrantRole {
4533                        role_id,
4534                        member_id: *member_id,
4535                        grantor_id,
4536                    });
4537                }
4538            }
4539        }
4540
4541        if ops.is_empty() {
4542            return Ok(ExecuteResponse::GrantedRole);
4543        }
4544
4545        self.catalog_transact(Some(session), ops)
4546            .await
4547            .map(|_| ExecuteResponse::GrantedRole)
4548    }
4549
4550    #[instrument]
4551    pub(super) async fn sequence_revoke_role(
4552        &mut self,
4553        session: &Session,
4554        plan::RevokeRolePlan {
4555            role_ids,
4556            member_ids,
4557            grantor_id,
4558        }: plan::RevokeRolePlan,
4559    ) -> Result<ExecuteResponse, AdapterError> {
4560        let catalog = self.catalog();
4561        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4562        for role_id in role_ids {
4563            for member_id in &member_ids {
4564                let member_membership: BTreeSet<_> =
4565                    catalog.get_role(member_id).membership().keys().collect();
4566                if !member_membership.contains(&role_id) {
4567                    let role_name = catalog.get_role(&role_id).name().to_string();
4568                    let member_name = catalog.get_role(member_id).name().to_string();
4569                    // We need this check so we don't accidentally return a success on a reserved role.
4570                    catalog.ensure_not_reserved_role(member_id)?;
4571                    catalog.ensure_grantable_role(&role_id)?;
4572                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4573                        role_name,
4574                        member_name,
4575                    });
4576                } else {
4577                    ops.push(catalog::Op::RevokeRole {
4578                        role_id,
4579                        member_id: *member_id,
4580                        grantor_id,
4581                    });
4582                }
4583            }
4584        }
4585
4586        if ops.is_empty() {
4587            return Ok(ExecuteResponse::RevokedRole);
4588        }
4589
4590        self.catalog_transact(Some(session), ops)
4591            .await
4592            .map(|_| ExecuteResponse::RevokedRole)
4593    }
4594
4595    #[instrument]
4596    pub(super) async fn sequence_alter_owner(
4597        &mut self,
4598        session: &Session,
4599        plan::AlterOwnerPlan {
4600            id,
4601            object_type,
4602            new_owner,
4603        }: plan::AlterOwnerPlan,
4604    ) -> Result<ExecuteResponse, AdapterError> {
4605        let mut ops = vec![catalog::Op::UpdateOwner {
4606            id: id.clone(),
4607            new_owner,
4608        }];
4609
4610        match &id {
4611            ObjectId::Item(global_id) => {
4612                let entry = self.catalog().get_entry(global_id);
4613
4614                // Cannot directly change the owner of an index.
4615                if entry.is_index() {
4616                    let name = self
4617                        .catalog()
4618                        .resolve_full_name(entry.name(), Some(session.conn_id()))
4619                        .to_string();
4620                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
4621                    return Ok(ExecuteResponse::AlteredObject(object_type));
4622                }
4623
4624                // Alter owner cascades down to dependent indexes.
4625                let dependent_index_ops = entry
4626                    .used_by()
4627                    .into_iter()
4628                    .filter(|id| self.catalog().get_entry(id).is_index())
4629                    .map(|id| catalog::Op::UpdateOwner {
4630                        id: ObjectId::Item(*id),
4631                        new_owner,
4632                    });
4633                ops.extend(dependent_index_ops);
4634
4635                // Alter owner cascades down to progress collections.
4636                let dependent_subsources =
4637                    entry
4638                        .progress_id()
4639                        .into_iter()
4640                        .map(|item_id| catalog::Op::UpdateOwner {
4641                            id: ObjectId::Item(item_id),
4642                            new_owner,
4643                        });
4644                ops.extend(dependent_subsources);
4645            }
4646            ObjectId::Cluster(cluster_id) => {
4647                let cluster = self.catalog().get_cluster(*cluster_id);
4648                // Alter owner cascades down to cluster replicas.
4649                let managed_cluster_replica_ops =
4650                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4651                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4652                        new_owner,
4653                    });
4654                ops.extend(managed_cluster_replica_ops);
4655            }
4656            _ => {}
4657        }
4658
4659        self.catalog_transact(Some(session), ops)
4660            .await
4661            .map(|_| ExecuteResponse::AlteredObject(object_type))
4662    }
4663
4664    #[instrument]
4665    pub(super) async fn sequence_reassign_owned(
4666        &mut self,
4667        session: &Session,
4668        plan::ReassignOwnedPlan {
4669            old_roles,
4670            new_role,
4671            reassign_ids,
4672        }: plan::ReassignOwnedPlan,
4673    ) -> Result<ExecuteResponse, AdapterError> {
4674        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4675            self.catalog().ensure_not_reserved_role(role_id)?;
4676        }
4677
4678        let ops = reassign_ids
4679            .into_iter()
4680            .map(|id| catalog::Op::UpdateOwner {
4681                id,
4682                new_owner: new_role,
4683            })
4684            .collect();
4685
4686        self.catalog_transact(Some(session), ops)
4687            .await
4688            .map(|_| ExecuteResponse::ReassignOwned)
4689    }
4690
4691    #[instrument]
4692    pub(crate) async fn handle_deferred_statement(&mut self) {
4693        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
4694        // was processed, removing the single element from deferred_statements, so it is expected
4695        // that this is sometimes empty.
4696        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4697            return;
4698        };
4699        match ps {
4700            crate::coord::PlanStatement::Statement { stmt, params } => {
4701                self.handle_execute_inner(stmt, params, ctx).await;
4702            }
4703            crate::coord::PlanStatement::Plan {
4704                plan,
4705                resolved_ids,
4706                sql_impl_resolved_ids,
4707            } => {
4708                self.sequence_plan(ctx, plan, resolved_ids, sql_impl_resolved_ids)
4709                    .await;
4710            }
4711        }
4712    }
4713
4714    #[instrument]
4715    // TODO(parkmycar): Remove this once we have an actual implementation.
4716    #[allow(clippy::unused_async)]
4717    pub(super) async fn sequence_alter_table(
4718        &mut self,
4719        ctx: &mut ExecuteContext,
4720        plan: plan::AlterTablePlan,
4721    ) -> Result<ExecuteResponse, AdapterError> {
4722        let plan::AlterTablePlan {
4723            relation_id,
4724            column_name,
4725            column_type,
4726            raw_sql_type,
4727        } = plan;
4728
4729        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4730        let (_, new_global_id) = self.allocate_user_id().await?;
4731        let ops = vec![catalog::Op::AlterAddColumn {
4732            id: relation_id,
4733            new_global_id,
4734            name: column_name,
4735            typ: column_type,
4736            sql: raw_sql_type,
4737        }];
4738
4739        self.catalog_transact_with_context(None, Some(ctx), ops)
4740            .await?;
4741
4742        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4743    }
4744
4745    /// Prepares to apply a replacement materialized view.
4746    #[instrument]
4747    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4748        &mut self,
4749        ctx: ExecuteContext,
4750        plan: AlterMaterializedViewApplyReplacementPlan,
4751    ) {
4752        // To ensure there is no time gap in the output, we can only apply a replacement if the
4753        // target's write frontier has caught up to the replacement dataflow's write frontier. This
4754        // might not be the case initially, so we have to wait. To this end, we install a watch set
4755        // waiting for the target MV's write frontier to advance sufficiently.
4756        //
4757        // Note that the replacement's dataflow is not performing any writes, so it can only be
4758        // ahead of the target initially due to as-of selection. Once the target has caught up, the
4759        // replacement's write frontier is always <= the target's.
4760
4761        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4762
4763        let plan_validity = PlanValidity::new(
4764            self.catalog().transient_revision(),
4765            BTreeSet::from_iter([id, replacement_id]),
4766            None,
4767            None,
4768            ctx.session().role_metadata().clone(),
4769        );
4770
4771        let target = self.catalog.get_entry(&id);
4772        let target_gid = target.latest_global_id();
4773
4774        let replacement = self.catalog.get_entry(&replacement_id);
4775        let replacement_gid = replacement.latest_global_id();
4776
4777        let target_upper = self
4778            .controller
4779            .storage_collections
4780            .collection_frontiers(target_gid)
4781            .expect("target MV exists")
4782            .write_frontier;
4783        let replacement_upper = self
4784            .controller
4785            .compute
4786            .collection_frontiers(replacement_gid, replacement.cluster_id())
4787            .expect("replacement MV exists")
4788            .write_frontier;
4789
4790        info!(
4791            %id, %replacement_id, ?target_upper, ?replacement_upper,
4792            "preparing materialized view replacement application",
4793        );
4794
4795        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4796            // A replacement's write frontier can only become empty if the target's write frontier
4797            // has advanced to the empty frontier. In this case the MV is sealed for all times and
4798            // applying the replacement wouldn't have any effect. We use this opportunity to alert
4799            // the user by returning an error, rather than applying the useless replacement.
4800            //
4801            // Note that we can't assert on `target_upper` being empty here, because the reporting
4802            // of the target's frontier might be delayed. We'd have to fetch the current frontier
4803            // from persist, which we cannot do without incurring I/O.
4804            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4805                name: target.name().item.clone(),
4806            }));
4807            return;
4808        };
4809
4810        // A watch set resolves when the watched objects' frontier becomes _greater_ than the
4811        // specified timestamp. Since we only need to wait until the target frontier is >= the
4812        // replacement's frontier, we can step back the timestamp.
4813        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4814
4815        // TODO(database-issues#9820): If the target MV is dropped while we are waiting for
4816        // progress, the watch set never completes and neither does the `ALTER MATERIALIZED VIEW`
4817        // command.
4818        self.install_storage_watch_set(
4819            ctx.session().conn_id().clone(),
4820            BTreeSet::from_iter([target_gid]),
4821            replacement_upper_ts,
4822            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4823                ctx: Some(ctx),
4824                otel_ctx: OpenTelemetryContext::obtain(),
4825                plan,
4826                plan_validity,
4827            }),
4828        )
4829        .expect("target collection exists");
4830    }
4831
4832    /// Finishes applying a replacement materialized view after the frontier wait completed.
4833    #[instrument]
4834    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4835        &mut self,
4836        mut ctx: AlterMaterializedViewReadyContext,
4837    ) {
4838        ctx.otel_ctx.attach_as_parent();
4839
4840        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4841
4842        // We avoid taking the DDL lock for `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`
4843        // commands, see `Coordinator::must_serialize_ddl`. We therefore must assume that the
4844        // world has arbitrarily changed since we performed planning, and we must re-assert
4845        // that it still matches our requirements.
4846        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4847            ctx.retire(Err(err));
4848            return;
4849        }
4850
4851        info!(
4852            %id, %replacement_id,
4853            "finishing materialized view replacement application",
4854        );
4855
4856        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4857        match self
4858            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4859            .await
4860        {
4861            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4862                ObjectType::MaterializedView,
4863            ))),
4864            Err(err) => ctx.retire(Err(err)),
4865        }
4866    }
4867
4868    pub(super) async fn statistics_oracle(
4869        &self,
4870        session: &Session,
4871        source_ids: &BTreeSet<GlobalId>,
4872        query_as_of: &Antichain<Timestamp>,
4873        is_oneshot: bool,
4874    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4875        super::statistics_oracle(
4876            session,
4877            source_ids,
4878            query_as_of,
4879            is_oneshot,
4880            self.catalog().system_config(),
4881            self.controller.storage_collections.as_ref(),
4882        )
4883        .await
4884    }
4885}
4886
4887impl Coordinator {
4888    /// Emit the raw optimizer notices in `notices` to the user's session, if
4889    /// any.
4890    ///
4891    /// This intentionally consumes `RawOptimizerNotice`s (not pre-rendered
4892    /// ones) because the user-facing rendering goes through the user's
4893    /// session-aware humanizer, which produces e.g. schema-qualified names
4894    /// relative to the user's current database/schema.
4895    pub(crate) fn emit_raw_optimizer_notices_to_user(
4896        &self,
4897        ctx: &ExecuteContext,
4898        notices: &[RawOptimizerNotice],
4899    ) {
4900        emit_optimizer_notices(&*self.catalog, ctx.session(), notices);
4901    }
4902
4903    /// Persist already-rendered optimizer notices for a newly created
4904    /// non-transient dataflow.
4905    ///
4906    /// This:
4907    /// - packs builtin-table updates for `mz_optimizer_notices` (if enabled),
4908    /// - stores the rendered metainfo on the catalog object via
4909    ///   `set_dataflow_metainfo`,
4910    /// - and returns a future that resolves once the builtin-table append
4911    ///   has been observed, or `None` if nothing was appended.
4912    async fn persist_dataflow_metainfo(
4913        &mut self,
4914        df_meta: DataflowMetainfo<Arc<OptimizerNotice>>,
4915        export_id: GlobalId,
4916    ) -> Option<BuiltinTableAppendNotify> {
4917        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
4918        // in-memory state.
4919        if self.catalog().state().system_config().enable_mz_notices()
4920            && !df_meta.optimizer_notices.is_empty()
4921        {
4922            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4923            self.catalog().state().pack_optimizer_notices(
4924                &mut builtin_table_updates,
4925                df_meta.optimizer_notices.iter(),
4926                Diff::ONE,
4927            );
4928
4929            // Save the metainfo.
4930            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4931
4932            Some(
4933                self.builtin_table_update()
4934                    .execute(builtin_table_updates)
4935                    .await
4936                    .0,
4937            )
4938        } else {
4939            // Save the metainfo.
4940            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4941
4942            None
4943        }
4944    }
4945}