Skip to main content

mz_adapter/coord/sequencer/
inner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::future::OreFutureExt;
34use mz_ore::task::{self, JoinHandle, spawn};
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{assert_none, instrument};
37use mz_repr::adt::jsonb::Jsonb;
38use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
39use mz_repr::explain::ExprHumanizer;
40use mz_repr::explain::json::json_string;
41use mz_repr::role_id::RoleId;
42use mz_repr::{
43    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
44    RowIterator, Timestamp,
45};
46use mz_sql::ast::{
47    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
48    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
49    SqlServerConfigOptionName,
50};
51use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
52use mz_sql::catalog::{
53    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
54    CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, CatalogTypeDetails,
55    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
56};
57use mz_sql::names::{
58    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
59    SchemaSpecifier, SystemObjectId,
60};
61use mz_sql::plan::{
62    AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
63    StatementContext,
64};
65use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
66use mz_storage_types::sinks::StorageSinkDesc;
67// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
68use mz_sql::plan::{
69    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84    WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use mz_transform::notice::{OptimizerNotice, RawOptimizerNotice};
93use smallvec::SmallVec;
94use timely::progress::Antichain;
95use tokio::sync::{oneshot, watch};
96use tracing::{Instrument, Span, info, warn};
97
98use crate::catalog::{self, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
99use crate::command::{ExecuteResponse, Response};
100use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
101use crate::coord::read_then_write::validate_read_then_write_dependencies;
102use crate::coord::sequencer::emit_optimizer_notices;
103use crate::coord::{
104    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
105    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
106    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
107    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
108    WatchSetResponse, validate_ip_with_policy_rules,
109};
110use crate::error::AdapterError;
111use crate::notice::{AdapterNotice, DroppedInUseIndex};
112use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
113use crate::optimize::{self, Optimize};
114use crate::session::{
115    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
116    WriteLocks, WriteOp,
117};
118use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
119use crate::{PeekResponseUnary, ReadHolds};
120
121/// A future that resolves to a real-time recency timestamp.
122type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError>>;
123
124mod cluster;
125mod copy_from;
126mod create_index;
127mod create_materialized_view;
128mod create_view;
129mod explain_timestamp;
130mod peek;
131mod secret;
132mod subscribe;
133
/// Attempts to evaluate an expression. If an error is returned then the error is sent
/// to the client and the function is exited.
///
/// `$expr` must evaluate to a `Result`. On `Ok(v)` the macro invocation evaluates to `v`.
/// On `Err(e)` the error is converted with `.into()`, handed to `$ctx.retire(..)`, and the
/// value of that call is `return`ed from the enclosing function (or async block).
///
/// `$ctx` is only evaluated on the error path.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            // Success: the invocation evaluates to the unwrapped value.
            Ok(v) => v,
            // Failure: retire the context with the converted error and leave the caller.
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}
144
145pub(super) use return_if_err;
146
/// A bundle of catalog operations plus bookkeeping about what is being dropped.
///
/// NOTE(review): the code that builds and consumes this struct is not visible in this part
/// of the file; the field descriptions below are inferred from names and types only and
/// should be confirmed against that code.
struct DropOps {
    /// Catalog operations to apply.
    ops: Vec<catalog::Op>,
    /// Presumably `true` when the session's active database is among the dropped
    /// objects -- TODO confirm.
    dropped_active_db: bool,
    /// Presumably `true` when the session's active cluster is among the dropped
    /// objects -- TODO confirm.
    dropped_active_cluster: bool,
    /// Presumably the indexes being dropped that are still in use -- TODO confirm.
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
153
/// A bundle of values returned from `create_source_inner`.
struct CreateSourceInner {
    /// Catalog operations to apply. For each planned source this holds a
    /// `catalog::Op::CreateItem`, followed by a `catalog::Op::UpdateSourceReferences` when
    /// the plan carried a set of available source references.
    ops: Vec<catalog::Op>,
    /// Every `Source` that was built, paired with the item ID it will be created under.
    sources: Vec<(CatalogItemId, Source)>,
    /// Names of the sources whose plan had `if_not_exists` set, keyed by item ID.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
160
161impl Coordinator {
162    /// Sequences the next staged of a [Staged] plan. This is designed for use with plans that
163    /// execute both on and off of the coordinator thread. Stages can either produce another stage
164    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
165    /// tracing.
166    pub(crate) async fn sequence_staged<S>(
167        &mut self,
168        mut ctx: S::Ctx,
169        parent_span: Span,
170        mut stage: S,
171    ) where
172        S: Staged + 'static,
173        S::Ctx: Send + 'static,
174    {
175        return_if_err!(stage.validity().check(self.catalog()), ctx);
176        loop {
177            let mut cancel_enabled = stage.cancel_enabled();
178            if let Some(session) = ctx.session() {
179                if cancel_enabled {
180                    // Channel to await cancellation. Insert a new channel, but check if the previous one
181                    // was already canceled.
182                    if let Some((_prev_tx, prev_rx)) = self
183                        .staged_cancellation
184                        .insert(session.conn_id().clone(), watch::channel(false))
185                    {
186                        let was_canceled = *prev_rx.borrow();
187                        if was_canceled {
188                            ctx.retire(Err(AdapterError::Canceled));
189                            return;
190                        }
191                    }
192                } else {
193                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
194                    // when cancel_enabled may have been true on an earlier stage.
195                    self.staged_cancellation.remove(session.conn_id());
196                }
197            } else {
198                cancel_enabled = false
199            };
200            let next = stage
201                .stage(self, &mut ctx)
202                .instrument(parent_span.clone())
203                .await;
204            let res = return_if_err!(next, ctx);
205            stage = match res {
206                StageResult::Handle(handle) => {
207                    let internal_cmd_tx = self.internal_cmd_tx.clone();
208                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
209                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
210                    });
211                    return;
212                }
213                StageResult::HandleRetire(handle) => {
214                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
215                        ctx.retire(Ok(resp));
216                    });
217                    return;
218                }
219                StageResult::Response(resp) => {
220                    ctx.retire(Ok(resp));
221                    return;
222                }
223                StageResult::Immediate(stage) => *stage,
224            }
225        }
226    }
227
228    fn handle_spawn<C, T, F>(
229        &self,
230        ctx: C,
231        handle: JoinHandle<Result<T, AdapterError>>,
232        cancel_enabled: bool,
233        f: F,
234    ) where
235        C: StagedContext + Send + 'static,
236        T: Send + 'static,
237        F: FnOnce(C, T) + Send + 'static,
238    {
239        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
240            .session()
241            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
242        {
243            let mut rx = rx.clone();
244            Box::pin(async move {
245                // Wait for true or dropped sender.
246                let _ = rx.wait_for(|v| *v).await;
247                ()
248            })
249        } else {
250            Box::pin(future::pending())
251        };
252        spawn(|| "sequence_staged", async move {
253            tokio::select! {
254                res = handle => {
255                    let next = return_if_err!(res, ctx);
256                    f(ctx, next);
257                }
258                _ = rx, if cancel_enabled => {
259                    ctx.retire(Err(AdapterError::Canceled));
260                }
261            }
262        });
263    }
264
265    async fn create_source_inner(
266        &self,
267        session: &Session,
268        plans: Vec<plan::CreateSourcePlanBundle>,
269    ) -> Result<CreateSourceInner, AdapterError> {
270        let mut ops = vec![];
271        let mut sources = vec![];
272
273        let if_not_exists_ids = plans
274            .iter()
275            .filter_map(
276                |plan::CreateSourcePlanBundle {
277                     item_id,
278                     global_id: _,
279                     plan,
280                     resolved_ids: _,
281                     available_source_references: _,
282                 }| {
283                    if plan.if_not_exists {
284                        Some((*item_id, plan.name.clone()))
285                    } else {
286                        None
287                    }
288                },
289            )
290            .collect::<BTreeMap<_, _>>();
291
292        for plan::CreateSourcePlanBundle {
293            item_id,
294            global_id,
295            mut plan,
296            resolved_ids,
297            available_source_references,
298        } in plans
299        {
300            let name = plan.name.clone();
301
302            // Attempt to reduce the `CHECK` expression, we timeout if this takes too long.
303            if let mz_sql::plan::DataSourceDesc::Webhook {
304                validate_using: Some(validate),
305                ..
306            } = &mut plan.source.data_source
307            {
308                if let Err(reason) = validate.reduce_expression().await {
309                    self.metrics
310                        .webhook_validation_reduce_failures
311                        .with_label_values(&[reason])
312                        .inc();
313                    return Err(AdapterError::Internal(format!(
314                        "failed to reduce check expression, {reason}"
315                    )));
316                }
317            }
318
319            // If this source contained a set of available source references, update the
320            // source references catalog table.
321            let mut reference_ops = vec![];
322            if let Some(references) = &available_source_references {
323                reference_ops.push(catalog::Op::UpdateSourceReferences {
324                    source_id: item_id,
325                    references: references.clone().into(),
326                });
327            }
328
329            let source = Source::new(plan, global_id, resolved_ids, None, false);
330            ops.push(catalog::Op::CreateItem {
331                id: item_id,
332                name,
333                item: CatalogItem::Source(source.clone()),
334                owner_id: *session.current_role_id(),
335            });
336            sources.push((item_id, source));
337            // These operations must be executed after the source is added to the catalog.
338            ops.extend(reference_ops);
339        }
340
341        Ok(CreateSourceInner {
342            ops,
343            sources,
344            if_not_exists_ids,
345        })
346    }
347
348    /// Subsources are planned differently from other statements because they
349    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
350    /// Because of this, we have usually "missed" the opportunity to plan them
351    /// through the normal statement execution life cycle (the exception being
352    /// during bootstrapping).
353    ///
354    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
355    pub(crate) fn plan_subsource(
356        &self,
357        session: &Session,
358        params: &mz_sql::plan::Params,
359        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
360        item_id: CatalogItemId,
361        global_id: GlobalId,
362    ) -> Result<CreateSourcePlanBundle, AdapterError> {
363        let catalog = self.catalog().for_session(session);
364        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
365
366        let plan = self.plan_statement(
367            session,
368            Statement::CreateSubsource(subsource_stmt),
369            params,
370            &resolved_ids,
371        )?;
372        let plan = match plan {
373            Plan::CreateSource(plan) => plan,
374            _ => unreachable!(),
375        };
376        Ok(CreateSourcePlanBundle {
377            item_id,
378            global_id,
379            plan,
380            resolved_ids,
381            available_source_references: None,
382        })
383    }
384
385    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
386    pub(crate) async fn plan_purified_alter_source_add_subsource(
387        &mut self,
388        session: &Session,
389        params: Params,
390        source_name: ResolvedItemName,
391        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
392        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
393    ) -> Result<(Plan, ResolvedIds), AdapterError> {
394        let mut subsource_plans = Vec::with_capacity(subsources.len());
395
396        // Generate subsource statements
397        let conn_catalog = self.catalog().for_system_session();
398        let pcx = plan::PlanContext::zero();
399        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
400
401        let entry = self.catalog().get_entry(source_name.item_id());
402        let source = entry.source().ok_or_else(|| {
403            AdapterError::internal(
404                "plan alter source",
405                format!("expected Source found {entry:?}"),
406            )
407        })?;
408
409        let item_id = entry.id();
410        let ingestion_id = source.global_id();
411        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
412
413        let ids = self
414            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
415            .await?;
416        for (subsource_stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
417            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
418            subsource_plans.push(s);
419        }
420
421        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
422            subsources: subsource_plans,
423            options,
424        };
425
426        Ok((
427            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
428                item_id,
429                ingestion_id,
430                action,
431            }),
432            ResolvedIds::empty(),
433        ))
434    }
435
436    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
437    pub(crate) fn plan_purified_alter_source_refresh_references(
438        &self,
439        _session: &Session,
440        _params: Params,
441        source_name: ResolvedItemName,
442        available_source_references: plan::SourceReferences,
443    ) -> Result<(Plan, ResolvedIds), AdapterError> {
444        let entry = self.catalog().get_entry(source_name.item_id());
445        let source = entry.source().ok_or_else(|| {
446            AdapterError::internal(
447                "plan alter source",
448                format!("expected Source found {entry:?}"),
449            )
450        })?;
451        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
452            references: available_source_references,
453        };
454
455        Ok((
456            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
457                item_id: entry.id(),
458                ingestion_id: source.global_id(),
459                action,
460            }),
461            ResolvedIds::empty(),
462        ))
463    }
464
    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
    ///
    /// On success, returns a [`Plan::CreateSources`] whose bundles are ordered as: the
    /// progress subsource (if `progress_stmt` was provided), the primary source, then one
    /// bundle per generated subsource statement. The accompanying [`ResolvedIds`] is
    /// always empty; each bundle carries its own resolved IDs. Only the primary source's
    /// bundle carries `available_source_references`.
    ///
    /// # Errors
    ///
    /// Propagates errors from ID allocation, from planning any of the statements, and
    /// from generating the subsource statements.
    ///
    /// # Panics
    ///
    /// Panics if `progress_stmt.of_source` is set, or if planning the `CREATE SOURCE`
    /// statement yields anything other than [`Plan::CreateSource`].
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        // Room for every export subsource plus the progress subsource and the primary source.
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource, if any.
        if let Some(progress_stmt) = progress_stmt {
            // The primary source depends on this subsource because the primary
            // source needs to know its shard ID, and the easiest way of
            // guaranteeing that the shard ID is discoverable is to create this
            // collection first.
            assert_none!(progress_stmt.of_source);
            let (item_id, global_id) = self.allocate_user_id().await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            // Point the primary source statement at the progress subsource planned above.
            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        // Resolve the primary statement's dependencies only now, after the progress
        // subsource reference has been filled in.
        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        // Options of the primary source that are copied onto every generated subsource:
        // `RETAIN HISTORY` is propagated, `TIMESTAMP INTERVAL` is not.
        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
        };

        let (item_id, global_id) = self.allocate_user_id().await?;

        // Name under which the generated subsource statements refer to the primary source.
        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        // Apply the propagated primary-source options to each subsource statement.
        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources
        let ids = self
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }
577
578    #[instrument]
579    pub(super) async fn sequence_create_source(
580        &mut self,
581        ctx: &mut ExecuteContext,
582        plans: Vec<plan::CreateSourcePlanBundle>,
583    ) -> Result<ExecuteResponse, AdapterError> {
584        let CreateSourceInner {
585            ops,
586            sources,
587            if_not_exists_ids,
588        } = self.create_source_inner(ctx.session(), plans).await?;
589
590        let transact_result = self
591            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
592            .await;
593
594        // Check if any sources are webhook sources and report them as created.
595        for (item_id, source) in &sources {
596            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
597                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
598                    ctx.session()
599                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
600                }
601            }
602        }
603
604        match transact_result {
605            Ok(()) => Ok(ExecuteResponse::CreatedSource),
606            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
607                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
608            })) if if_not_exists_ids.contains_key(&id) => {
609                ctx.session()
610                    .add_notice(AdapterNotice::ObjectAlreadyExists {
611                        name: if_not_exists_ids[&id].item.clone(),
612                        ty: "source",
613                    });
614                Ok(ExecuteResponse::CreatedSource)
615            }
616            Err(err) => Err(err),
617        }
618    }
619
620    #[instrument]
621    pub(super) async fn sequence_create_connection(
622        &mut self,
623        mut ctx: ExecuteContext,
624        plan: plan::CreateConnectionPlan,
625        resolved_ids: ResolvedIds,
626    ) {
627        let (connection_id, connection_gid) = match self.allocate_user_id().await {
628            Ok(item_id) => item_id,
629            Err(err) => return ctx.retire(Err(err)),
630        };
631
632        match &plan.connection.details {
633            ConnectionDetails::Ssh { key_1, key_2, .. } => {
634                let key_1 = match key_1.as_key_pair() {
635                    Some(key_1) => key_1.clone(),
636                    None => {
637                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
638                            "the PUBLIC KEY 1 option cannot be explicitly specified"
639                        ))));
640                    }
641                };
642
643                let key_2 = match key_2.as_key_pair() {
644                    Some(key_2) => key_2.clone(),
645                    None => {
646                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
647                            "the PUBLIC KEY 2 option cannot be explicitly specified"
648                        ))));
649                    }
650                };
651
652                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
653                let secret = key_set.to_bytes();
654                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
655                    return ctx.retire(Err(err.into()));
656                }
657                self.caching_secrets_reader.invalidate(connection_id);
658            }
659            _ => (),
660        };
661
662        if plan.validate {
663            let internal_cmd_tx = self.internal_cmd_tx.clone();
664            let transient_revision = self.catalog().transient_revision();
665            let conn_id = ctx.session().conn_id().clone();
666            let otel_ctx = OpenTelemetryContext::obtain();
667            let role_metadata = ctx.session().role_metadata().clone();
668
669            let connection = plan
670                .connection
671                .details
672                .to_connection()
673                .into_inline_connection(self.catalog().state());
674
675            let current_storage_parameters = self.controller.storage.config().clone();
676            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
677                let result = match std::panic::AssertUnwindSafe(
678                    connection.validate(connection_id, &current_storage_parameters),
679                )
680                .ore_catch_unwind()
681                .await
682                {
683                    Ok(Ok(())) => Ok(plan),
684                    Ok(Err(err)) => Err(err.into()),
685                    Err(_panic) => {
686                        tracing::error!("connection validation panicked");
687                        Err(AdapterError::Internal(
688                            "connection validation panicked".into(),
689                        ))
690                    }
691                };
692
693                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
694                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
695                    CreateConnectionValidationReady {
696                        ctx,
697                        result,
698                        connection_id,
699                        connection_gid,
700                        plan_validity: PlanValidity::new(
701                            transient_revision,
702                            resolved_ids.items().copied().collect(),
703                            None,
704                            None,
705                            role_metadata,
706                        ),
707                        otel_ctx,
708                        resolved_ids: resolved_ids.clone(),
709                    },
710                ));
711                if let Err(e) = result {
712                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
713                }
714            });
715        } else {
716            let result = self
717                .sequence_create_connection_stage_finish(
718                    &mut ctx,
719                    connection_id,
720                    connection_gid,
721                    plan,
722                    resolved_ids,
723                )
724                .await;
725            ctx.retire(result);
726        }
727    }
728
729    #[instrument]
730    pub(crate) async fn sequence_create_connection_stage_finish(
731        &mut self,
732        ctx: &mut ExecuteContext,
733        connection_id: CatalogItemId,
734        connection_gid: GlobalId,
735        plan: plan::CreateConnectionPlan,
736        resolved_ids: ResolvedIds,
737    ) -> Result<ExecuteResponse, AdapterError> {
738        let ops = vec![catalog::Op::CreateItem {
739            id: connection_id,
740            name: plan.name.clone(),
741            item: CatalogItem::Connection(Connection {
742                create_sql: plan.connection.create_sql,
743                global_id: connection_gid,
744                details: plan.connection.details.clone(),
745                resolved_ids,
746            }),
747            owner_id: *ctx.session().current_role_id(),
748        }];
749
750        // VPC endpoint creation for AWS PrivateLink connections is now handled
751        // in apply_catalog_implications.
752        let conn_id = ctx.session().conn_id().clone();
753        let transact_result = self
754            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
755            .await;
756
757        match transact_result {
758            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
759            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
760                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
761            })) if plan.if_not_exists => {
762                // Clean up SSH key material if it was persisted, since the
763                // catalog item was not created.
764                if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
765                    if let Err(e) = self.secrets_controller.delete(connection_id).await {
766                        tracing::warn!(
767                            "Dropping SSH secret for existing connection has encountered an error: {}",
768                            e
769                        );
770                    } else {
771                        self.caching_secrets_reader.invalidate(connection_id);
772                    }
773                }
774                ctx.session()
775                    .add_notice(AdapterNotice::ObjectAlreadyExists {
776                        name: plan.name.item,
777                        ty: "connection",
778                    });
779                Ok(ExecuteResponse::CreatedConnection)
780            }
781            Err(err) => {
782                // Clean up SSH key material if it was persisted, since the
783                // catalog item was not created.
784                if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
785                    if let Err(e) = self.secrets_controller.delete(connection_id).await {
786                        tracing::warn!(
787                            "Dropping SSH secret for failed connection has encountered an error: {}",
788                            e
789                        );
790                    } else {
791                        self.caching_secrets_reader.invalidate(connection_id);
792                    }
793                }
794                Err(err)
795            }
796        }
797    }
798
799    #[instrument]
800    pub(super) async fn sequence_create_database(
801        &mut self,
802        session: &Session,
803        plan: plan::CreateDatabasePlan,
804    ) -> Result<ExecuteResponse, AdapterError> {
805        let ops = vec![catalog::Op::CreateDatabase {
806            name: plan.name.clone(),
807            owner_id: *session.current_role_id(),
808        }];
809        match self.catalog_transact(Some(session), ops).await {
810            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
811            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
812                kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
813            })) if plan.if_not_exists => {
814                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
815                Ok(ExecuteResponse::CreatedDatabase)
816            }
817            Err(err) => Err(err),
818        }
819    }
820
821    #[instrument]
822    pub(super) async fn sequence_create_schema(
823        &mut self,
824        session: &Session,
825        plan: plan::CreateSchemaPlan,
826    ) -> Result<ExecuteResponse, AdapterError> {
827        let op = catalog::Op::CreateSchema {
828            database_id: plan.database_spec,
829            schema_name: plan.schema_name.clone(),
830            owner_id: *session.current_role_id(),
831        };
832        match self.catalog_transact(Some(session), vec![op]).await {
833            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
834            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
835                kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
836            })) if plan.if_not_exists => {
837                session.add_notice(AdapterNotice::SchemaAlreadyExists {
838                    name: plan.schema_name,
839                });
840                Ok(ExecuteResponse::CreatedSchema)
841            }
842            Err(err) => Err(err),
843        }
844    }
845
846    /// Validates the role attributes for a `CREATE ROLE` statement.
847    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
848        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
849            if attributes.superuser.is_some() || attributes.password.is_some() {
850                return Err(AdapterError::UnavailableFeature {
851                    feature: "SUPERUSER and PASSWORD attributes".to_string(),
852                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
853                });
854            }
855        }
856        Ok(())
857    }
858
859    #[instrument]
860    pub(super) async fn sequence_create_role(
861        &mut self,
862        conn_id: Option<&ConnectionId>,
863        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
864    ) -> Result<ExecuteResponse, AdapterError> {
865        self.validate_role_attributes(&attributes.clone())?;
866        let op = catalog::Op::CreateRole { name, attributes };
867        self.catalog_transact_with_context(conn_id, None, vec![op])
868            .await
869            .map(|_| ExecuteResponse::CreatedRole)
870    }
871
872    #[instrument]
873    pub(super) async fn sequence_create_network_policy(
874        &mut self,
875        session: &Session,
876        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
877    ) -> Result<ExecuteResponse, AdapterError> {
878        let op = catalog::Op::CreateNetworkPolicy {
879            rules,
880            name,
881            owner_id: *session.current_role_id(),
882        };
883        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
884            .await
885            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
886    }
887
888    #[instrument]
889    pub(super) async fn sequence_alter_network_policy(
890        &mut self,
891        session: &Session,
892        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
893    ) -> Result<ExecuteResponse, AdapterError> {
894        // TODO(network_policy): Consider role based network policies here.
895        let current_network_policy_name =
896            self.catalog().system_config().default_network_policy_name();
897        // Check if the way we're alerting the policy is still valid for the current connection.
898        if current_network_policy_name == name {
899            self.validate_alter_network_policy(session, &rules)?;
900        }
901
902        let op = catalog::Op::AlterNetworkPolicy {
903            id,
904            rules,
905            name,
906            owner_id: *session.current_role_id(),
907        };
908        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
909            .await
910            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
911    }
912
913    #[instrument]
914    pub(super) async fn sequence_create_table(
915        &mut self,
916        ctx: &mut ExecuteContext,
917        plan: plan::CreateTablePlan,
918        resolved_ids: ResolvedIds,
919    ) -> Result<ExecuteResponse, AdapterError> {
920        let plan::CreateTablePlan {
921            name,
922            table,
923            if_not_exists,
924        } = plan;
925
926        let conn_id = if table.temporary {
927            Some(ctx.session().conn_id())
928        } else {
929            None
930        };
931        let (table_id, global_id) = self.allocate_user_id().await?;
932        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
933
934        let data_source = match table.data_source {
935            plan::TableDataSource::TableWrites { defaults } => {
936                TableDataSource::TableWrites { defaults }
937            }
938            plan::TableDataSource::DataSource {
939                desc: data_source_plan,
940                timeline,
941            } => match data_source_plan {
942                plan::DataSourceDesc::IngestionExport {
943                    ingestion_id,
944                    external_reference,
945                    details,
946                    data_config,
947                } => TableDataSource::DataSource {
948                    desc: DataSourceDesc::IngestionExport {
949                        ingestion_id,
950                        external_reference,
951                        details,
952                        data_config,
953                    },
954                    timeline,
955                },
956                plan::DataSourceDesc::Webhook {
957                    validate_using,
958                    body_format,
959                    headers,
960                    cluster_id,
961                } => TableDataSource::DataSource {
962                    desc: DataSourceDesc::Webhook {
963                        validate_using,
964                        body_format,
965                        headers,
966                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
967                    },
968                    timeline,
969                },
970                o => {
971                    unreachable!("CREATE TABLE data source got {:?}", o)
972                }
973            },
974        };
975
976        let is_webhook = if let TableDataSource::DataSource {
977            desc: DataSourceDesc::Webhook { .. },
978            timeline: _,
979        } = &data_source
980        {
981            true
982        } else {
983            false
984        };
985
986        let table = Table {
987            create_sql: Some(table.create_sql),
988            desc: table.desc,
989            collections,
990            conn_id: conn_id.cloned(),
991            resolved_ids,
992            custom_logical_compaction_window: table.compaction_window,
993            is_retained_metrics_object: false,
994            data_source,
995        };
996        let ops = vec![catalog::Op::CreateItem {
997            id: table_id,
998            name: name.clone(),
999            item: CatalogItem::Table(table.clone()),
1000            owner_id: *ctx.session().current_role_id(),
1001        }];
1002
1003        let catalog_result = self
1004            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1005            .await;
1006
1007        if is_webhook {
1008            // try_get_webhook_url will make up a URL for things that are not
1009            // webhooks, so we guard against that here.
1010            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1011                ctx.session()
1012                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
1013            }
1014        }
1015
1016        match catalog_result {
1017            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1018            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1019                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1020            })) if if_not_exists => {
1021                ctx.session_mut()
1022                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1023                        name: name.item,
1024                        ty: "table",
1025                    });
1026                Ok(ExecuteResponse::CreatedTable)
1027            }
1028            Err(err) => Err(err),
1029        }
1030    }
1031
1032    #[instrument]
1033    pub(super) async fn sequence_create_sink(
1034        &mut self,
1035        ctx: ExecuteContext,
1036        plan: plan::CreateSinkPlan,
1037        resolved_ids: ResolvedIds,
1038    ) {
1039        let plan::CreateSinkPlan {
1040            name,
1041            sink,
1042            with_snapshot,
1043            if_not_exists,
1044            in_cluster,
1045        } = plan;
1046
1047        // First try to allocate an ID and an OID. If either fails, we're done.
1048        let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
1049
1050        let catalog_sink = Sink {
1051            create_sql: sink.create_sql,
1052            global_id,
1053            from: sink.from,
1054            connection: sink.connection,
1055            envelope: sink.envelope,
1056            version: sink.version,
1057            with_snapshot,
1058            resolved_ids,
1059            cluster_id: in_cluster,
1060            commit_interval: sink.commit_interval,
1061        };
1062
1063        let ops = vec![catalog::Op::CreateItem {
1064            id: item_id,
1065            name: name.clone(),
1066            item: CatalogItem::Sink(catalog_sink.clone()),
1067            owner_id: *ctx.session().current_role_id(),
1068        }];
1069
1070        let result = self.catalog_transact(Some(ctx.session()), ops).await;
1071
1072        match result {
1073            Ok(()) => {}
1074            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1075                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1076            })) if if_not_exists => {
1077                ctx.session()
1078                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1079                        name: name.item,
1080                        ty: "sink",
1081                    });
1082                ctx.retire(Ok(ExecuteResponse::CreatedSink));
1083                return;
1084            }
1085            Err(e) => {
1086                ctx.retire(Err(e));
1087                return;
1088            }
1089        };
1090
1091        self.create_storage_export(global_id, &catalog_sink)
1092            .await
1093            .unwrap_or_terminate("cannot fail to create exports");
1094
1095        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1096            .await;
1097
1098        ctx.retire(Ok(ExecuteResponse::CreatedSink))
1099    }
1100
1101    /// Validates that a view definition does not contain any expressions that may lead to
1102    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1103    ///
1104    /// We prevent these expressions so that we can add columns to system tables without
1105    /// changing the definition of the view.
1106    ///
1107    /// Here is a bit of a hand wavy proof as to why we only need to check the
1108    /// immediate view definition for system objects and ambiguous column
1109    /// references, and not the entire dependency tree:
1110    ///
1111    ///   - A view with no object references cannot have any ambiguous column
1112    ///   references to a system object, because it has no system objects.
1113    ///   - A view with a direct reference to a system object and a * or
1114    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1115    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1116    ///   any ambiguous column references to a system object, because all column
1117    ///   references are explicitly named.
1118    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1119    ///   system object cannot have any ambiguous column references to a system
1120    ///   object, because there are no system objects in the top level view and
1121    ///   all sub-views are guaranteed to have no ambiguous column references to
1122    ///   system objects.
1123    pub(super) fn validate_system_column_references(
1124        &self,
1125        uses_ambiguous_columns: bool,
1126        depends_on: &BTreeSet<GlobalId>,
1127    ) -> Result<(), AdapterError> {
1128        if uses_ambiguous_columns
1129            && depends_on
1130                .iter()
1131                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1132        {
1133            Err(AdapterError::AmbiguousSystemColumnReference)
1134        } else {
1135            Ok(())
1136        }
1137    }
1138
1139    #[instrument]
1140    pub(super) async fn sequence_create_type(
1141        &mut self,
1142        session: &Session,
1143        plan: plan::CreateTypePlan,
1144        resolved_ids: ResolvedIds,
1145    ) -> Result<ExecuteResponse, AdapterError> {
1146        let (item_id, global_id) = self.allocate_user_id().await?;
1147        // Validate the type definition (e.g., composite columns) before storing.
1148        plan.typ
1149            .inner
1150            .desc(&self.catalog().for_session(session))
1151            .map_err(AdapterError::from)?;
1152        let typ = Type {
1153            create_sql: Some(plan.typ.create_sql),
1154            global_id,
1155            details: CatalogTypeDetails {
1156                array_id: None,
1157                typ: plan.typ.inner,
1158                pg_metadata: None,
1159            },
1160            resolved_ids,
1161        };
1162        let op = catalog::Op::CreateItem {
1163            id: item_id,
1164            name: plan.name,
1165            item: CatalogItem::Type(typ),
1166            owner_id: *session.current_role_id(),
1167        };
1168        match self.catalog_transact(Some(session), vec![op]).await {
1169            Ok(()) => Ok(ExecuteResponse::CreatedType),
1170            Err(err) => Err(err),
1171        }
1172    }
1173
1174    #[instrument]
1175    pub(super) async fn sequence_comment_on(
1176        &mut self,
1177        session: &Session,
1178        plan: plan::CommentPlan,
1179    ) -> Result<ExecuteResponse, AdapterError> {
1180        let op = catalog::Op::Comment {
1181            object_id: plan.object_id,
1182            sub_component: plan.sub_component,
1183            comment: plan.comment,
1184        };
1185        self.catalog_transact(Some(session), vec![op]).await?;
1186        Ok(ExecuteResponse::Comment)
1187    }
1188
1189    #[instrument]
1190    pub(super) async fn sequence_drop_objects(
1191        &mut self,
1192        ctx: &mut ExecuteContext,
1193        plan::DropObjectsPlan {
1194            drop_ids,
1195            object_type,
1196            referenced_ids,
1197        }: plan::DropObjectsPlan,
1198    ) -> Result<ExecuteResponse, AdapterError> {
1199        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1200        let mut objects = Vec::new();
1201        for obj_id in &drop_ids {
1202            if !referenced_ids_hashset.contains(obj_id) {
1203                let object_info = ErrorMessageObjectDescription::from_id(
1204                    obj_id,
1205                    &self.catalog().for_session(ctx.session()),
1206                )
1207                .to_string();
1208                objects.push(object_info);
1209            }
1210        }
1211
1212        if !objects.is_empty() {
1213            ctx.session()
1214                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1215        }
1216
1217        // Collect GlobalIds for expression cache invalidation.
1218        let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
1219            .iter()
1220            .filter_map(|id| match id {
1221                ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
1222                _ => None,
1223            })
1224            .flatten()
1225            .collect();
1226
1227        let DropOps {
1228            ops,
1229            dropped_active_db,
1230            dropped_active_cluster,
1231            dropped_in_use_indexes,
1232        } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1233
1234        self.catalog_transact_with_context(None, Some(ctx), ops)
1235            .await?;
1236
1237        // Invalidate expression cache entries for dropped objects.
1238        if !expr_cache_invalidate_ids.is_empty() {
1239            let _fut = self.catalog().update_expression_cache(
1240                Default::default(),
1241                Default::default(),
1242                expr_cache_invalidate_ids,
1243            );
1244        }
1245
1246        fail::fail_point!("after_sequencer_drop_replica");
1247
1248        if dropped_active_db {
1249            ctx.session()
1250                .add_notice(AdapterNotice::DroppedActiveDatabase {
1251                    name: ctx.session().vars().database().to_string(),
1252                });
1253        }
1254        if dropped_active_cluster {
1255            ctx.session()
1256                .add_notice(AdapterNotice::DroppedActiveCluster {
1257                    name: ctx.session().vars().cluster().to_string(),
1258                });
1259        }
1260        for dropped_in_use_index in dropped_in_use_indexes {
1261            ctx.session()
1262                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1263            self.metrics
1264                .optimization_notices
1265                .with_label_values(&["DroppedInUseIndex"])
1266                .inc_by(1);
1267        }
1268        Ok(ExecuteResponse::DroppedObject(object_type))
1269    }
1270
1271    fn validate_dropped_role_ownership(
1272        &self,
1273        session: &Session,
1274        dropped_roles: &BTreeMap<RoleId, &str>,
1275    ) -> Result<(), AdapterError> {
1276        fn privilege_check(
1277            privileges: &PrivilegeMap,
1278            dropped_roles: &BTreeMap<RoleId, &str>,
1279            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1280            object_id: &SystemObjectId,
1281            catalog: &ConnCatalog,
1282        ) {
1283            for privilege in privileges.all_values() {
1284                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1285                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1286                    let object_description =
1287                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1288                    dependent_objects
1289                        .entry(role_name.to_string())
1290                        .or_default()
1291                        .push(format!(
1292                            "privileges on {object_description} granted by {grantor_name}",
1293                        ));
1294                }
1295                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1296                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1297                    let object_description =
1298                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1299                    dependent_objects
1300                        .entry(role_name.to_string())
1301                        .or_default()
1302                        .push(format!(
1303                            "privileges granted on {object_description} to {grantee_name}"
1304                        ));
1305                }
1306            }
1307        }
1308
1309        let catalog = self.catalog().for_session(session);
1310        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1311        for entry in self.catalog.entries() {
1312            let id = SystemObjectId::Object(entry.id().into());
1313            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1314                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1315                dependent_objects
1316                    .entry(role_name.to_string())
1317                    .or_default()
1318                    .push(format!("owner of {object_description}"));
1319            }
1320            privilege_check(
1321                entry.privileges(),
1322                dropped_roles,
1323                &mut dependent_objects,
1324                &id,
1325                &catalog,
1326            );
1327        }
1328        for database in self.catalog.databases() {
1329            let database_id = SystemObjectId::Object(database.id().into());
1330            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1331                let object_description =
1332                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1333                dependent_objects
1334                    .entry(role_name.to_string())
1335                    .or_default()
1336                    .push(format!("owner of {object_description}"));
1337            }
1338            privilege_check(
1339                &database.privileges,
1340                dropped_roles,
1341                &mut dependent_objects,
1342                &database_id,
1343                &catalog,
1344            );
1345            for schema in database.schemas_by_id.values() {
1346                let schema_id = SystemObjectId::Object(
1347                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1348                );
1349                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1350                    let object_description =
1351                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1352                    dependent_objects
1353                        .entry(role_name.to_string())
1354                        .or_default()
1355                        .push(format!("owner of {object_description}"));
1356                }
1357                privilege_check(
1358                    &schema.privileges,
1359                    dropped_roles,
1360                    &mut dependent_objects,
1361                    &schema_id,
1362                    &catalog,
1363                );
1364            }
1365        }
1366        for cluster in self.catalog.clusters() {
1367            let cluster_id = SystemObjectId::Object(cluster.id().into());
1368            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1369                let object_description =
1370                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1371                dependent_objects
1372                    .entry(role_name.to_string())
1373                    .or_default()
1374                    .push(format!("owner of {object_description}"));
1375            }
1376            privilege_check(
1377                &cluster.privileges,
1378                dropped_roles,
1379                &mut dependent_objects,
1380                &cluster_id,
1381                &catalog,
1382            );
1383            for replica in cluster.replicas() {
1384                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1385                    let replica_id =
1386                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1387                    let object_description =
1388                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1389                    dependent_objects
1390                        .entry(role_name.to_string())
1391                        .or_default()
1392                        .push(format!("owner of {object_description}"));
1393                }
1394            }
1395        }
1396        privilege_check(
1397            self.catalog().system_privileges(),
1398            dropped_roles,
1399            &mut dependent_objects,
1400            &SystemObjectId::System,
1401            &catalog,
1402        );
1403        for (default_privilege_object, default_privilege_acl_items) in
1404            self.catalog.default_privileges()
1405        {
1406            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1407                dependent_objects
1408                    .entry(role_name.to_string())
1409                    .or_default()
1410                    .push(format!(
1411                        "default privileges on {}S created by {}",
1412                        default_privilege_object.object_type, role_name
1413                    ));
1414            }
1415            for default_privilege_acl_item in default_privilege_acl_items {
1416                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1417                    dependent_objects
1418                        .entry(role_name.to_string())
1419                        .or_default()
1420                        .push(format!(
1421                            "default privileges on {}S granted to {}",
1422                            default_privilege_object.object_type, role_name
1423                        ));
1424                }
1425            }
1426        }
1427
1428        if !dependent_objects.is_empty() {
1429            Err(AdapterError::DependentObject(dependent_objects))
1430        } else {
1431            Ok(())
1432        }
1433    }
1434
1435    #[instrument]
1436    pub(super) async fn sequence_drop_owned(
1437        &mut self,
1438        session: &Session,
1439        plan: plan::DropOwnedPlan,
1440    ) -> Result<ExecuteResponse, AdapterError> {
1441        for role_id in &plan.role_ids {
1442            self.catalog().ensure_not_reserved_role(role_id)?;
1443        }
1444
1445        let mut privilege_revokes = plan.privilege_revokes;
1446
1447        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1448        let session_catalog = self.catalog().for_session(session);
1449        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1450            && !session.is_superuser()
1451        {
1452            // Obtain all roles that the current session is a member of.
1453            let role_membership =
1454                session_catalog.collect_role_membership(session.current_role_id());
1455            let invalid_revokes: BTreeSet<_> = privilege_revokes
1456                .extract_if(.., |(_, privilege)| {
1457                    !role_membership.contains(&privilege.grantor)
1458                })
1459                .map(|(object_id, _)| object_id)
1460                .collect();
1461            for invalid_revoke in invalid_revokes {
1462                let object_description =
1463                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1464                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1465            }
1466        }
1467
1468        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1469            catalog::Op::UpdatePrivilege {
1470                target_id: object_id,
1471                privilege,
1472                variant: UpdatePrivilegeVariant::Revoke,
1473            }
1474        });
1475        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1476            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1477                privilege_object,
1478                privilege_acl_item,
1479                variant: UpdatePrivilegeVariant::Revoke,
1480            },
1481        );
1482        let DropOps {
1483            ops: drop_ops,
1484            dropped_active_db,
1485            dropped_active_cluster,
1486            dropped_in_use_indexes,
1487        } = self.sequence_drop_common(session, plan.drop_ids)?;
1488
1489        let ops = privilege_revoke_ops
1490            .chain(default_privilege_revoke_ops)
1491            .chain(drop_ops.into_iter())
1492            .collect();
1493
1494        self.catalog_transact(Some(session), ops).await?;
1495
1496        if dropped_active_db {
1497            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1498                name: session.vars().database().to_string(),
1499            });
1500        }
1501        if dropped_active_cluster {
1502            session.add_notice(AdapterNotice::DroppedActiveCluster {
1503                name: session.vars().cluster().to_string(),
1504            });
1505        }
1506        for dropped_in_use_index in dropped_in_use_indexes {
1507            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1508        }
1509        Ok(ExecuteResponse::DroppedOwned)
1510    }
1511
1512    fn sequence_drop_common(
1513        &self,
1514        session: &Session,
1515        ids: Vec<ObjectId>,
1516    ) -> Result<DropOps, AdapterError> {
1517        let mut dropped_active_db = false;
1518        let mut dropped_active_cluster = false;
1519        let mut dropped_in_use_indexes = Vec::new();
1520        let mut dropped_roles = BTreeMap::new();
1521        let mut dropped_databases = BTreeSet::new();
1522        let mut dropped_schemas = BTreeSet::new();
1523        // Dropping either the group role or the member role of a role membership will trigger a
1524        // revoke role. We use a Set for the revokes to avoid trying to attempt to revoke the same
1525        // role membership twice.
1526        let mut role_revokes = BTreeSet::new();
1527        // Dropping a database or a schema will revoke all default roles associated with that
1528        // database or schema.
1529        let mut default_privilege_revokes = BTreeSet::new();
1530
1531        // Clusters we're dropping
1532        let mut clusters_to_drop = BTreeSet::new();
1533
1534        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1535        for id in &ids {
1536            match id {
1537                ObjectId::Database(id) => {
1538                    let name = self.catalog().get_database(id).name();
1539                    if name == session.vars().database() {
1540                        dropped_active_db = true;
1541                    }
1542                    dropped_databases.insert(id);
1543                }
1544                ObjectId::Schema((_, spec)) => {
1545                    if let SchemaSpecifier::Id(id) = spec {
1546                        dropped_schemas.insert(id);
1547                    }
1548                }
1549                ObjectId::Cluster(id) => {
1550                    clusters_to_drop.insert(*id);
1551                    if let Some(active_id) = self
1552                        .catalog()
1553                        .active_cluster(session)
1554                        .ok()
1555                        .map(|cluster| cluster.id())
1556                    {
1557                        if id == &active_id {
1558                            dropped_active_cluster = true;
1559                        }
1560                    }
1561                }
1562                ObjectId::Role(id) => {
1563                    let role = self.catalog().get_role(id);
1564                    let name = role.name();
1565                    dropped_roles.insert(*id, name);
1566                    // We must revoke all role memberships that the dropped roles belongs to.
1567                    for (group_id, grantor_id) in &role.membership.map {
1568                        role_revokes.insert((*group_id, *id, *grantor_id));
1569                    }
1570                }
1571                ObjectId::Item(id) => {
1572                    if let Some(index) = self.catalog().get_entry(id).index() {
1573                        let humanizer = self.catalog().for_session(session);
1574                        let dependants = self
1575                            .controller
1576                            .compute
1577                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1578                            .ok()
1579                            .into_iter()
1580                            .flatten()
1581                            .filter(|dependant_id| {
1582                                // Transient Ids belong to Peeks. We are not interested for now in
1583                                // peeks depending on a dropped index.
1584                                // TODO: show a different notice in this case. Something like
1585                                // "There is an in-progress ad hoc SELECT that uses the dropped
1586                                // index. The resources used by the index will be freed when all
1587                                // such SELECTs complete."
1588                                if dependant_id.is_transient() {
1589                                    return false;
1590                                }
1591                                // The item should exist, but don't panic if it doesn't.
1592                                let Some(dependent_id) = humanizer
1593                                    .try_get_item_by_global_id(dependant_id)
1594                                    .map(|item| item.id())
1595                                else {
1596                                    return false;
1597                                };
1598                                // If the dependent object is also being dropped, then there is no
1599                                // problem, so we don't want a notice.
1600                                !ids_set.contains(&ObjectId::Item(dependent_id))
1601                            })
1602                            .flat_map(|dependant_id| {
1603                                // If we are not able to find a name for this ID it probably means
1604                                // we have already dropped the compute collection, in which case we
1605                                // can ignore it.
1606                                humanizer.humanize_id(dependant_id)
1607                            })
1608                            .collect_vec();
1609                        if !dependants.is_empty() {
1610                            dropped_in_use_indexes.push(DroppedInUseIndex {
1611                                index_name: humanizer
1612                                    .humanize_id(index.global_id())
1613                                    .unwrap_or_else(|| id.to_string()),
1614                                dependant_objects: dependants,
1615                            });
1616                        }
1617                    }
1618                }
1619                _ => {}
1620            }
1621        }
1622
1623        for id in &ids {
1624            match id {
1625                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1626                // unless they are internal replicas, which exist outside the scope
1627                // of managed clusters.
1628                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1629                    if !clusters_to_drop.contains(cluster_id) {
1630                        let cluster = self.catalog.get_cluster(*cluster_id);
1631                        if cluster.is_managed() {
1632                            let replica =
1633                                cluster.replica(*replica_id).expect("Catalog out of sync");
1634                            if !replica.config.location.internal() {
1635                                coord_bail!("cannot drop replica of managed cluster");
1636                            }
1637                        }
1638                    }
1639                }
1640                _ => {}
1641            }
1642        }
1643
1644        for role_id in dropped_roles.keys() {
1645            self.catalog().ensure_not_reserved_role(role_id)?;
1646        }
1647        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1648        // If any role is a member of a dropped role, then we must revoke that membership.
1649        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1650        for role in self.catalog().user_roles() {
1651            for dropped_role_id in
1652                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1653            {
1654                role_revokes.insert((
1655                    **dropped_role_id,
1656                    role.id(),
1657                    *role
1658                        .membership
1659                        .map
1660                        .get(*dropped_role_id)
1661                        .expect("included in keys above"),
1662                ));
1663            }
1664        }
1665
1666        for (default_privilege_object, default_privilege_acls) in
1667            self.catalog().default_privileges()
1668        {
1669            if matches!(
1670                &default_privilege_object.database_id,
1671                Some(database_id) if dropped_databases.contains(database_id),
1672            ) || matches!(
1673                &default_privilege_object.schema_id,
1674                Some(schema_id) if dropped_schemas.contains(schema_id),
1675            ) {
1676                for default_privilege_acl in default_privilege_acls {
1677                    default_privilege_revokes.insert((
1678                        default_privilege_object.clone(),
1679                        default_privilege_acl.clone(),
1680                    ));
1681                }
1682            }
1683        }
1684
1685        let ops = role_revokes
1686            .into_iter()
1687            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1688                role_id,
1689                member_id,
1690                grantor_id,
1691            })
1692            .chain(default_privilege_revokes.into_iter().map(
1693                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1694                    privilege_object,
1695                    privilege_acl_item,
1696                    variant: UpdatePrivilegeVariant::Revoke,
1697                },
1698            ))
1699            .chain(iter::once(catalog::Op::DropObjects(
1700                ids.into_iter()
1701                    .map(DropObjectInfo::manual_drop_from_object_id)
1702                    .collect(),
1703            )))
1704            .collect();
1705
1706        Ok(DropOps {
1707            ops,
1708            dropped_active_db,
1709            dropped_active_cluster,
1710            dropped_in_use_indexes,
1711        })
1712    }
1713
1714    pub(super) fn sequence_explain_schema(
1715        &self,
1716        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1717    ) -> Result<ExecuteResponse, AdapterError> {
1718        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1719            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1720        })?;
1721
1722        let json_string = json_string(&json_value);
1723        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1724        Ok(Self::send_immediate_rows(row))
1725    }
1726
1727    pub(super) fn sequence_show_all_variables(
1728        &self,
1729        session: &Session,
1730    ) -> Result<ExecuteResponse, AdapterError> {
1731        let mut rows = viewable_variables(self.catalog().state(), session)
1732            .map(|v| (v.name(), v.value(), v.description()))
1733            .collect::<Vec<_>>();
1734        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1735
1736        // TODO(parkmycar): Pack all of these into a single RowCollection.
1737        let rows: Vec<_> = rows
1738            .into_iter()
1739            .map(|(name, val, desc)| {
1740                Row::pack_slice(&[
1741                    Datum::String(name),
1742                    Datum::String(&val),
1743                    Datum::String(desc),
1744                ])
1745            })
1746            .collect();
1747        Ok(Self::send_immediate_rows(rows))
1748    }
1749
1750    pub(super) fn sequence_show_variable(
1751        &self,
1752        session: &Session,
1753        plan: plan::ShowVariablePlan,
1754    ) -> Result<ExecuteResponse, AdapterError> {
1755        if &plan.name == SCHEMA_ALIAS {
1756            let schemas = self.catalog.resolve_search_path(session);
1757            let schema = schemas.first();
1758            return match schema {
1759                Some((database_spec, schema_spec)) => {
1760                    let schema_name = &self
1761                        .catalog
1762                        .get_schema(database_spec, schema_spec, session.conn_id())
1763                        .name()
1764                        .schema;
1765                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
1766                    Ok(Self::send_immediate_rows(row))
1767                }
1768                None => {
1769                    if session.vars().current_object_missing_warnings() {
1770                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1771                            search_path: session
1772                                .vars()
1773                                .search_path()
1774                                .into_iter()
1775                                .map(|schema| schema.to_string())
1776                                .collect(),
1777                        });
1778                    }
1779                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1780                }
1781            };
1782        }
1783
1784        let variable = session
1785            .vars()
1786            .get(self.catalog().system_config(), &plan.name)
1787            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1788
1789        // In lieu of plumbing the user to all system config functions, just check that the var is
1790        // visible.
1791        variable.visible(session.user(), self.catalog().system_config())?;
1792
1793        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1794        if variable.name() == vars::DATABASE.name()
1795            && matches!(
1796                self.catalog().resolve_database(&variable.value()),
1797                Err(CatalogError::UnknownDatabase(_))
1798            )
1799            && session.vars().current_object_missing_warnings()
1800        {
1801            let name = variable.value();
1802            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1803        } else if variable.name() == vars::CLUSTER.name()
1804            && matches!(
1805                self.catalog().resolve_cluster(&variable.value()),
1806                Err(CatalogError::UnknownCluster(_))
1807            )
1808            && session.vars().current_object_missing_warnings()
1809        {
1810            let name = variable.value();
1811            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1812        }
1813        Ok(Self::send_immediate_rows(row))
1814    }
1815
1816    #[instrument]
1817    pub(super) async fn sequence_inspect_shard(
1818        &self,
1819        session: &Session,
1820        plan: plan::InspectShardPlan,
1821    ) -> Result<ExecuteResponse, AdapterError> {
1822        // TODO: Not thrilled about this rbac special case here, but probably
1823        // sufficient for now.
1824        if !session.user().is_internal() {
1825            return Err(AdapterError::Unauthorized(
1826                rbac::UnauthorizedError::MzSystem {
1827                    action: "inspect".into(),
1828                },
1829            ));
1830        }
1831        let state = self
1832            .controller
1833            .storage
1834            .inspect_persist_state(plan.id)
1835            .await?;
1836        let jsonb = Jsonb::from_serde_json(state)?;
1837        Ok(Self::send_immediate_rows(jsonb.into_row()))
1838    }
1839
1840    #[instrument]
1841    pub(super) fn sequence_set_variable(
1842        &self,
1843        session: &mut Session,
1844        plan: plan::SetVariablePlan,
1845    ) -> Result<ExecuteResponse, AdapterError> {
1846        let (name, local) = (plan.name, plan.local);
1847        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1848            self.validate_set_isolation_level(session)?;
1849        }
1850        if &name == vars::CLUSTER.name() {
1851            self.validate_set_cluster(session)?;
1852        }
1853
1854        let vars = session.vars_mut();
1855        let values = match plan.value {
1856            plan::VariableValue::Default => None,
1857            plan::VariableValue::Values(values) => Some(values),
1858        };
1859
1860        match values {
1861            Some(values) => {
1862                vars.set(
1863                    self.catalog().system_config(),
1864                    &name,
1865                    VarInput::SqlSet(&values),
1866                    local,
1867                )?;
1868
1869                let vars = session.vars();
1870
1871                // Emit a warning when deprecated variables are used.
1872                // TODO(database-issues#8069) remove this after sufficient time has passed
1873                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1874                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1875                } else if name == vars::CLUSTER.name()
1876                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1877                {
1878                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1879                }
1880
1881                // Database or cluster value does not correspond to a catalog item.
1882                if name.as_str() == vars::DATABASE.name()
1883                    && matches!(
1884                        self.catalog().resolve_database(vars.database()),
1885                        Err(CatalogError::UnknownDatabase(_))
1886                    )
1887                    && session.vars().current_object_missing_warnings()
1888                {
1889                    let name = vars.database().to_string();
1890                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1891                } else if name.as_str() == vars::CLUSTER.name()
1892                    && matches!(
1893                        self.catalog().resolve_cluster(vars.cluster()),
1894                        Err(CatalogError::UnknownCluster(_))
1895                    )
1896                    && session.vars().current_object_missing_warnings()
1897                {
1898                    let name = vars.cluster().to_string();
1899                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1900                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1901                    let v = values.into_first().to_lowercase();
1902                    if v == IsolationLevel::ReadUncommitted.as_str()
1903                        || v == IsolationLevel::ReadCommitted.as_str()
1904                        || v == IsolationLevel::RepeatableRead.as_str()
1905                    {
1906                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1907                            isolation_level: v,
1908                        });
1909                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1910                        session.add_notice(AdapterNotice::StrongSessionSerializable);
1911                    }
1912                }
1913            }
1914            None => vars.reset(self.catalog().system_config(), &name, local)?,
1915        }
1916
1917        Ok(ExecuteResponse::SetVariable { name, reset: false })
1918    }
1919
1920    pub(super) fn sequence_reset_variable(
1921        &self,
1922        session: &mut Session,
1923        plan: plan::ResetVariablePlan,
1924    ) -> Result<ExecuteResponse, AdapterError> {
1925        let name = plan.name;
1926        if &name == TRANSACTION_ISOLATION_VAR_NAME {
1927            self.validate_set_isolation_level(session)?;
1928        }
1929        if &name == vars::CLUSTER.name() {
1930            self.validate_set_cluster(session)?;
1931        }
1932        session
1933            .vars_mut()
1934            .reset(self.catalog().system_config(), &name, false)?;
1935        Ok(ExecuteResponse::SetVariable { name, reset: true })
1936    }
1937
1938    pub(super) fn sequence_set_transaction(
1939        &self,
1940        session: &mut Session,
1941        plan: plan::SetTransactionPlan,
1942    ) -> Result<ExecuteResponse, AdapterError> {
1943        // TODO(jkosh44) Only supports isolation levels for now.
1944        for mode in plan.modes {
1945            match mode {
1946                TransactionMode::AccessMode(_) => {
1947                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1948                }
1949                TransactionMode::IsolationLevel(isolation_level) => {
1950                    self.validate_set_isolation_level(session)?;
1951
1952                    session.vars_mut().set(
1953                        self.catalog().system_config(),
1954                        TRANSACTION_ISOLATION_VAR_NAME,
1955                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
1956                        plan.local,
1957                    )?
1958                }
1959            }
1960        }
1961        Ok(ExecuteResponse::SetVariable {
1962            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1963            reset: false,
1964        })
1965    }
1966
1967    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1968        if session.transaction().contains_ops() {
1969            Err(AdapterError::InvalidSetIsolationLevel)
1970        } else {
1971            Ok(())
1972        }
1973    }
1974
1975    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1976        if session.transaction().contains_ops() {
1977            Err(AdapterError::InvalidSetCluster)
1978        } else {
1979            Ok(())
1980        }
1981    }
1982
1983    #[instrument]
1984    pub(super) async fn sequence_end_transaction(
1985        &mut self,
1986        mut ctx: ExecuteContext,
1987        mut action: EndTransactionAction,
1988    ) {
1989        // If the transaction has failed, we can only rollback.
1990        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
1991            (&action, ctx.session().transaction())
1992        {
1993            action = EndTransactionAction::Rollback;
1994        }
1995        let response = match action {
1996            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
1997                params: BTreeMap::new(),
1998            }),
1999            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2000                params: BTreeMap::new(),
2001            }),
2002        };
2003
2004        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2005
2006        let (response, action) = match result {
2007            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2008                (response, action)
2009            }
2010            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2011                // Make sure we have the correct set of write locks for this transaction.
2012                // Aggressively dropping partial sets of locks to prevent deadlocking separate
2013                // transactions.
2014                let validated_locks = match write_lock_guards {
2015                    None => None,
2016                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2017                        Ok(locks) => Some(locks),
2018                        Err(missing) => {
2019                            tracing::error!(?missing, "programming error, missing write locks");
2020                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2021                        }
2022                    },
2023                };
2024
2025                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2026                for WriteOp { id, rows } in writes {
2027                    let total_rows = collected_writes.entry(id).or_default();
2028                    total_rows.push(rows);
2029                }
2030
2031                self.submit_write(PendingWriteTxn::User {
2032                    span: Span::current(),
2033                    writes: collected_writes,
2034                    write_locks: validated_locks,
2035                    pending_txn: PendingTxn {
2036                        ctx,
2037                        response,
2038                        action,
2039                    },
2040                });
2041                return;
2042            }
2043            Ok((
2044                Some(TransactionOps::Peeks {
2045                    determination,
2046                    requires_linearization: RequireLinearization::Required,
2047                    ..
2048                }),
2049                _,
2050            )) if ctx.session().vars().transaction_isolation()
2051                == &IsolationLevel::StrictSerializable =>
2052            {
2053                let conn_id = ctx.session().conn_id().clone();
2054                let pending_read_txn = PendingReadTxn {
2055                    txn: PendingRead::Read {
2056                        txn: PendingTxn {
2057                            ctx,
2058                            response,
2059                            action,
2060                        },
2061                    },
2062                    timestamp_context: determination.timestamp_context,
2063                    created: Instant::now(),
2064                    num_requeues: 0,
2065                    otel_ctx: OpenTelemetryContext::obtain(),
2066                };
2067                self.strict_serializable_reads_tx
2068                    .send((conn_id, pending_read_txn))
2069                    .expect("sending to strict_serializable_reads_tx cannot fail");
2070                return;
2071            }
2072            Ok((
2073                Some(TransactionOps::Peeks {
2074                    determination,
2075                    requires_linearization: RequireLinearization::Required,
2076                    ..
2077                }),
2078                _,
2079            )) if ctx.session().vars().transaction_isolation()
2080                == &IsolationLevel::StrongSessionSerializable =>
2081            {
2082                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2083                    ctx.session_mut()
2084                        .ensure_timestamp_oracle(timeline.clone())
2085                        .apply_write(*ts);
2086                }
2087                (response, action)
2088            }
2089            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2090                self.internal_cmd_tx
2091                    .send(Message::ExecuteSingleStatementTransaction {
2092                        ctx,
2093                        otel_ctx: OpenTelemetryContext::obtain(),
2094                        stmt,
2095                        params,
2096                    })
2097                    .expect("must send");
2098                return;
2099            }
2100            Ok((_, _)) => (response, action),
2101            Err(err) => (Err(err), EndTransactionAction::Rollback),
2102        };
2103        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2104        // Append any parameters that changed to the response.
2105        let response = response.map(|mut r| {
2106            r.extend_params(changed);
2107            ExecuteResponse::from(r)
2108        });
2109
2110        ctx.retire(response);
2111    }
2112
2113    #[instrument]
2114    async fn sequence_end_transaction_inner(
2115        &mut self,
2116        ctx: &mut ExecuteContext,
2117        action: EndTransactionAction,
2118    ) -> Result<(Option<TransactionOps>, Option<WriteLocks>), AdapterError> {
2119        let txn = self.clear_transaction(ctx.session_mut()).await;
2120
2121        if let EndTransactionAction::Commit = action {
2122            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2123                match &mut ops {
2124                    TransactionOps::Writes(writes) => {
2125                        for WriteOp { id, .. } in &mut writes.iter() {
2126                            // Re-verify this id exists.
2127                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2128                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2129                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2130                                })
2131                            })?;
2132                        }
2133
2134                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2135                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2136                    }
2137                    TransactionOps::DDL {
2138                        ops,
2139                        state: _,
2140                        side_effects,
2141                        revision,
2142                        snapshot: _,
2143                    } => {
2144                        // Make sure our catalog hasn't changed.
2145                        if *revision != self.catalog().transient_revision() {
2146                            return Err(AdapterError::DDLTransactionRace);
2147                        }
2148                        // Commit all of our queued ops.
2149                        let ops = std::mem::take(ops);
2150                        let side_effects = std::mem::take(side_effects);
2151                        self.catalog_transact_with_side_effects(
2152                            Some(ctx),
2153                            ops,
2154                            move |a, mut ctx| {
2155                                Box::pin(async move {
2156                                    for side_effect in side_effects {
2157                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2158                                    }
2159                                })
2160                            },
2161                        )
2162                        .await?;
2163                    }
2164                    _ => (),
2165                }
2166                return Ok((Some(ops), write_lock_guards));
2167            }
2168        }
2169
2170        Ok((None, None))
2171    }
2172
2173    pub(super) async fn sequence_side_effecting_func(
2174        &mut self,
2175        ctx: ExecuteContext,
2176        plan: SideEffectingFunc,
2177    ) {
2178        match plan {
2179            SideEffectingFunc::PgCancelBackend { connection_id } => {
2180                if ctx.session().conn_id().unhandled() == connection_id {
2181                    // As a special case, if we're canceling ourselves, we send
2182                    // back a canceled resposne to the client issuing the query,
2183                    // and so we need to do no further processing of the cancel.
2184                    ctx.retire(Err(AdapterError::Canceled));
2185                    return;
2186                }
2187
2188                let res = if let Some((id_handle, _conn_meta)) =
2189                    self.active_conns.get_key_value(&connection_id)
2190                {
2191                    // check_plan already verified role membership.
2192                    self.handle_privileged_cancel(id_handle.clone()).await;
2193                    Datum::True
2194                } else {
2195                    Datum::False
2196                };
2197                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2198            }
2199        }
2200    }
2201
2202    /// Execute a side-effecting function from the frontend peek path.
2203    /// This is separate from `sequence_side_effecting_func` because
2204    /// - It doesn't have an ExecuteContext.
2205    /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
2206    ///   peek sequencing can't look at `active_conns`.
2207    ///
2208    /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
2209    /// sequencing.
2210    pub(crate) async fn execute_side_effecting_func(
2211        &mut self,
2212        plan: SideEffectingFunc,
2213        conn_id: ConnectionId,
2214        current_role: RoleId,
2215    ) -> Result<ExecuteResponse, AdapterError> {
2216        match plan {
2217            SideEffectingFunc::PgCancelBackend { connection_id } => {
2218                if conn_id.unhandled() == connection_id {
2219                    // As a special case, if we're canceling ourselves, we return
2220                    // a canceled response to the client issuing the query,
2221                    // and so we need to do no further processing of the cancel.
2222                    return Err(AdapterError::Canceled);
2223                }
2224
2225                // Perform RBAC check: the current user must be a member of the role
2226                // that owns the connection being cancelled.
2227                if let Some((_id_handle, conn_meta)) =
2228                    self.active_conns.get_key_value(&connection_id)
2229                {
2230                    let target_role = *conn_meta.authenticated_role_id();
2231                    let role_membership = self
2232                        .catalog()
2233                        .state()
2234                        .collect_role_membership(&current_role);
2235                    if !role_membership.contains(&target_role) {
2236                        let target_role_name = self
2237                            .catalog()
2238                            .try_get_role(&target_role)
2239                            .map(|role| role.name().to_string())
2240                            .unwrap_or_else(|| target_role.to_string());
2241                        return Err(AdapterError::Unauthorized(
2242                            rbac::UnauthorizedError::RoleMembership {
2243                                role_names: vec![target_role_name],
2244                            },
2245                        ));
2246                    }
2247
2248                    // RBAC check passed, proceed with cancellation.
2249                    let id_handle = self
2250                        .active_conns
2251                        .get_key_value(&connection_id)
2252                        .map(|(id, _)| id.clone())
2253                        .expect("checked above");
2254                    self.handle_privileged_cancel(id_handle).await;
2255                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2256                } else {
2257                    // Connection not found, return false.
2258                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2259                }
2260            }
2261        }
2262    }
2263
2264    /// Inner method that performs the actual real-time recency timestamp determination.
2265    /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`)
2266    /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
2267    pub(crate) async fn determine_real_time_recent_timestamp(
2268        &self,
2269        source_ids: impl Iterator<Item = GlobalId>,
2270        real_time_recency_timeout: Duration,
2271    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2272        let item_ids = source_ids
2273            .map(|gid| {
2274                self.catalog
2275                    .try_resolve_item_id(&gid)
2276                    .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2277            })
2278            .collect::<Result<Vec<_>, _>>()?;
2279
2280        // Find all dependencies transitively because we need to ensure that
2281        // RTR queries determine the timestamp from the sources' (i.e.
2282        // storage objects that ingest data from external systems) remap
2283        // data. We "cheat" a little bit and filter out any IDs that aren't
2284        // user objects because we know they are not a RTR source.
2285        let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2286        // If none of the sources are user objects, we don't need to provide
2287        // a RTR timestamp.
2288        if to_visit.is_empty() {
2289            return Ok(None);
2290        }
2291
2292        let mut timestamp_objects = BTreeSet::new();
2293
2294        while let Some(id) = to_visit.pop_front() {
2295            timestamp_objects.insert(id);
2296            to_visit.extend(
2297                self.catalog()
2298                    .get_entry(&id)
2299                    .uses()
2300                    .into_iter()
2301                    .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2302            );
2303        }
2304        let timestamp_objects = timestamp_objects
2305            .into_iter()
2306            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2307            .collect();
2308
2309        let r = self
2310            .controller
2311            .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2312            .await?;
2313
2314        Ok(Some(r))
2315    }
2316
2317    /// Checks to see if the session needs a real time recency timestamp and if so returns
2318    /// a future that will return the timestamp.
2319    pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2320        &self,
2321        session: &Session,
2322        source_ids: impl Iterator<Item = GlobalId>,
2323    ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2324        let vars = session.vars();
2325
2326        if vars.real_time_recency()
2327            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2328            && !session.contains_read_timestamp()
2329        {
2330            self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2331                .await
2332        } else {
2333            Ok(None)
2334        }
2335    }
2336
2337    #[instrument]
2338    pub(super) async fn sequence_explain_plan(
2339        &mut self,
2340        ctx: ExecuteContext,
2341        plan: plan::ExplainPlanPlan,
2342        target_cluster: TargetCluster,
2343    ) {
2344        match &plan.explainee {
2345            plan::Explainee::Statement(stmt) => match stmt {
2346                plan::ExplaineeStatement::CreateView { .. } => {
2347                    self.explain_create_view(ctx, plan).await;
2348                }
2349                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2350                    self.explain_create_materialized_view(ctx, plan).await;
2351                }
2352                plan::ExplaineeStatement::CreateIndex { .. } => {
2353                    self.explain_create_index(ctx, plan).await;
2354                }
2355                plan::ExplaineeStatement::Select { .. } => {
2356                    self.explain_peek(ctx, plan, target_cluster).await;
2357                }
2358                plan::ExplaineeStatement::Subscribe { .. } => {
2359                    self.explain_subscribe(ctx, plan, target_cluster).await;
2360                }
2361            },
2362            plan::Explainee::View(_) => {
2363                let result = self.explain_view(&ctx, plan);
2364                ctx.retire(result);
2365            }
2366            plan::Explainee::MaterializedView(_) => {
2367                let result = self.explain_materialized_view(&ctx, plan);
2368                ctx.retire(result);
2369            }
2370            plan::Explainee::Index(_) => {
2371                let result = self.explain_index(&ctx, plan);
2372                ctx.retire(result);
2373            }
2374            plan::Explainee::ReplanView(_) => {
2375                self.explain_replan_view(ctx, plan).await;
2376            }
2377            plan::Explainee::ReplanMaterializedView(_) => {
2378                self.explain_replan_materialized_view(ctx, plan).await;
2379            }
2380            plan::Explainee::ReplanIndex(_) => {
2381                self.explain_replan_index(ctx, plan).await;
2382            }
2383        };
2384    }
2385
2386    pub(super) async fn sequence_explain_pushdown(
2387        &mut self,
2388        ctx: ExecuteContext,
2389        plan: plan::ExplainPushdownPlan,
2390        target_cluster: TargetCluster,
2391    ) {
2392        match plan.explainee {
2393            Explainee::Statement(ExplaineeStatement::Select {
2394                broken: false,
2395                plan,
2396                desc: _,
2397            }) => {
2398                let stage = return_if_err!(
2399                    self.peek_validate(
2400                        ctx.session(),
2401                        plan,
2402                        target_cluster,
2403                        None,
2404                        ExplainContext::Pushdown,
2405                        Some(ctx.session().vars().max_query_result_size()),
2406                    ),
2407                    ctx
2408                );
2409                self.sequence_staged(ctx, Span::current(), stage).await;
2410            }
2411            Explainee::MaterializedView(item_id) => {
2412                self.explain_pushdown_materialized_view(ctx, item_id).await;
2413            }
2414            _ => {
2415                ctx.retire(Err(AdapterError::Unsupported(
2416                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2417                )));
2418            }
2419        };
2420    }
2421
2422    /// Executes an EXPLAIN FILTER PUSHDOWN, with read holds passed in.
2423    async fn execute_explain_pushdown_with_read_holds(
2424        &self,
2425        ctx: ExecuteContext,
2426        as_of: Antichain<Timestamp>,
2427        mz_now: ResultSpec<'static>,
2428        read_holds: Option<ReadHolds>,
2429        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2430    ) {
2431        let fut = self
2432            .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2433            .await;
2434        task::spawn(|| "render explain pushdown", async move {
2435            // Transfer the necessary read holds over to the background task
2436            let _read_holds = read_holds;
2437            let res = fut.await;
2438            ctx.retire(res);
2439        });
2440    }
2441
2442    /// Returns a future that will execute EXPLAIN FILTER PUSHDOWN.
2443    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2444        &self,
2445        session: &Session,
2446        as_of: Antichain<Timestamp>,
2447        mz_now: ResultSpec<'static>,
2448        imports: I,
2449    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2450        // Get the needed Coordinator stuff and call the freestanding, shared helper.
2451        super::explain_pushdown_future_inner(
2452            session,
2453            &self.catalog,
2454            &self.controller.storage_collections,
2455            as_of,
2456            mz_now,
2457            imports,
2458        )
2459        .await
2460    }
2461
2462    #[instrument]
2463    pub(super) async fn sequence_insert(
2464        &mut self,
2465        mut ctx: ExecuteContext,
2466        plan: plan::InsertPlan,
2467    ) {
2468        // Normally, this would get checked when trying to add "write ops" to
2469        // the transaction but we go down diverging paths below, based on
2470        // whether the INSERT is only constant values or not.
2471        //
2472        // For the non-constant case we sequence an implicit read-then-write,
2473        // which messes with the transaction ops and would allow an implicit
2474        // read-then-write to sneak into a read-only transaction.
2475        if !ctx.session_mut().transaction().allows_writes() {
2476            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2477            return;
2478        }
2479
2480        // The structure of this code originates from a time where
2481        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2482        // optimized `MirRelationExpr`.
2483        //
2484        // Ideally, we would like to make the `selection.as_const().is_some()`
2485        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2486        // are planned as a Wrap($n, $vals) call, so until we can reduce
2487        // HirRelationExpr this will always returns false.
2488        //
2489        // Unfortunately, hitting the default path of the match below also
2490        // causes a lot of tests to fail, so we opted to go with the extra
2491        // `plan.values.clone()` statements when producing the `optimized_mir`
2492        // and re-optimize the values in the `sequence_read_then_write` call.
2493        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2494            // We don't perform any optimizations on an expression that is already
2495            // a constant for writes, as we want to maximize bulk-insert throughput.
2496            let expr = return_if_err!(
2497                plan.values
2498                    .clone()
2499                    .lower(self.catalog().system_config(), None),
2500                ctx
2501            );
2502            OptimizedMirRelationExpr(expr)
2503        } else {
2504            // Collect optimizer parameters.
2505            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2506
2507            // (`optimize::view::Optimizer` has a special case for constant queries.)
2508            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2509
2510            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2511            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2512        };
2513
2514        match optimized_mir.into_inner() {
2515            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2516                let catalog = self.owned_catalog();
2517                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2518                    let result =
2519                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2520                    ctx.retire(result);
2521                });
2522            }
2523            // All non-constant values must be planned as read-then-writes.
2524            _ => {
2525                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2526                    Some(table) => {
2527                        // Inserts always occur at the latest version of the table.
2528                        let desc = table.relation_desc_latest().expect("table has a desc");
2529                        desc.arity()
2530                    }
2531                    None => {
2532                        ctx.retire(Err(AdapterError::Catalog(
2533                            mz_catalog::memory::error::Error {
2534                                kind: ErrorKind::Sql(CatalogError::UnknownItem(
2535                                    plan.id.to_string(),
2536                                )),
2537                            },
2538                        )));
2539                        return;
2540                    }
2541                };
2542
2543                let finishing = RowSetFinishing {
2544                    order_by: vec![],
2545                    limit: None,
2546                    offset: 0,
2547                    project: (0..desc_arity).collect(),
2548                };
2549
2550                let read_then_write_plan = plan::ReadThenWritePlan {
2551                    id: plan.id,
2552                    selection: plan.values,
2553                    finishing,
2554                    assignments: BTreeMap::new(),
2555                    kind: MutationKind::Insert,
2556                    returning: plan.returning,
2557                };
2558
2559                self.sequence_read_then_write(ctx, read_then_write_plan)
2560                    .await;
2561            }
2562        }
2563    }
2564
2565    /// ReadThenWrite is a plan whose writes depend on the results of a
2566    /// read. This works by doing a Peek then queuing a SendDiffs. No writes
2567    /// or read-then-writes can occur between the Peek and SendDiff otherwise a
2568    /// serializability violation could occur.
2569    #[instrument]
2570    pub(super) async fn sequence_read_then_write(
2571        &mut self,
2572        mut ctx: ExecuteContext,
2573        plan: plan::ReadThenWritePlan,
2574    ) {
2575        let mut source_ids: BTreeSet<_> = plan
2576            .selection
2577            .depends_on()
2578            .into_iter()
2579            .map(|gid| self.catalog().resolve_item_id(&gid))
2580            .collect();
2581        source_ids.insert(plan.id);
2582
2583        // If the transaction doesn't already have write locks, acquire them.
2584        if ctx.session().transaction().write_locks().is_none() {
2585            // Pre-define all of the locks we need.
2586            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2587
2588            // Try acquiring all of our locks.
2589            for id in &source_ids {
2590                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2591                    write_locks.insert_lock(*id, lock);
2592                }
2593            }
2594
            // See if we acquired all of the necessary locks.
2596            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2597                Ok(locks) => locks,
2598                Err(missing) => {
2599                    // Defer our write if we couldn't acquire all of the locks.
2600                    let role_metadata = ctx.session().role_metadata().clone();
2601                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2602                    let plan = DeferredPlan {
2603                        ctx,
2604                        plan: Plan::ReadThenWrite(plan),
2605                        validity: PlanValidity::new(
2606                            self.catalog.transient_revision(),
2607                            source_ids.clone(),
2608                            None,
2609                            None,
2610                            role_metadata,
2611                        ),
2612                        requires_locks: source_ids,
2613                    };
2614                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2615                }
2616            };
2617
2618            ctx.session_mut()
2619                .try_grant_write_locks(write_locks)
2620                .expect("session has already been granted write locks");
2621        }
2622
2623        let plan::ReadThenWritePlan {
2624            id,
2625            kind,
2626            selection,
2627            mut assignments,
2628            finishing,
2629            mut returning,
2630        } = plan;
2631
2632        // Read then writes can be queued, so re-verify the id exists.
2633        let desc = match self.catalog().try_get_entry(&id) {
2634            Some(table) => {
2635                // Inserts always occur at the latest version of the table.
2636                table
2637                    .relation_desc_latest()
2638                    .expect("table has a desc")
2639                    .into_owned()
2640            }
2641            None => {
2642                ctx.retire(Err(AdapterError::Catalog(
2643                    mz_catalog::memory::error::Error {
2644                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2645                    },
2646                )));
2647                return;
2648            }
2649        };
2650
2651        // Disallow mz_now in any position because read time and write time differ.
2652        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2653            || assignments.values().any(|e| e.contains_temporal())
2654            || returning.iter().any(|e| e.contains_temporal());
2655        if contains_temporal {
2656            ctx.retire(Err(AdapterError::Unsupported(
2657                "calls to mz_now in write statements",
2658            )));
2659            return;
2660        }
2661
2662        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations.
2663        for gid in selection.depends_on() {
2664            let item_id = self.catalog().resolve_item_id(&gid);
2665            if let Err(err) = validate_read_then_write_dependencies(self.catalog(), &item_id) {
2666                ctx.retire(Err(err));
2667                return;
2668            }
2669        }
2670
2671        let (peek_tx, peek_rx) = oneshot::channel();
2672        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2673        let (tx, _, session, extra) = ctx.into_parts();
2674        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
2675        // execution context, because this peek does not directly correspond to an execute,
2676        // and so we don't need to take any action on its retirement.
2677        // TODO[btv]: we might consider extending statement logging to log the inner
2678        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
2679        // and mint a new "real" execution context here. We'd also have to add some logic to
2680        // make sure such "sub-statements" are always sampled when the top-level statement is
2681        //
2682        // It's debatable whether this makes sense conceptually,
2683        // because the inner fragment here is not actually a
2684        // "statement" in its own right.
2685        let peek_ctx = ExecuteContext::from_parts(
2686            peek_client_tx,
2687            self.internal_cmd_tx.clone(),
2688            session,
2689            Default::default(),
2690        );
2691
2692        self.sequence_peek(
2693            peek_ctx,
2694            plan::SelectPlan {
2695                select: None,
2696                source: selection,
2697                when: QueryWhen::FreshestTableWrite,
2698                finishing,
2699                copy_to: None,
2700            },
2701            TargetCluster::Active,
2702            None,
2703        )
2704        .await;
2705
2706        let internal_cmd_tx = self.internal_cmd_tx.clone();
2707        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2708        let catalog = self.owned_catalog();
2709        let max_result_size = self.catalog().system_config().max_result_size();
2710
2711        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2712            let (peek_response, session) = match peek_rx.await {
2713                Ok(Response {
2714                    result: Ok(resp),
2715                    session,
2716                    otel_ctx,
2717                }) => {
2718                    otel_ctx.attach_as_parent();
2719                    (resp, session)
2720                }
2721                Ok(Response {
2722                    result: Err(e),
2723                    session,
2724                    otel_ctx,
2725                }) => {
2726                    let ctx =
2727                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2728                    otel_ctx.attach_as_parent();
2729                    ctx.retire(Err(e));
2730                    return;
2731                }
2732                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
2733                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2734            };
2735            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2736            let mut timeout_dur = *ctx.session().vars().statement_timeout();
2737
2738            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
2739            if timeout_dur == Duration::ZERO {
2740                timeout_dur = Duration::MAX;
2741            }
2742
2743            let style = ExprPrepOneShot {
2744                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
2745                session: ctx.session(),
2746                catalog_state: catalog.state(),
2747            };
2748            for expr in assignments.values_mut().chain(returning.iter_mut()) {
2749                return_if_err!(style.prep_scalar_expr(expr), ctx);
2750            }
2751
2752            let make_diffs = move |mut rows: Box<dyn RowIterator>|
2753                  -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2754                    let arena = RowArena::new();
2755                    let mut diffs = Vec::new();
2756                    let mut datum_vec = mz_repr::DatumVec::new();
2757
2758                    while let Some(row) = rows.next() {
2759                        if !assignments.is_empty() {
2760                            assert!(
2761                                matches!(kind, MutationKind::Update),
2762                                "only updates support assignments"
2763                            );
2764                            let mut datums = datum_vec.borrow_with(row);
2765                            let mut updates = vec![];
2766                            for (idx, expr) in &assignments {
2767                                let updated = match expr.eval(&datums, &arena) {
2768                                    Ok(updated) => updated,
2769                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2770                                };
2771                                updates.push((*idx, updated));
2772                            }
2773                            for (idx, new_value) in updates {
2774                                datums[idx] = new_value;
2775                            }
2776                            let updated = Row::pack_slice(&datums);
2777                            diffs.push((updated, Diff::ONE));
2778                        }
2779                        match kind {
2780                            // Updates and deletes always remove the
2781                            // current row. Updates will also add an
2782                            // updated value.
2783                            MutationKind::Update | MutationKind::Delete => {
2784                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
2785                            }
2786                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2787                        }
2788                    }
2789
2790                    // Sum of all the rows' byte size, for checking if we go
2791                    // above the max_result_size threshold.
2792                    let mut byte_size: u64 = 0;
2793                    for (row, diff) in &diffs {
2794                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2795                        if diff.is_positive() {
2796                            for (idx, datum) in row.iter().enumerate() {
2797                                desc.constraints_met(idx, &datum)?;
2798                            }
2799                        }
2800                    }
2801                    Ok((diffs, byte_size))
2802                };
2803
2804            let diffs = match peek_response {
2805                ExecuteResponse::SendingRowsStreaming {
2806                    rows: mut rows_stream,
2807                    ..
2808                } => {
2809                    let mut byte_size: u64 = 0;
2810                    let mut diffs = Vec::new();
2811                    let result = loop {
2812                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2813                            Ok(Some(res)) => match res {
2814                                PeekResponseUnary::Rows(new_rows) => {
2815                                    match make_diffs(new_rows) {
2816                                        Ok((mut new_diffs, new_byte_size)) => {
2817                                            byte_size = byte_size.saturating_add(new_byte_size);
2818                                            if byte_size > max_result_size {
2819                                                break Err(AdapterError::ResultSize(format!(
2820                                                    "result exceeds max size of {max_result_size}"
2821                                                )));
2822                                            }
2823                                            diffs.append(&mut new_diffs)
2824                                        }
2825                                        Err(e) => break Err(e),
2826                                    };
2827                                }
2828                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2829                                PeekResponseUnary::Error(e) => {
2830                                    break Err(AdapterError::Unstructured(anyhow!(e)));
2831                                }
2832                                PeekResponseUnary::DependencyDropped(dep) => {
2833                                    break Err(dep.to_concurrent_dependency_drop());
2834                                }
2835                            },
2836                            Ok(None) => break Ok(diffs),
2837                            Err(_) => {
2838                                // We timed out, so remove the pending peek. This is
2839                                // best-effort and doesn't guarantee we won't
2840                                // receive a response.
2841                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
2842                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2843                                    conn_id: ctx.session().conn_id().clone(),
2844                                });
2845                                if let Err(e) = result {
2846                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2847                                }
2848                                break Err(AdapterError::StatementTimeout);
2849                            }
2850                        }
2851                    };
2852
2853                    result
2854                }
2855                ExecuteResponse::SendingRowsImmediate { rows } => {
2856                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2857                }
2858                resp => Err(AdapterError::Unstructured(anyhow!(
2859                    "unexpected peek response: {resp:?}"
2860                ))),
2861            };
2862
2863            let mut returning_rows = Vec::new();
2864            let mut diff_err: Option<AdapterError> = None;
2865            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2866                let arena = RowArena::new();
2867                for (row, diff) in diffs {
2868                    if !diff.is_positive() {
2869                        continue;
2870                    }
2871                    let mut returning_row = Row::with_capacity(returning.len());
2872                    let mut packer = returning_row.packer();
2873                    for expr in &returning {
2874                        let datums: Vec<_> = row.iter().collect();
2875                        match expr.eval(&datums, &arena) {
2876                            Ok(datum) => {
2877                                packer.push(datum);
2878                            }
2879                            Err(err) => {
2880                                diff_err = Some(err.into());
2881                                break;
2882                            }
2883                        }
2884                    }
2885                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2886                    let diff = match NonZeroUsize::try_from(diff) {
2887                        Ok(diff) => diff,
2888                        Err(err) => {
2889                            diff_err = Some(err.into());
2890                            break;
2891                        }
2892                    };
2893                    returning_rows.push((returning_row, diff));
2894                    if diff_err.is_some() {
2895                        break;
2896                    }
2897                }
2898            }
2899            let diffs = if let Some(err) = diff_err {
2900                Err(err)
2901            } else {
2902                diffs
2903            };
2904
2905            // We need to clear out the timestamp context so the write doesn't fail due to a
2906            // read only transaction.
2907            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2908            // No matter what isolation level the client is using, we must linearize this
2909            // read. The write will be performed right after this, as part of a single
2910            // transaction, so the write must have a timestamp greater than or equal to the
2911            // read.
2912            //
2913            // Note: It's only OK for the write to have a greater timestamp than the read
2914            // because the write lock prevents any other writes from happening in between
2915            // the read and write.
2916            if let Some(timestamp_context) = timestamp_context {
2917                let (tx, rx) = tokio::sync::oneshot::channel();
2918                let conn_id = ctx.session().conn_id().clone();
2919                let pending_read_txn = PendingReadTxn {
2920                    txn: PendingRead::ReadThenWrite { ctx, tx },
2921                    timestamp_context,
2922                    created: Instant::now(),
2923                    num_requeues: 0,
2924                    otel_ctx: OpenTelemetryContext::obtain(),
2925                };
2926                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2927                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
2928                if let Err(e) = result {
2929                    warn!(
2930                        "strict_serializable_reads_tx dropped before we could send: {:?}",
2931                        e
2932                    );
2933                    return;
2934                }
2935                let result = rx.await;
2936                // It is not an error for these results to be ready after `tx` has been dropped.
2937                ctx = match result {
2938                    Ok(Some(ctx)) => ctx,
2939                    Ok(None) => {
2940                        // Coordinator took our context and will handle responding to the client.
2941                        // This usually indicates that our transaction was aborted.
2942                        return;
2943                    }
2944                    Err(e) => {
2945                        warn!(
2946                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
2947                            e
2948                        );
2949                        return;
2950                    }
2951                };
2952            }
2953
2954            match diffs {
2955                Ok(diffs) => {
2956                    let result = Self::send_diffs(
2957                        ctx.session_mut(),
2958                        plan::SendDiffsPlan {
2959                            id,
2960                            updates: diffs,
2961                            kind,
2962                            returning: returning_rows,
2963                            max_result_size,
2964                        },
2965                    );
2966                    ctx.retire(result);
2967                }
2968                Err(e) => {
2969                    ctx.retire(Err(e));
2970                }
2971            }
2972        });
2973    }
2974
2975    #[instrument]
2976    pub(super) async fn sequence_alter_item_rename(
2977        &mut self,
2978        ctx: &mut ExecuteContext,
2979        plan: plan::AlterItemRenamePlan,
2980    ) -> Result<ExecuteResponse, AdapterError> {
2981        let op = catalog::Op::RenameItem {
2982            id: plan.id,
2983            current_full_name: plan.current_full_name,
2984            to_name: plan.to_name,
2985        };
2986        match self
2987            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
2988            .await
2989        {
2990            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
2991            Err(err) => Err(err),
2992        }
2993    }
2994
2995    #[instrument]
2996    pub(super) async fn sequence_alter_retain_history(
2997        &mut self,
2998        ctx: &mut ExecuteContext,
2999        plan: plan::AlterRetainHistoryPlan,
3000    ) -> Result<ExecuteResponse, AdapterError> {
3001        let ops = vec![catalog::Op::AlterRetainHistory {
3002            id: plan.id,
3003            value: plan.value,
3004            window: plan.window,
3005        }];
3006        self.catalog_transact_with_context(None, Some(ctx), ops)
3007            .await?;
3008        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3009    }
3010
3011    #[instrument]
3012    pub(super) async fn sequence_alter_source_timestamp_interval(
3013        &mut self,
3014        ctx: &mut ExecuteContext,
3015        plan: plan::AlterSourceTimestampIntervalPlan,
3016    ) -> Result<ExecuteResponse, AdapterError> {
3017        let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3018            id: plan.id,
3019            value: plan.value,
3020            interval: plan.interval,
3021        }];
3022        self.catalog_transact_with_context(None, Some(ctx), ops)
3023            .await?;
3024        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3025    }
3026
3027    #[instrument]
3028    pub(super) async fn sequence_alter_schema_rename(
3029        &mut self,
3030        ctx: &mut ExecuteContext,
3031        plan: plan::AlterSchemaRenamePlan,
3032    ) -> Result<ExecuteResponse, AdapterError> {
3033        let (database_spec, schema_spec) = plan.cur_schema_spec;
3034        let op = catalog::Op::RenameSchema {
3035            database_spec,
3036            schema_spec,
3037            new_name: plan.new_schema_name,
3038            check_reserved_names: true,
3039        };
3040        match self
3041            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3042            .await
3043        {
3044            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3045            Err(err) => Err(err),
3046        }
3047    }
3048
3049    #[instrument]
3050    pub(super) async fn sequence_alter_schema_swap(
3051        &mut self,
3052        ctx: &mut ExecuteContext,
3053        plan: plan::AlterSchemaSwapPlan,
3054    ) -> Result<ExecuteResponse, AdapterError> {
3055        let plan::AlterSchemaSwapPlan {
3056            schema_a_spec: (schema_a_db, schema_a),
3057            schema_a_name,
3058            schema_b_spec: (schema_b_db, schema_b),
3059            schema_b_name,
3060            name_temp,
3061        } = plan;
3062
3063        let op_a = catalog::Op::RenameSchema {
3064            database_spec: schema_a_db,
3065            schema_spec: schema_a,
3066            new_name: name_temp,
3067            check_reserved_names: false,
3068        };
3069        let op_b = catalog::Op::RenameSchema {
3070            database_spec: schema_b_db,
3071            schema_spec: schema_b,
3072            new_name: schema_a_name,
3073            check_reserved_names: false,
3074        };
3075        let op_c = catalog::Op::RenameSchema {
3076            database_spec: schema_a_db,
3077            schema_spec: schema_a,
3078            new_name: schema_b_name,
3079            check_reserved_names: false,
3080        };
3081
3082        match self
3083            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3084                Box::pin(async {})
3085            })
3086            .await
3087        {
3088            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3089            Err(err) => Err(err),
3090        }
3091    }
3092
3093    #[instrument]
3094    pub(super) async fn sequence_alter_role(
3095        &mut self,
3096        session: &Session,
3097        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3098    ) -> Result<ExecuteResponse, AdapterError> {
3099        let catalog = self.catalog().for_session(session);
3100        let role = catalog.get_role(&id);
3101
3102        // We'll send these notices to the user, if the operation is successful.
3103        let mut notices = vec![];
3104
3105        // Get the attributes and variables from the role, as they currently are.
3106        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3107        let mut vars = role.vars().clone();
3108
3109        // Whether to set the password to NULL. This is a special case since the existing
3110        // password is not stored in the role attributes.
3111        let mut nopassword = false;
3112
3113        // Apply our updates.
3114        match option {
3115            PlannedAlterRoleOption::Attributes(attrs) => {
3116                self.validate_role_attributes(&attrs.clone().into())?;
3117
3118                if let Some(inherit) = attrs.inherit {
3119                    attributes.inherit = inherit;
3120                }
3121
3122                if let Some(password) = attrs.password {
3123                    attributes.password = Some(password);
3124                    attributes.scram_iterations =
3125                        Some(self.catalog().system_config().scram_iterations())
3126                }
3127
3128                if let Some(superuser) = attrs.superuser {
3129                    attributes.superuser = Some(superuser);
3130                }
3131
3132                if let Some(login) = attrs.login {
3133                    attributes.login = Some(login);
3134                }
3135
3136                if attrs.nopassword.unwrap_or(false) {
3137                    nopassword = true;
3138                }
3139
3140                if let Some(notice) = self.should_emit_rbac_notice(session) {
3141                    notices.push(notice);
3142                }
3143            }
3144            PlannedAlterRoleOption::Variable(variable) => {
3145                // Get the variable to make sure it's valid and visible.
3146                let session_var = session.vars().inspect(variable.name())?;
3147                // Return early if it's not visible.
3148                session_var.visible(session.user(), catalog.system_vars())?;
3149
3150                // Emit a warning when deprecated variables are used.
3151                // TODO(database-issues#8069) remove this after sufficient time has passed
3152                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3153                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3154                } else if let PlannedRoleVariable::Set {
3155                    name,
3156                    value: VariableValue::Values(vals),
3157                } = &variable
3158                {
3159                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3160                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3161                    }
3162                }
3163
3164                let var_name = match variable {
3165                    PlannedRoleVariable::Set { name, value } => {
3166                        // Update our persisted set.
3167                        match &value {
3168                            VariableValue::Default => {
3169                                vars.remove(&name);
3170                            }
3171                            VariableValue::Values(vals) => {
3172                                let var = match &vals[..] {
3173                                    [val] => OwnedVarInput::Flat(val.clone()),
3174                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3175                                };
3176                                // Make sure the input is valid.
3177                                session_var.check(var.borrow())?;
3178
3179                                vars.insert(name.clone(), var);
3180                            }
3181                        };
3182                        name
3183                    }
3184                    PlannedRoleVariable::Reset { name } => {
3185                        // Remove it from our persisted values.
3186                        vars.remove(&name);
3187                        name
3188                    }
3189                };
3190
3191                // Emit a notice that they need to reconnect to see the change take effect.
3192                notices.push(AdapterNotice::VarDefaultUpdated {
3193                    role: Some(name.clone()),
3194                    var_name: Some(var_name),
3195                });
3196            }
3197        }
3198
3199        let op = catalog::Op::AlterRole {
3200            id,
3201            name,
3202            attributes,
3203            nopassword,
3204            vars: RoleVars { map: vars },
3205        };
3206        let response = self
3207            .catalog_transact(Some(session), vec![op])
3208            .await
3209            .map(|_| ExecuteResponse::AlteredRole)?;
3210
3211        // Send all of our queued notices.
3212        session.add_notices(notices);
3213
3214        Ok(response)
3215    }
3216
3217    #[instrument]
3218    pub(super) async fn sequence_alter_sink_prepare(
3219        &mut self,
3220        ctx: ExecuteContext,
3221        plan: plan::AlterSinkPlan,
3222    ) {
3223        // Put a read hold on the new relation
3224        let id_bundle = crate::CollectionIdBundle {
3225            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3226            compute_ids: BTreeMap::new(),
3227        };
3228        let read_hold = self.acquire_read_holds(&id_bundle);
3229
3230        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3231            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3232            return;
3233        };
3234
3235        let otel_ctx = OpenTelemetryContext::obtain();
3236        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3237
3238        let plan_validity = PlanValidity::new(
3239            self.catalog().transient_revision(),
3240            BTreeSet::from_iter([plan.item_id, from_item_id]),
3241            Some(plan.in_cluster),
3242            None,
3243            ctx.session().role_metadata().clone(),
3244        );
3245
3246        info!(
3247            "preparing alter sink for {}: frontiers={:?} export={:?}",
3248            plan.global_id,
3249            self.controller
3250                .storage_collections
3251                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3252            self.controller.storage.export(plan.global_id)
3253        );
3254
3255        // Now we must wait for the sink to make enough progress such that there is overlap between
3256        // the new `from` collection's read hold and the sink's write frontier.
3257        //
3258        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3259        // the watch set never completes and neither does the `ALTER SINK` command.
3260        self.install_storage_watch_set(
3261            ctx.session().conn_id().clone(),
3262            BTreeSet::from_iter([plan.global_id]),
3263            read_ts,
3264            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3265                ctx: Some(ctx),
3266                otel_ctx,
3267                plan,
3268                plan_validity,
3269                read_hold,
3270            }),
3271        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3272    }
3273
3274    #[instrument]
3275    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3276        ctx.otel_ctx.attach_as_parent();
3277
3278        let plan::AlterSinkPlan {
3279            item_id,
3280            global_id,
3281            sink: sink_plan,
3282            with_snapshot,
3283            in_cluster,
3284        } = ctx.plan.clone();
3285
3286        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3287        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3288        // arbitrarily changed since we performed planning, and we must re-assert that it still
3289        // matches our requirements.
3290        //
3291        // The `PlanValidity` check ensures that both the sink and the new source relation still
3292        // exist. Apart from that we have to ensure that nobody else altered the sink in the mean
3293        // time, which we do by comparing the catalog sink version to the one in the plan.
3294        match ctx.plan_validity.check(self.catalog()) {
3295            Ok(()) => {}
3296            Err(err) => {
3297                ctx.retire(Err(err));
3298                return;
3299            }
3300        }
3301
3302        let entry = self.catalog().get_entry(&item_id);
3303        let CatalogItem::Sink(old_sink) = entry.item() else {
3304            panic!("invalid item kind for `AlterSinkPlan`");
3305        };
3306
3307        if sink_plan.version != old_sink.version + 1 {
3308            ctx.retire(Err(AdapterError::ChangedPlan(
3309                "sink was altered concurrently".into(),
3310            )));
3311            return;
3312        }
3313
3314        info!(
3315            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3316            self.controller
3317                .storage_collections
3318                .collections_frontiers(vec![global_id, sink_plan.from]),
3319            self.controller.storage.export(global_id),
3320        );
3321
3322        // Assert that we can recover the updates that happened at the timestamps of the write
3323        // frontier. This must be true in this call.
3324        let write_frontier = &self
3325            .controller
3326            .storage
3327            .export(global_id)
3328            .expect("sink known to exist")
3329            .write_frontier;
3330        let as_of = ctx.read_hold.least_valid_read();
3331        assert!(
3332            write_frontier.iter().all(|t| as_of.less_than(t)),
3333            "{:?} should be strictly less than {:?}",
3334            &*as_of,
3335            &**write_frontier
3336        );
3337
3338        // Parse the `create_sql` so we can update it to the new sink definition.
3339        //
3340        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3341        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3342        // names in the `create_sql` may have changed, for example due to a schema swap.
3343        let create_sql = &old_sink.create_sql;
3344        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3345        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3346            unreachable!("invalid statement kind for sink");
3347        };
3348
3349        // Update the sink version.
3350        stmt.with_options
3351            .retain(|o| o.name != CreateSinkOptionName::Version);
3352        stmt.with_options.push(CreateSinkOption {
3353            name: CreateSinkOptionName::Version,
3354            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3355                sink_plan.version.to_string(),
3356            ))),
3357        });
3358
3359        let conn_catalog = self.catalog().for_system_session();
3360        let (mut stmt, resolved_ids) =
3361            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3362
3363        // Update the `from` relation.
3364        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3365        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3366        stmt.from = ResolvedItemName::Item {
3367            id: from_entry.id(),
3368            qualifiers: from_entry.name.qualifiers.clone(),
3369            full_name,
3370            print_id: true,
3371            version: from_entry.version,
3372        };
3373
3374        let new_sink = Sink {
3375            create_sql: stmt.to_ast_string_stable(),
3376            global_id,
3377            from: sink_plan.from,
3378            connection: sink_plan.connection.clone(),
3379            envelope: sink_plan.envelope,
3380            version: sink_plan.version,
3381            with_snapshot,
3382            resolved_ids: resolved_ids.clone(),
3383            cluster_id: in_cluster,
3384            commit_interval: sink_plan.commit_interval,
3385        };
3386
3387        let ops = vec![catalog::Op::UpdateItem {
3388            id: item_id,
3389            name: entry.name().clone(),
3390            to_item: CatalogItem::Sink(new_sink),
3391        }];
3392
3393        match self
3394            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3395            .await
3396        {
3397            Ok(()) => {}
3398            Err(err) => {
3399                ctx.retire(Err(err));
3400                return;
3401            }
3402        }
3403
3404        let storage_sink_desc = StorageSinkDesc {
3405            from: sink_plan.from,
3406            from_desc: from_entry
3407                .relation_desc()
3408                .expect("sinks can only be built on items with descs")
3409                .into_owned(),
3410            connection: sink_plan
3411                .connection
3412                .clone()
3413                .into_inline_connection(self.catalog().state()),
3414            envelope: sink_plan.envelope,
3415            as_of,
3416            with_snapshot,
3417            version: sink_plan.version,
3418            from_storage_metadata: (),
3419            to_storage_metadata: (),
3420            commit_interval: sink_plan.commit_interval,
3421        };
3422
3423        self.controller
3424            .storage
3425            .alter_export(
3426                global_id,
3427                ExportDescription {
3428                    sink: storage_sink_desc,
3429                    instance_id: in_cluster,
3430                },
3431            )
3432            .await
3433            .unwrap_or_terminate("cannot fail to alter source desc");
3434
3435        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3436    }
3437
3438    #[instrument]
3439    pub(super) async fn sequence_alter_connection(
3440        &mut self,
3441        ctx: ExecuteContext,
3442        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3443    ) {
3444        match action {
3445            AlterConnectionAction::RotateKeys => {
3446                self.sequence_rotate_keys(ctx, id).await;
3447            }
3448            AlterConnectionAction::AlterOptions {
3449                set_options,
3450                drop_options,
3451                validate,
3452            } => {
3453                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3454                    .await
3455            }
3456        }
3457    }
3458
3459    #[instrument]
3460    async fn sequence_alter_connection_options(
3461        &mut self,
3462        mut ctx: ExecuteContext,
3463        id: CatalogItemId,
3464        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3465        drop_options: BTreeSet<ConnectionOptionName>,
3466        validate: bool,
3467    ) {
3468        let cur_entry = self.catalog().get_entry(&id);
3469        let cur_conn = cur_entry.connection().expect("known to be connection");
3470        let connection_gid = cur_conn.global_id();
3471
3472        let inner = || -> Result<Connection, AdapterError> {
3473            // Parse statement.
3474            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3475                .expect("invalid create sql persisted to catalog")
3476                .into_element()
3477                .ast
3478            {
3479                Statement::CreateConnection(stmt) => stmt,
3480                _ => unreachable!("proved type is source"),
3481            };
3482
3483            let catalog = self.catalog().for_system_session();
3484
3485            // Resolve items in statement
3486            let (mut create_conn_stmt, resolved_ids) =
3487                mz_sql::names::resolve(&catalog, create_conn_stmt)
3488                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3489
3490            // Retain options that are neither set nor dropped.
3491            create_conn_stmt
3492                .values
3493                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3494
3495            // Set new values
3496            create_conn_stmt.values.extend(
3497                set_options
3498                    .into_iter()
3499                    .map(|(name, value)| ConnectionOption { name, value }),
3500            );
3501
3502            // Open a new catalog, which we will use to re-plan our
3503            // statement with the desired config.
3504            let mut catalog = self.catalog().for_system_session();
3505            catalog.mark_id_unresolvable_for_replanning(id);
3506
3507            // Re-define our source in terms of the amended statement
3508            let plan = match mz_sql::plan::plan(
3509                None,
3510                &catalog,
3511                Statement::CreateConnection(create_conn_stmt),
3512                &Params::empty(),
3513                &resolved_ids,
3514            )
3515            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3516            {
3517                Plan::CreateConnection(plan) => plan,
3518                _ => unreachable!("create source plan is only valid response"),
3519            };
3520
3521            // Parse statement.
3522            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3523                .expect("invalid create sql persisted to catalog")
3524                .into_element()
3525                .ast
3526            {
3527                Statement::CreateConnection(stmt) => stmt,
3528                _ => unreachable!("proved type is source"),
3529            };
3530
3531            let catalog = self.catalog().for_system_session();
3532
3533            // Resolve items in statement
3534            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3535                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3536
3537            Ok(Connection {
3538                create_sql: plan.connection.create_sql,
3539                global_id: cur_conn.global_id,
3540                details: plan.connection.details,
3541                resolved_ids: new_deps,
3542            })
3543        };
3544
3545        let conn = match inner() {
3546            Ok(conn) => conn,
3547            Err(e) => {
3548                return ctx.retire(Err(e));
3549            }
3550        };
3551
3552        if validate {
3553            let connection = conn
3554                .details
3555                .to_connection()
3556                .into_inline_connection(self.catalog().state());
3557
3558            let internal_cmd_tx = self.internal_cmd_tx.clone();
3559            let transient_revision = self.catalog().transient_revision();
3560            let conn_id = ctx.session().conn_id().clone();
3561            let otel_ctx = OpenTelemetryContext::obtain();
3562            let role_metadata = ctx.session().role_metadata().clone();
3563            let current_storage_parameters = self.controller.storage.config().clone();
3564
3565            task::spawn(
3566                || format!("validate_alter_connection:{conn_id}"),
3567                async move {
3568                    let resolved_ids = conn.resolved_ids.clone();
3569                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3570                    let result = match std::panic::AssertUnwindSafe(
3571                        connection.validate(id, &current_storage_parameters),
3572                    )
3573                    .ore_catch_unwind()
3574                    .await
3575                    {
3576                        Ok(Ok(())) => Ok(conn),
3577                        Ok(Err(err)) => Err(err.into()),
3578                        Err(_panic) => {
3579                            tracing::error!("alter connection validation panicked");
3580                            Err(AdapterError::Internal(
3581                                "connection validation panicked".into(),
3582                            ))
3583                        }
3584                    };
3585
3586                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3587                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3588                        AlterConnectionValidationReady {
3589                            ctx,
3590                            result,
3591                            connection_id: id,
3592                            connection_gid,
3593                            plan_validity: PlanValidity::new(
3594                                transient_revision,
3595                                dependency_ids.clone(),
3596                                None,
3597                                None,
3598                                role_metadata,
3599                            ),
3600                            otel_ctx,
3601                            resolved_ids,
3602                        },
3603                    ));
3604                    if let Err(e) = result {
3605                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3606                    }
3607                },
3608            );
3609        } else {
3610            let result = self
3611                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3612                .await;
3613            ctx.retire(result);
3614        }
3615    }
3616
3617    #[instrument]
3618    pub(crate) async fn sequence_alter_connection_stage_finish(
3619        &mut self,
3620        session: &Session,
3621        id: CatalogItemId,
3622        connection: Connection,
3623    ) -> Result<ExecuteResponse, AdapterError> {
3624        match self.catalog.get_entry(&id).item() {
3625            CatalogItem::Connection(curr_conn) => {
3626                curr_conn
3627                    .details
3628                    .to_connection()
3629                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3630                    .map_err(StorageError::from)?;
3631            }
3632            _ => unreachable!("known to be a connection"),
3633        };
3634
3635        let ops = vec![catalog::Op::UpdateItem {
3636            id,
3637            name: self.catalog.get_entry(&id).name().clone(),
3638            to_item: CatalogItem::Connection(connection.clone()),
3639        }];
3640
3641        self.catalog_transact(Some(session), ops).await?;
3642
3643        // NOTE: The rest of the alter connection logic (updating VPC endpoints
3644        // and propagating connection changes to dependent sources, sinks, and
3645        // tables) is handled in `apply_catalog_implications` via
3646        // `handle_alter_connection`. The catalog transact above triggers that
3647        // code path.
3648
3649        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3650    }
3651
3652    #[instrument]
3653    pub(super) async fn sequence_alter_source(
3654        &mut self,
3655        session: &Session,
3656        plan::AlterSourcePlan {
3657            item_id,
3658            ingestion_id,
3659            action,
3660        }: plan::AlterSourcePlan,
3661    ) -> Result<ExecuteResponse, AdapterError> {
3662        let cur_entry = self.catalog().get_entry(&item_id);
3663        let cur_source = cur_entry.source().expect("known to be source");
3664
3665        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3666            // Parse statement.
3667            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3668                .expect("invalid create sql persisted to catalog")
3669                .into_element()
3670                .ast
3671            {
3672                Statement::CreateSource(stmt) => stmt,
3673                _ => unreachable!("proved type is source"),
3674            };
3675
3676            let catalog = coord.catalog().for_system_session();
3677
3678            // Resolve items in statement
3679            mz_sql::names::resolve(&catalog, create_source_stmt)
3680                .map_err(|e| AdapterError::internal(err_cx, e))
3681        };
3682
3683        match action {
3684            plan::AlterSourceAction::AddSubsourceExports {
3685                subsources,
3686                options,
3687            } => {
3688                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3689
3690                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3691                    text_columns: mut new_text_columns,
3692                    exclude_columns: mut new_exclude_columns,
3693                    ..
3694                } = options.try_into()?;
3695
3696                // Resolve items in statement
3697                let (mut create_source_stmt, resolved_ids) =
3698                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3699
3700                // Get all currently referred-to items
3701                let catalog = self.catalog();
3702                let curr_references: BTreeSet<_> = catalog
3703                    .get_entry(&item_id)
3704                    .used_by()
3705                    .into_iter()
3706                    .filter_map(|subsource| {
3707                        catalog
3708                            .get_entry(subsource)
3709                            .subsource_details()
3710                            .map(|(_id, reference, _details)| reference)
3711                    })
3712                    .collect();
3713
3714                // We are doing a lot of unwrapping, so just make an error to reference; all of
3715                // these invariants are guaranteed to be true because of how we plan subsources.
3716                let purification_err =
3717                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3718
3719                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
3720                // we remove support for implicitly created subsources from a `CREATE SOURCE`
3721                // statement.
3722                match &mut create_source_stmt.connection {
3723                    CreateSourceConnection::Postgres {
3724                        options: curr_options,
3725                        ..
3726                    } => {
3727                        let mz_sql::plan::PgConfigOptionExtracted {
3728                            mut text_columns, ..
3729                        } = curr_options.clone().try_into()?;
3730
3731                        // Drop text columns; we will add them back in
3732                        // as appropriate below.
3733                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3734
3735                        // Drop all text columns that are not currently referred to.
3736                        text_columns.retain(|column_qualified_reference| {
3737                            mz_ore::soft_assert_eq_or_log!(
3738                                column_qualified_reference.0.len(),
3739                                4,
3740                                "all TEXT COLUMNS values must be column-qualified references"
3741                            );
3742                            let mut table = column_qualified_reference.clone();
3743                            table.0.truncate(3);
3744                            curr_references.contains(&table)
3745                        });
3746
3747                        // Merge the current text columns into the new text columns.
3748                        new_text_columns.extend(text_columns);
3749
3750                        // If we have text columns, add them to the options.
3751                        if !new_text_columns.is_empty() {
3752                            new_text_columns.sort();
3753                            let new_text_columns = new_text_columns
3754                                .into_iter()
3755                                .map(WithOptionValue::UnresolvedItemName)
3756                                .collect();
3757
3758                            curr_options.push(PgConfigOption {
3759                                name: PgConfigOptionName::TextColumns,
3760                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3761                            });
3762                        }
3763                    }
3764                    CreateSourceConnection::MySql {
3765                        options: curr_options,
3766                        ..
3767                    } => {
3768                        let mz_sql::plan::MySqlConfigOptionExtracted {
3769                            mut text_columns,
3770                            mut exclude_columns,
3771                            ..
3772                        } = curr_options.clone().try_into()?;
3773
3774                        // Drop both ignore and text columns; we will add them back in
3775                        // as appropriate below.
3776                        curr_options.retain(|o| {
3777                            !matches!(
3778                                o.name,
3779                                MySqlConfigOptionName::TextColumns
3780                                    | MySqlConfigOptionName::ExcludeColumns
3781                            )
3782                        });
3783
3784                        // Drop all text / exclude columns that are not currently referred to.
3785                        let column_referenced =
3786                            |column_qualified_reference: &UnresolvedItemName| {
3787                                mz_ore::soft_assert_eq_or_log!(
3788                                    column_qualified_reference.0.len(),
3789                                    3,
3790                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3791                                );
3792                                let mut table = column_qualified_reference.clone();
3793                                table.0.truncate(2);
3794                                curr_references.contains(&table)
3795                            };
3796                        text_columns.retain(column_referenced);
3797                        exclude_columns.retain(column_referenced);
3798
3799                        // Merge the current text / exclude columns into the new text / exclude columns.
3800                        new_text_columns.extend(text_columns);
3801                        new_exclude_columns.extend(exclude_columns);
3802
3803                        // If we have text columns, add them to the options.
3804                        if !new_text_columns.is_empty() {
3805                            new_text_columns.sort();
3806                            let new_text_columns = new_text_columns
3807                                .into_iter()
3808                                .map(WithOptionValue::UnresolvedItemName)
3809                                .collect();
3810
3811                            curr_options.push(MySqlConfigOption {
3812                                name: MySqlConfigOptionName::TextColumns,
3813                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3814                            });
3815                        }
3816                        // If we have exclude columns, add them to the options.
3817                        if !new_exclude_columns.is_empty() {
3818                            new_exclude_columns.sort();
3819                            let new_exclude_columns = new_exclude_columns
3820                                .into_iter()
3821                                .map(WithOptionValue::UnresolvedItemName)
3822                                .collect();
3823
3824                            curr_options.push(MySqlConfigOption {
3825                                name: MySqlConfigOptionName::ExcludeColumns,
3826                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3827                            });
3828                        }
3829                    }
3830                    CreateSourceConnection::SqlServer {
3831                        options: curr_options,
3832                        ..
3833                    } => {
3834                        let mz_sql::plan::SqlServerConfigOptionExtracted {
3835                            mut text_columns,
3836                            mut exclude_columns,
3837                            ..
3838                        } = curr_options.clone().try_into()?;
3839
3840                        // Drop both ignore and text columns; we will add them back in
3841                        // as appropriate below.
3842                        curr_options.retain(|o| {
3843                            !matches!(
3844                                o.name,
3845                                SqlServerConfigOptionName::TextColumns
3846                                    | SqlServerConfigOptionName::ExcludeColumns
3847                            )
3848                        });
3849
3850                        // Drop all text / exclude columns that are not currently referred to.
3851                        // SQL Server text/exclude column refs are 3-part (schema.table.col),
3852                        // which truncate to 2-part (schema.table). But external references
3853                        // are 3-part (database.schema.table). Use suffix matching since
3854                        // a SQL Server source connects to a single database.
3855                        let column_referenced =
3856                            |column_qualified_reference: &UnresolvedItemName| {
3857                                mz_ore::soft_assert_eq_or_log!(
3858                                    column_qualified_reference.0.len(),
3859                                    3,
3860                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3861                                );
3862                                let mut table = column_qualified_reference.clone();
3863                                table.0.truncate(2);
3864                                curr_references.iter().any(|r| r.0.ends_with(&table.0))
3865                            };
3866                        text_columns.retain(column_referenced);
3867                        exclude_columns.retain(column_referenced);
3868
3869                        // Merge the current text / exclude columns into the new text / exclude columns.
3870                        new_text_columns.extend(text_columns);
3871                        new_exclude_columns.extend(exclude_columns);
3872
3873                        // If we have text columns, add them to the options.
3874                        if !new_text_columns.is_empty() {
3875                            new_text_columns.sort();
3876                            let new_text_columns = new_text_columns
3877                                .into_iter()
3878                                .map(WithOptionValue::UnresolvedItemName)
3879                                .collect();
3880
3881                            curr_options.push(SqlServerConfigOption {
3882                                name: SqlServerConfigOptionName::TextColumns,
3883                                value: Some(WithOptionValue::Sequence(new_text_columns)),
3884                            });
3885                        }
3886                        // If we have exclude columns, add them to the options.
3887                        if !new_exclude_columns.is_empty() {
3888                            new_exclude_columns.sort();
3889                            let new_exclude_columns = new_exclude_columns
3890                                .into_iter()
3891                                .map(WithOptionValue::UnresolvedItemName)
3892                                .collect();
3893
3894                            curr_options.push(SqlServerConfigOption {
3895                                name: SqlServerConfigOptionName::ExcludeColumns,
3896                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3897                            });
3898                        }
3899                    }
3900                    _ => return Err(purification_err()),
3901                };
3902
3903                let mut catalog = self.catalog().for_system_session();
3904                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
3905
3906                // Re-define our source in terms of the amended statement
3907                let planned = mz_sql::plan::plan(
3908                    None,
3909                    &catalog,
3910                    Statement::CreateSource(create_source_stmt),
3911                    &Params::empty(),
3912                    &resolved_ids,
3913                )
3914                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
3915                let plan = match planned {
3916                    Plan::CreateSource(plan) => plan,
3917                    _ => unreachable!("create source plan is only valid response"),
3918                };
3919
3920                // Asserting that we've done the right thing with dependencies
3921                // here requires mocking out objects in the catalog, which is a
3922                // large task for an operation we have to cover in tests anyway.
3923                let source = Source::new(
3924                    plan,
3925                    cur_source.global_id,
3926                    resolved_ids,
3927                    cur_source.custom_logical_compaction_window,
3928                    cur_source.is_retained_metrics_object,
3929                );
3930
3931                // Get new ingestion description for storage.
3932                let desc = match &source.data_source {
3933                    DataSourceDesc::Ingestion { desc, .. }
3934                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3935                        desc.clone().into_inline_connection(self.catalog().state())
3936                    }
3937                    _ => unreachable!("already verified of type ingestion"),
3938                };
3939
3940                self.controller
3941                    .storage
3942                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
3943                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
3944
3945                // Redefine source. This must be done before we create any new
3946                // subsources so that it has the right ingestion.
3947                let mut ops = vec![catalog::Op::UpdateItem {
3948                    id: item_id,
3949                    // Look this up again so we don't have to hold an immutable reference to the
3950                    // entry for so long.
3951                    name: self.catalog.get_entry(&item_id).name().clone(),
3952                    to_item: CatalogItem::Source(source),
3953                }];
3954
3955                let CreateSourceInner {
3956                    ops: new_ops,
3957                    sources: _,
3958                    if_not_exists_ids,
3959                } = self.create_source_inner(session, subsources).await?;
3960
3961                ops.extend(new_ops.into_iter());
3962
3963                assert!(
3964                    if_not_exists_ids.is_empty(),
3965                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
3966                );
3967
3968                self.catalog_transact(Some(session), ops).await?;
3969            }
3970            plan::AlterSourceAction::RefreshReferences { references } => {
3971                self.catalog_transact(
3972                    Some(session),
3973                    vec![catalog::Op::UpdateSourceReferences {
3974                        source_id: item_id,
3975                        references: references.into(),
3976                    }],
3977                )
3978                .await?;
3979            }
3980        }
3981
3982        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3983    }
3984
3985    #[instrument]
3986    pub(super) async fn sequence_alter_system_set(
3987        &mut self,
3988        session: &Session,
3989        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
3990    ) -> Result<ExecuteResponse, AdapterError> {
3991        self.is_user_allowed_to_alter_system(session, Some(&name))?;
3992        // We want to ensure that the network policy we're switching too actually exists.
3993        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
3994            self.validate_alter_system_network_policy(session, &value)?;
3995        }
3996
3997        let op = match value {
3998            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
3999                name: name.clone(),
4000                value: OwnedVarInput::SqlSet(values),
4001            },
4002            plan::VariableValue::Default => {
4003                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4004            }
4005        };
4006        self.catalog_transact(Some(session), vec![op]).await?;
4007
4008        session.add_notice(AdapterNotice::VarDefaultUpdated {
4009            role: None,
4010            var_name: Some(name),
4011        });
4012        Ok(ExecuteResponse::AlteredSystemConfiguration)
4013    }
4014
4015    #[instrument]
4016    pub(super) async fn sequence_alter_system_reset(
4017        &mut self,
4018        session: &Session,
4019        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4020    ) -> Result<ExecuteResponse, AdapterError> {
4021        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4022        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4023        self.catalog_transact(Some(session), vec![op]).await?;
4024        session.add_notice(AdapterNotice::VarDefaultUpdated {
4025            role: None,
4026            var_name: Some(name),
4027        });
4028        Ok(ExecuteResponse::AlteredSystemConfiguration)
4029    }
4030
4031    #[instrument]
4032    pub(super) async fn sequence_alter_system_reset_all(
4033        &mut self,
4034        session: &Session,
4035        _: plan::AlterSystemResetAllPlan,
4036    ) -> Result<ExecuteResponse, AdapterError> {
4037        self.is_user_allowed_to_alter_system(session, None)?;
4038        let op = catalog::Op::ResetAllSystemConfiguration;
4039        self.catalog_transact(Some(session), vec![op]).await?;
4040        session.add_notice(AdapterNotice::VarDefaultUpdated {
4041            role: None,
4042            var_name: None,
4043        });
4044        Ok(ExecuteResponse::AlteredSystemConfiguration)
4045    }
4046
4047    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
4048    fn is_user_allowed_to_alter_system(
4049        &self,
4050        session: &Session,
4051        var_name: Option<&str>,
4052    ) -> Result<(), AdapterError> {
4053        match (session.user().kind(), var_name) {
4054            // Only internal superusers can reset all system variables.
4055            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4056            // Whether or not a variable can be modified depends if we're an internal superuser.
4057            (UserKind::Superuser, Some(name))
4058                if session.user().is_internal()
4059                    || self.catalog().system_config().user_modifiable(name) =>
4060            {
4061                // In lieu of plumbing the user to all system config functions, just check that
4062                // the var is visible.
4063                let var = self.catalog().system_config().get(name)?;
4064                var.visible(session.user(), self.catalog().system_config())?;
4065                Ok(())
4066            }
4067            // If we're not a superuser, but the variable is user modifiable, indicate they can use
4068            // session variables.
4069            (UserKind::Regular, Some(name))
4070                if self.catalog().system_config().user_modifiable(name) =>
4071            {
4072                Err(AdapterError::Unauthorized(
4073                    rbac::UnauthorizedError::Superuser {
4074                        action: format!("toggle the '{name}' system configuration parameter"),
4075                    },
4076                ))
4077            }
4078            _ => Err(AdapterError::Unauthorized(
4079                rbac::UnauthorizedError::MzSystem {
4080                    action: "alter system".into(),
4081                },
4082            )),
4083        }
4084    }
4085
4086    fn validate_alter_system_network_policy(
4087        &self,
4088        session: &Session,
4089        policy_value: &plan::VariableValue,
4090    ) -> Result<(), AdapterError> {
4091        let policy_name = match &policy_value {
4092            // Make sure the compiled in default still exists.
4093            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4094            plan::VariableValue::Values(values) if values.len() == 1 => {
4095                values.iter().next().cloned()
4096            }
4097            plan::VariableValue::Values(values) => {
4098                tracing::warn!(?values, "can't set multiple network policies at once");
4099                None
4100            }
4101        };
4102        let maybe_network_policy = policy_name
4103            .as_ref()
4104            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4105        let Some(network_policy) = maybe_network_policy else {
4106            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4107                VarError::InvalidParameterValue {
4108                    name: NETWORK_POLICY.name(),
4109                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4110                    reason: "no network policy with such name exists".to_string(),
4111                },
4112            )));
4113        };
4114        self.validate_alter_network_policy(session, &network_policy.rules)
4115    }
4116
4117    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4118    ///
4119    /// This helps prevent users from modifying network policies in a way that would lock out their
4120    /// current connection.
4121    fn validate_alter_network_policy(
4122        &self,
4123        session: &Session,
4124        policy_rules: &Vec<NetworkPolicyRule>,
4125    ) -> Result<(), AdapterError> {
4126        // If the user is not an internal user attempt to protect them from
4127        // blocking themselves.
4128        if session.user().is_internal() {
4129            return Ok(());
4130        }
4131        if let Some(ip) = session.meta().client_ip() {
4132            validate_ip_with_policy_rules(ip, policy_rules)
4133                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4134        } else {
4135            // Sessions without IPs are only temporarily constructed for default values
4136            // they should not be permitted here.
4137            return Err(AdapterError::NetworkPolicyDenied(
4138                NetworkPolicyError::MissingIp,
4139            ));
4140        }
4141        Ok(())
4142    }
4143
4144    // Returns the name of the portal to execute.
4145    #[instrument]
4146    pub(super) fn sequence_execute(
4147        &self,
4148        session: &mut Session,
4149        plan: plan::ExecutePlan,
4150    ) -> Result<String, AdapterError> {
4151        // Verify the stmt is still valid.
4152        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4153        let ps = session
4154            .get_prepared_statement_unverified(&plan.name)
4155            .expect("known to exist");
4156        let stmt = ps.stmt().cloned();
4157        let desc = ps.desc().clone();
4158        let state_revision = ps.state_revision;
4159        let logging = Arc::clone(ps.logging());
4160        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4161    }
4162
4163    #[instrument]
4164    pub(super) async fn sequence_grant_privileges(
4165        &mut self,
4166        session: &Session,
4167        plan::GrantPrivilegesPlan {
4168            update_privileges,
4169            grantees,
4170        }: plan::GrantPrivilegesPlan,
4171    ) -> Result<ExecuteResponse, AdapterError> {
4172        self.sequence_update_privileges(
4173            session,
4174            update_privileges,
4175            grantees,
4176            UpdatePrivilegeVariant::Grant,
4177        )
4178        .await
4179    }
4180
4181    #[instrument]
4182    pub(super) async fn sequence_revoke_privileges(
4183        &mut self,
4184        session: &Session,
4185        plan::RevokePrivilegesPlan {
4186            update_privileges,
4187            revokees,
4188        }: plan::RevokePrivilegesPlan,
4189    ) -> Result<ExecuteResponse, AdapterError> {
4190        self.sequence_update_privileges(
4191            session,
4192            update_privileges,
4193            revokees,
4194            UpdatePrivilegeVariant::Revoke,
4195        )
4196        .await
4197    }
4198
4199    #[instrument]
4200    async fn sequence_update_privileges(
4201        &mut self,
4202        session: &Session,
4203        update_privileges: Vec<UpdatePrivilege>,
4204        grantees: Vec<RoleId>,
4205        variant: UpdatePrivilegeVariant,
4206    ) -> Result<ExecuteResponse, AdapterError> {
4207        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4208        let mut warnings = Vec::new();
4209        let catalog = self.catalog().for_session(session);
4210
4211        for UpdatePrivilege {
4212            acl_mode,
4213            target_id,
4214            grantor,
4215        } in update_privileges
4216        {
4217            let actual_object_type = catalog.get_system_object_type(&target_id);
4218            // For all relations we allow all applicable table privileges, but send a warning if the
4219            // privilege isn't actually applicable to the object type.
4220            if actual_object_type.is_relation() {
4221                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4222                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4223                if !non_applicable_privileges.is_empty() {
4224                    let object_description =
4225                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4226                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4227                        non_applicable_privileges,
4228                        object_description,
4229                    })
4230                }
4231            }
4232
4233            if let SystemObjectId::Object(object_id) = &target_id {
4234                self.catalog()
4235                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4236            }
4237
4238            let privileges = self
4239                .catalog()
4240                .get_privileges(&target_id, session.conn_id())
4241                // Should be unreachable since the parser will refuse to parse grant/revoke
4242                // statements on objects without privileges.
4243                .ok_or(AdapterError::Unsupported(
4244                    "GRANTs/REVOKEs on an object type with no privileges",
4245                ))?;
4246
4247            for grantee in &grantees {
4248                self.catalog().ensure_not_system_role(grantee)?;
4249                self.catalog().ensure_not_predefined_role(grantee)?;
4250                let existing_privilege = privileges
4251                    .get_acl_item(grantee, &grantor)
4252                    .map(Cow::Borrowed)
4253                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4254
4255                match variant {
4256                    UpdatePrivilegeVariant::Grant
4257                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4258                    {
4259                        ops.push(catalog::Op::UpdatePrivilege {
4260                            target_id: target_id.clone(),
4261                            privilege: MzAclItem {
4262                                grantee: *grantee,
4263                                grantor,
4264                                acl_mode,
4265                            },
4266                            variant,
4267                        });
4268                    }
4269                    UpdatePrivilegeVariant::Revoke
4270                        if !existing_privilege
4271                            .acl_mode
4272                            .intersection(acl_mode)
4273                            .is_empty() =>
4274                    {
4275                        ops.push(catalog::Op::UpdatePrivilege {
4276                            target_id: target_id.clone(),
4277                            privilege: MzAclItem {
4278                                grantee: *grantee,
4279                                grantor,
4280                                acl_mode,
4281                            },
4282                            variant,
4283                        });
4284                    }
4285                    // no-op
4286                    _ => {}
4287                }
4288            }
4289        }
4290
4291        if ops.is_empty() {
4292            session.add_notices(warnings);
4293            return Ok(variant.into());
4294        }
4295
4296        let res = self
4297            .catalog_transact(Some(session), ops)
4298            .await
4299            .map(|_| match variant {
4300                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4301                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4302            });
4303        if res.is_ok() {
4304            session.add_notices(warnings);
4305        }
4306        res
4307    }
4308
4309    #[instrument]
4310    pub(super) async fn sequence_alter_default_privileges(
4311        &mut self,
4312        session: &Session,
4313        plan::AlterDefaultPrivilegesPlan {
4314            privilege_objects,
4315            privilege_acl_items,
4316            is_grant,
4317        }: plan::AlterDefaultPrivilegesPlan,
4318    ) -> Result<ExecuteResponse, AdapterError> {
4319        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4320        let variant = if is_grant {
4321            UpdatePrivilegeVariant::Grant
4322        } else {
4323            UpdatePrivilegeVariant::Revoke
4324        };
4325        for privilege_object in &privilege_objects {
4326            self.catalog()
4327                .ensure_not_system_role(&privilege_object.role_id)?;
4328            self.catalog()
4329                .ensure_not_predefined_role(&privilege_object.role_id)?;
4330            if let Some(database_id) = privilege_object.database_id {
4331                self.catalog()
4332                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4333            }
4334            if let Some(schema_id) = privilege_object.schema_id {
4335                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4336                let schema_spec: SchemaSpecifier = schema_id.into();
4337
4338                self.catalog().ensure_not_reserved_object(
4339                    &(database_spec, schema_spec).into(),
4340                    session.conn_id(),
4341                )?;
4342            }
4343            for privilege_acl_item in &privilege_acl_items {
4344                self.catalog()
4345                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4346                self.catalog()
4347                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4348                ops.push(catalog::Op::UpdateDefaultPrivilege {
4349                    privilege_object: privilege_object.clone(),
4350                    privilege_acl_item: privilege_acl_item.clone(),
4351                    variant,
4352                })
4353            }
4354        }
4355
4356        self.catalog_transact(Some(session), ops).await?;
4357        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4358    }
4359
4360    #[instrument]
4361    pub(super) async fn sequence_grant_role(
4362        &mut self,
4363        session: &Session,
4364        plan::GrantRolePlan {
4365            role_ids,
4366            member_ids,
4367            grantor_id,
4368        }: plan::GrantRolePlan,
4369    ) -> Result<ExecuteResponse, AdapterError> {
4370        let catalog = self.catalog();
4371        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4372        for role_id in role_ids {
4373            for member_id in &member_ids {
4374                let member_membership: BTreeSet<_> =
4375                    catalog.get_role(member_id).membership().keys().collect();
4376                if member_membership.contains(&role_id) {
4377                    let role_name = catalog.get_role(&role_id).name().to_string();
4378                    let member_name = catalog.get_role(member_id).name().to_string();
4379                    // We need this check so we don't accidentally return a success on a reserved role.
4380                    catalog.ensure_not_reserved_role(member_id)?;
4381                    catalog.ensure_grantable_role(&role_id)?;
4382                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4383                        role_name,
4384                        member_name,
4385                    });
4386                } else {
4387                    ops.push(catalog::Op::GrantRole {
4388                        role_id,
4389                        member_id: *member_id,
4390                        grantor_id,
4391                    });
4392                }
4393            }
4394        }
4395
4396        if ops.is_empty() {
4397            return Ok(ExecuteResponse::GrantedRole);
4398        }
4399
4400        self.catalog_transact(Some(session), ops)
4401            .await
4402            .map(|_| ExecuteResponse::GrantedRole)
4403    }
4404
4405    #[instrument]
4406    pub(super) async fn sequence_revoke_role(
4407        &mut self,
4408        session: &Session,
4409        plan::RevokeRolePlan {
4410            role_ids,
4411            member_ids,
4412            grantor_id,
4413        }: plan::RevokeRolePlan,
4414    ) -> Result<ExecuteResponse, AdapterError> {
4415        let catalog = self.catalog();
4416        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4417        for role_id in role_ids {
4418            for member_id in &member_ids {
4419                let member_membership: BTreeSet<_> =
4420                    catalog.get_role(member_id).membership().keys().collect();
4421                if !member_membership.contains(&role_id) {
4422                    let role_name = catalog.get_role(&role_id).name().to_string();
4423                    let member_name = catalog.get_role(member_id).name().to_string();
4424                    // We need this check so we don't accidentally return a success on a reserved role.
4425                    catalog.ensure_not_reserved_role(member_id)?;
4426                    catalog.ensure_grantable_role(&role_id)?;
4427                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4428                        role_name,
4429                        member_name,
4430                    });
4431                } else {
4432                    ops.push(catalog::Op::RevokeRole {
4433                        role_id,
4434                        member_id: *member_id,
4435                        grantor_id,
4436                    });
4437                }
4438            }
4439        }
4440
4441        if ops.is_empty() {
4442            return Ok(ExecuteResponse::RevokedRole);
4443        }
4444
4445        self.catalog_transact(Some(session), ops)
4446            .await
4447            .map(|_| ExecuteResponse::RevokedRole)
4448    }
4449
4450    #[instrument]
4451    pub(super) async fn sequence_alter_owner(
4452        &mut self,
4453        session: &Session,
4454        plan::AlterOwnerPlan {
4455            id,
4456            object_type,
4457            new_owner,
4458        }: plan::AlterOwnerPlan,
4459    ) -> Result<ExecuteResponse, AdapterError> {
4460        let mut ops = vec![catalog::Op::UpdateOwner {
4461            id: id.clone(),
4462            new_owner,
4463        }];
4464
4465        match &id {
4466            ObjectId::Item(global_id) => {
4467                let entry = self.catalog().get_entry(global_id);
4468
4469                // Cannot directly change the owner of an index.
4470                if entry.is_index() {
4471                    let name = self
4472                        .catalog()
4473                        .resolve_full_name(entry.name(), Some(session.conn_id()))
4474                        .to_string();
4475                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
4476                    return Ok(ExecuteResponse::AlteredObject(object_type));
4477                }
4478
4479                // Alter owner cascades down to dependent indexes.
4480                let dependent_index_ops = entry
4481                    .used_by()
4482                    .into_iter()
4483                    .filter(|id| self.catalog().get_entry(id).is_index())
4484                    .map(|id| catalog::Op::UpdateOwner {
4485                        id: ObjectId::Item(*id),
4486                        new_owner,
4487                    });
4488                ops.extend(dependent_index_ops);
4489
4490                // Alter owner cascades down to progress collections.
4491                let dependent_subsources =
4492                    entry
4493                        .progress_id()
4494                        .into_iter()
4495                        .map(|item_id| catalog::Op::UpdateOwner {
4496                            id: ObjectId::Item(item_id),
4497                            new_owner,
4498                        });
4499                ops.extend(dependent_subsources);
4500            }
4501            ObjectId::Cluster(cluster_id) => {
4502                let cluster = self.catalog().get_cluster(*cluster_id);
4503                // Alter owner cascades down to cluster replicas.
4504                let managed_cluster_replica_ops =
4505                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4506                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4507                        new_owner,
4508                    });
4509                ops.extend(managed_cluster_replica_ops);
4510            }
4511            _ => {}
4512        }
4513
4514        self.catalog_transact(Some(session), ops)
4515            .await
4516            .map(|_| ExecuteResponse::AlteredObject(object_type))
4517    }
4518
4519    #[instrument]
4520    pub(super) async fn sequence_reassign_owned(
4521        &mut self,
4522        session: &Session,
4523        plan::ReassignOwnedPlan {
4524            old_roles,
4525            new_role,
4526            reassign_ids,
4527        }: plan::ReassignOwnedPlan,
4528    ) -> Result<ExecuteResponse, AdapterError> {
4529        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4530            self.catalog().ensure_not_reserved_role(role_id)?;
4531        }
4532
4533        let ops = reassign_ids
4534            .into_iter()
4535            .map(|id| catalog::Op::UpdateOwner {
4536                id,
4537                new_owner: new_role,
4538            })
4539            .collect();
4540
4541        self.catalog_transact(Some(session), ops)
4542            .await
4543            .map(|_| ExecuteResponse::ReassignOwned)
4544    }
4545
4546    #[instrument]
4547    pub(crate) async fn handle_deferred_statement(&mut self) {
4548        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
4549        // was processed, removing the single element from deferred_statements, so it is expected
4550        // that this is sometimes empty.
4551        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4552            return;
4553        };
4554        match ps {
4555            crate::coord::PlanStatement::Statement { stmt, params } => {
4556                self.handle_execute_inner(stmt, params, ctx).await;
4557            }
4558            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4559                self.sequence_plan(ctx, plan, resolved_ids).await;
4560            }
4561        }
4562    }
4563
4564    #[instrument]
4565    // TODO(parkmycar): Remove this once we have an actual implementation.
4566    #[allow(clippy::unused_async)]
4567    pub(super) async fn sequence_alter_table(
4568        &mut self,
4569        ctx: &mut ExecuteContext,
4570        plan: plan::AlterTablePlan,
4571    ) -> Result<ExecuteResponse, AdapterError> {
4572        let plan::AlterTablePlan {
4573            relation_id,
4574            column_name,
4575            column_type,
4576            raw_sql_type,
4577        } = plan;
4578
4579        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4580        let (_, new_global_id) = self.allocate_user_id().await?;
4581        let ops = vec![catalog::Op::AlterAddColumn {
4582            id: relation_id,
4583            new_global_id,
4584            name: column_name,
4585            typ: column_type,
4586            sql: raw_sql_type,
4587        }];
4588
4589        self.catalog_transact_with_context(None, Some(ctx), ops)
4590            .await?;
4591
4592        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4593    }
4594
4595    /// Prepares to apply a replacement materialized view.
4596    #[instrument]
4597    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4598        &mut self,
4599        ctx: ExecuteContext,
4600        plan: AlterMaterializedViewApplyReplacementPlan,
4601    ) {
4602        // To ensure there is no time gap in the output, we can only apply a replacement if the
4603        // target's write frontier has caught up to the replacement dataflow's write frontier. This
4604        // might not be the case initially, so we have to wait. To this end, we install a watch set
4605        // waiting for the target MV's write frontier to advance sufficiently.
4606        //
4607        // Note that the replacement's dataflow is not performing any writes, so it can only be
4608        // ahead of the target initially due to as-of selection. Once the target has caught up, the
4609        // replacement's write frontier is always <= the target's.
4610
4611        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4612
4613        let plan_validity = PlanValidity::new(
4614            self.catalog().transient_revision(),
4615            BTreeSet::from_iter([id, replacement_id]),
4616            None,
4617            None,
4618            ctx.session().role_metadata().clone(),
4619        );
4620
4621        let target = self.catalog.get_entry(&id);
4622        let target_gid = target.latest_global_id();
4623
4624        let replacement = self.catalog.get_entry(&replacement_id);
4625        let replacement_gid = replacement.latest_global_id();
4626
4627        let target_upper = self
4628            .controller
4629            .storage_collections
4630            .collection_frontiers(target_gid)
4631            .expect("target MV exists")
4632            .write_frontier;
4633        let replacement_upper = self
4634            .controller
4635            .compute
4636            .collection_frontiers(replacement_gid, replacement.cluster_id())
4637            .expect("replacement MV exists")
4638            .write_frontier;
4639
4640        info!(
4641            %id, %replacement_id, ?target_upper, ?replacement_upper,
4642            "preparing materialized view replacement application",
4643        );
4644
4645        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4646            // A replacement's write frontier can only become empty if the target's write frontier
4647            // has advanced to the empty frontier. In this case the MV is sealed for all times and
4648            // applying the replacement wouldn't have any effect. We use this opportunity to alert
4649            // the user by returning an error, rather than applying the useless replacement.
4650            //
4651            // Note that we can't assert on `target_upper` being empty here, because the reporting
4652            // of the target's frontier might be delayed. We'd have to fetch the current frontier
4653            // from persist, which we cannot do without incurring I/O.
4654            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4655                name: target.name().item.clone(),
4656            }));
4657            return;
4658        };
4659
4660        // A watch set resolves when the watched objects' frontier becomes _greater_ than the
4661        // specified timestamp. Since we only need to wait until the target frontier is >= the
4662        // replacement's frontier, we can step back the timestamp.
4663        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4664
4665        // TODO(database-issues#9820): If the target MV is dropped while we are waiting for
4666        // progress, the watch set never completes and neither does the `ALTER MATERIALIZED VIEW`
4667        // command.
4668        self.install_storage_watch_set(
4669            ctx.session().conn_id().clone(),
4670            BTreeSet::from_iter([target_gid]),
4671            replacement_upper_ts,
4672            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4673                ctx: Some(ctx),
4674                otel_ctx: OpenTelemetryContext::obtain(),
4675                plan,
4676                plan_validity,
4677            }),
4678        )
4679        .expect("target collection exists");
4680    }
4681
4682    /// Finishes applying a replacement materialized view after the frontier wait completed.
4683    #[instrument]
4684    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4685        &mut self,
4686        mut ctx: AlterMaterializedViewReadyContext,
4687    ) {
4688        ctx.otel_ctx.attach_as_parent();
4689
4690        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4691
4692        // We avoid taking the DDL lock for `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`
4693        // commands, see `Coordinator::must_serialize_ddl`. We therefore must assume that the
4694        // world has arbitrarily changed since we performed planning, and we must re-assert
4695        // that it still matches our requirements.
4696        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4697            ctx.retire(Err(err));
4698            return;
4699        }
4700
4701        info!(
4702            %id, %replacement_id,
4703            "finishing materialized view replacement application",
4704        );
4705
4706        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4707        match self
4708            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4709            .await
4710        {
4711            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4712                ObjectType::MaterializedView,
4713            ))),
4714            Err(err) => ctx.retire(Err(err)),
4715        }
4716    }
4717
4718    pub(super) async fn statistics_oracle(
4719        &self,
4720        session: &Session,
4721        source_ids: &BTreeSet<GlobalId>,
4722        query_as_of: &Antichain<Timestamp>,
4723        is_oneshot: bool,
4724    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4725        super::statistics_oracle(
4726            session,
4727            source_ids,
4728            query_as_of,
4729            is_oneshot,
4730            self.catalog().system_config(),
4731            self.controller.storage_collections.as_ref(),
4732        )
4733        .await
4734    }
4735}
4736
4737impl Coordinator {
4738    /// Emit the raw optimizer notices in `notices` to the user's session, if
4739    /// any.
4740    ///
4741    /// This intentionally consumes `RawOptimizerNotice`s (not pre-rendered
4742    /// ones) because the user-facing rendering goes through the user's
4743    /// session-aware humanizer, which produces e.g. schema-qualified names
4744    /// relative to the user's current database/schema.
4745    pub(crate) fn emit_raw_optimizer_notices_to_user(
4746        &self,
4747        ctx: &ExecuteContext,
4748        notices: &[RawOptimizerNotice],
4749    ) {
4750        emit_optimizer_notices(&*self.catalog, ctx.session(), notices);
4751    }
4752
4753    /// Persist already-rendered optimizer notices for a newly created
4754    /// non-transient dataflow.
4755    ///
4756    /// This:
4757    /// - packs builtin-table updates for `mz_optimizer_notices` (if enabled),
4758    /// - stores the rendered metainfo on the catalog object via
4759    ///   `set_dataflow_metainfo`,
4760    /// - and returns a future that resolves once the builtin-table append
4761    ///   has been observed, or `None` if nothing was appended.
4762    async fn persist_dataflow_metainfo(
4763        &mut self,
4764        df_meta: DataflowMetainfo<Arc<OptimizerNotice>>,
4765        export_id: GlobalId,
4766    ) -> Option<BuiltinTableAppendNotify> {
4767        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
4768        // in-memory state.
4769        if self.catalog().state().system_config().enable_mz_notices()
4770            && !df_meta.optimizer_notices.is_empty()
4771        {
4772            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4773            self.catalog().state().pack_optimizer_notices(
4774                &mut builtin_table_updates,
4775                df_meta.optimizer_notices.iter(),
4776                Diff::ONE,
4777            );
4778
4779            // Save the metainfo.
4780            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4781
4782            Some(
4783                self.builtin_table_update()
4784                    .execute(builtin_table_updates)
4785                    .await
4786                    .0,
4787            )
4788        } else {
4789            // Save the metainfo.
4790            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4791
4792            None
4793        }
4794    }
4795}