mz_adapter/coord/sequencer/
inner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::stream::FuturesOrdered;
20use futures::{Future, StreamExt, future};
21use itertools::Itertools;
22use maplit::btreeset;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
26use mz_catalog::memory::objects::{
27    CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
28};
29use mz_cloud_resources::VpcEndpointConfig;
30use mz_controller_types::ReplicaId;
31use mz_expr::{
32    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
33};
34use mz_ore::cast::CastFrom;
35use mz_ore::collections::{CollectionExt, HashSet};
36use mz_ore::task::{self, JoinHandle, spawn};
37use mz_ore::tracing::OpenTelemetryContext;
38use mz_ore::{assert_none, instrument};
39use mz_persist_client::stats::SnapshotPartStats;
40use mz_repr::adt::jsonb::Jsonb;
41use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
42use mz_repr::explain::ExprHumanizer;
43use mz_repr::explain::json::json_string;
44use mz_repr::role_id::RoleId;
45use mz_repr::{
46    CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
47    RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
48};
49use mz_sql::ast::{
50    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
51    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
52    SqlServerConfigOptionName,
53};
54use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
55use mz_sql::catalog::{
56    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
57    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
58    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
59};
60use mz_sql::names::{
61    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
62    SchemaSpecifier, SystemObjectId,
63};
64use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
65use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
66use mz_storage_types::sinks::StorageSinkDesc;
67use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceExport};
68// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
69use mz_sql::plan::{
70    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
71    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
72    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
73};
74use mz_sql::session::metadata::SessionMetadata;
75use mz_sql::session::user::UserKind;
76use mz_sql::session::vars::{
77    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS, SessionVars,
78    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
79};
80use mz_sql::{plan, rbac};
81use mz_sql_parser::ast::display::AstDisplay;
82use mz_sql_parser::ast::{
83    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
84    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
85    WithOptionValue,
86};
87use mz_ssh_util::keys::SshKeyPairSet;
88use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
89use mz_storage_types::AlterCompatible;
90use mz_storage_types::connections::inline::IntoInlineConnection;
91use mz_storage_types::controller::StorageError;
92use mz_storage_types::stats::RelationPartStats;
93use mz_transform::EmptyStatisticsOracle;
94use mz_transform::dataflow::DataflowMetainfo;
95use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
96use smallvec::SmallVec;
97use timely::progress::Antichain;
98use timely::progress::Timestamp as TimelyTimestamp;
99use tokio::sync::{oneshot, watch};
100use tracing::{Instrument, Span, info, warn};
101
102use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
103use crate::command::{ExecuteResponse, Response};
104use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
105use crate::coord::{
106    AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
107    CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
108    Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
109    PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
110    validate_ip_with_policy_rules,
111};
112use crate::error::AdapterError;
113use crate::notice::{AdapterNotice, DroppedInUseIndex};
114use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
115use crate::optimize::{self, Optimize};
116use crate::session::{
117    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
118    WriteLocks, WriteOp,
119};
120use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
121use crate::{PeekResponseUnary, ReadHolds};
122
123mod cluster;
124mod copy_from;
125mod create_continual_task;
126mod create_index;
127mod create_materialized_view;
128mod create_view;
129mod explain_timestamp;
130mod peek;
131mod secret;
132mod subscribe;
133
/// Unwraps the `Ok` value of a `Result` expression.
///
/// When the expression evaluates to `Err`, the error is converted with `into()`, handed to
/// `retire` on the supplied context, and the enclosing function returns immediately.
macro_rules! return_if_err {
    ($result:expr, $context:expr) => {
        match $result {
            Err(err) => return $context.retire(Err(err.into())),
            Ok(value) => value,
        }
    };
}
144
145pub(super) use return_if_err;
146
147struct DropOps {
148    ops: Vec<catalog::Op>,
149    dropped_active_db: bool,
150    dropped_active_cluster: bool,
151    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
152}
153
154// A bundle of values returned from create_source_inner
155struct CreateSourceInner {
156    ops: Vec<catalog::Op>,
157    sources: Vec<(CatalogItemId, Source)>,
158    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
159}
160
161impl Coordinator {
162    /// Sequences the next staged of a [Staged] plan. This is designed for use with plans that
163    /// execute both on and off of the coordinator thread. Stages can either produce another stage
164    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
165    /// tracing.
166    pub(crate) async fn sequence_staged<S>(
167        &mut self,
168        mut ctx: S::Ctx,
169        parent_span: Span,
170        mut stage: S,
171    ) where
172        S: Staged + 'static,
173        S::Ctx: Send + 'static,
174    {
175        return_if_err!(stage.validity().check(self.catalog()), ctx);
176        loop {
177            let mut cancel_enabled = stage.cancel_enabled();
178            if let Some(session) = ctx.session() {
179                if cancel_enabled {
180                    // Channel to await cancellation. Insert a new channel, but check if the previous one
181                    // was already canceled.
182                    if let Some((_prev_tx, prev_rx)) = self
183                        .staged_cancellation
184                        .insert(session.conn_id().clone(), watch::channel(false))
185                    {
186                        let was_canceled = *prev_rx.borrow();
187                        if was_canceled {
188                            ctx.retire(Err(AdapterError::Canceled));
189                            return;
190                        }
191                    }
192                } else {
193                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
194                    // when cancel_enabled may have been true on an earlier stage.
195                    self.staged_cancellation.remove(session.conn_id());
196                }
197            } else {
198                cancel_enabled = false
199            };
200            let next = stage
201                .stage(self, &mut ctx)
202                .instrument(parent_span.clone())
203                .await;
204            let res = return_if_err!(next, ctx);
205            stage = match res {
206                StageResult::Handle(handle) => {
207                    let internal_cmd_tx = self.internal_cmd_tx.clone();
208                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
209                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
210                    });
211                    return;
212                }
213                StageResult::HandleRetire(handle) => {
214                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
215                        ctx.retire(Ok(resp));
216                    });
217                    return;
218                }
219                StageResult::Response(resp) => {
220                    ctx.retire(Ok(resp));
221                    return;
222                }
223                StageResult::Immediate(stage) => *stage,
224            }
225        }
226    }
227
228    fn handle_spawn<C, T, F>(
229        &self,
230        ctx: C,
231        handle: JoinHandle<Result<T, AdapterError>>,
232        cancel_enabled: bool,
233        f: F,
234    ) where
235        C: StagedContext + Send + 'static,
236        T: Send + 'static,
237        F: FnOnce(C, T) + Send + 'static,
238    {
239        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
240            .session()
241            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
242        {
243            let mut rx = rx.clone();
244            Box::pin(async move {
245                // Wait for true or dropped sender.
246                let _ = rx.wait_for(|v| *v).await;
247                ()
248            })
249        } else {
250            Box::pin(future::pending())
251        };
252        spawn(|| "sequence_staged", async move {
253            tokio::select! {
254                res = handle => {
255                    let next = match res {
256                        Ok(next) => return_if_err!(next, ctx),
257                        Err(err) => {
258                            tracing::error!("sequence_staged join error {err}");
259                            ctx.retire(Err(AdapterError::Internal(
260                                "sequence_staged join error".into(),
261                            )));
262                            return;
263                        }
264                    };
265                    f(ctx, next);
266                }
267                _ = rx, if cancel_enabled => {
268                    ctx.retire(Err(AdapterError::Canceled));
269                }
270            }
271        });
272    }
273
274    async fn create_source_inner(
275        &self,
276        session: &Session,
277        plans: Vec<plan::CreateSourcePlanBundle>,
278    ) -> Result<CreateSourceInner, AdapterError> {
279        let mut ops = vec![];
280        let mut sources = vec![];
281
282        let if_not_exists_ids = plans
283            .iter()
284            .filter_map(
285                |plan::CreateSourcePlanBundle {
286                     item_id,
287                     global_id: _,
288                     plan,
289                     resolved_ids: _,
290                     available_source_references: _,
291                 }| {
292                    if plan.if_not_exists {
293                        Some((*item_id, plan.name.clone()))
294                    } else {
295                        None
296                    }
297                },
298            )
299            .collect::<BTreeMap<_, _>>();
300
301        for plan::CreateSourcePlanBundle {
302            item_id,
303            global_id,
304            mut plan,
305            resolved_ids,
306            available_source_references,
307        } in plans
308        {
309            let name = plan.name.clone();
310
311            match plan.source.data_source {
312                plan::DataSourceDesc::Ingestion(ref desc)
313                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
314                    let cluster_id = plan
315                        .in_cluster
316                        .expect("ingestion plans must specify cluster");
317                    match desc.connection {
318                        GenericSourceConnection::Postgres(_)
319                        | GenericSourceConnection::MySql(_)
320                        | GenericSourceConnection::SqlServer(_)
321                        | GenericSourceConnection::Kafka(_)
322                        | GenericSourceConnection::LoadGenerator(_) => {
323                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
324                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
325                                    .get(self.catalog().system_config().dyncfgs());
326
327                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
328                                {
329                                    return Err(AdapterError::Unsupported(
330                                        "sources in clusters with >1 replicas",
331                                    ));
332                                }
333                            }
334                        }
335                    }
336                }
337                plan::DataSourceDesc::Webhook { .. } => {
338                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
339                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
340                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
341                            .get(self.catalog().system_config().dyncfgs());
342
343                        if !enable_multi_replica_sources {
344                            if cluster.replica_ids().len() > 1 {
345                                return Err(AdapterError::Unsupported(
346                                    "webhook sources in clusters with >1 replicas",
347                                ));
348                            }
349                        }
350                    }
351                }
352                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
353            }
354
355            // Attempt to reduce the `CHECK` expression, we timeout if this takes too long.
356            if let mz_sql::plan::DataSourceDesc::Webhook {
357                validate_using: Some(validate),
358                ..
359            } = &mut plan.source.data_source
360            {
361                if let Err(reason) = validate.reduce_expression().await {
362                    self.metrics
363                        .webhook_validation_reduce_failures
364                        .with_label_values(&[reason])
365                        .inc();
366                    return Err(AdapterError::Internal(format!(
367                        "failed to reduce check expression, {reason}"
368                    )));
369                }
370            }
371
372            // If this source contained a set of available source references, update the
373            // source references catalog table.
374            let mut reference_ops = vec![];
375            if let Some(references) = &available_source_references {
376                reference_ops.push(catalog::Op::UpdateSourceReferences {
377                    source_id: item_id,
378                    references: references.clone().into(),
379                });
380            }
381
382            let source = Source::new(plan, global_id, resolved_ids, None, false);
383            ops.push(catalog::Op::CreateItem {
384                id: item_id,
385                name,
386                item: CatalogItem::Source(source.clone()),
387                owner_id: *session.current_role_id(),
388            });
389            sources.push((item_id, source));
390            // These operations must be executed after the source is added to the catalog.
391            ops.extend(reference_ops);
392        }
393
394        Ok(CreateSourceInner {
395            ops,
396            sources,
397            if_not_exists_ids,
398        })
399    }
400
401    /// Subsources are planned differently from other statements because they
402    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
403    /// Because of this, we have usually "missed" the opportunity to plan them
404    /// through the normal statement execution life cycle (the exception being
405    /// during bootstrapping).
406    ///
407    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
408    pub(crate) fn plan_subsource(
409        &self,
410        session: &Session,
411        params: &mz_sql::plan::Params,
412        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
413        item_id: CatalogItemId,
414        global_id: GlobalId,
415    ) -> Result<CreateSourcePlanBundle, AdapterError> {
416        let catalog = self.catalog().for_session(session);
417        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
418
419        let plan = self.plan_statement(
420            session,
421            Statement::CreateSubsource(subsource_stmt),
422            params,
423            &resolved_ids,
424        )?;
425        let plan = match plan {
426            Plan::CreateSource(plan) => plan,
427            _ => unreachable!(),
428        };
429        Ok(CreateSourcePlanBundle {
430            item_id,
431            global_id,
432            plan,
433            resolved_ids,
434            available_source_references: None,
435        })
436    }
437
438    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
439    pub(crate) async fn plan_purified_alter_source_add_subsource(
440        &mut self,
441        session: &Session,
442        params: Params,
443        source_name: ResolvedItemName,
444        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
445        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
446    ) -> Result<(Plan, ResolvedIds), AdapterError> {
447        let mut subsource_plans = Vec::with_capacity(subsources.len());
448
449        // Generate subsource statements
450        let conn_catalog = self.catalog().for_system_session();
451        let pcx = plan::PlanContext::zero();
452        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
453
454        let entry = self.catalog().get_entry(source_name.item_id());
455        let source = entry.source().ok_or_else(|| {
456            AdapterError::internal(
457                "plan alter source",
458                format!("expected Source found {entry:?}"),
459            )
460        })?;
461
462        let item_id = entry.id();
463        let ingestion_id = source.global_id();
464        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
465
466        let id_ts = self.get_catalog_write_ts().await;
467        let ids = self
468            .catalog()
469            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
470            .await?;
471        for (subsource_stmt, (item_id, global_id)) in
472            subsource_stmts.into_iter().zip_eq(ids.into_iter())
473        {
474            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
475            subsource_plans.push(s);
476        }
477
478        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
479            subsources: subsource_plans,
480            options,
481        };
482
483        Ok((
484            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
485                item_id,
486                ingestion_id,
487                action,
488            }),
489            ResolvedIds::empty(),
490        ))
491    }
492
493    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
494    pub(crate) fn plan_purified_alter_source_refresh_references(
495        &self,
496        _session: &Session,
497        _params: Params,
498        source_name: ResolvedItemName,
499        available_source_references: plan::SourceReferences,
500    ) -> Result<(Plan, ResolvedIds), AdapterError> {
501        let entry = self.catalog().get_entry(source_name.item_id());
502        let source = entry.source().ok_or_else(|| {
503            AdapterError::internal(
504                "plan alter source",
505                format!("expected Source found {entry:?}"),
506            )
507        })?;
508        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
509            references: available_source_references,
510        };
511
512        Ok((
513            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
514                item_id: entry.id(),
515                ingestion_id: source.global_id(),
516                action,
517            }),
518            ResolvedIds::empty(),
519        ))
520    }
521
522    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
523    /// the primary source, and any ingestion export subsources (e.g. PG
524    /// tables).
525    pub(crate) async fn plan_purified_create_source(
526        &mut self,
527        ctx: &ExecuteContext,
528        params: Params,
529        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
530        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
531        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
532        available_source_references: plan::SourceReferences,
533    ) -> Result<(Plan, ResolvedIds), AdapterError> {
534        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
535
536        // 1. First plan the progress subsource, if any.
537        if let Some(progress_stmt) = progress_stmt {
538            // The primary source depends on this subsource because the primary
539            // source needs to know its shard ID, and the easiest way of
540            // guaranteeing that the shard ID is discoverable is to create this
541            // collection first.
542            assert_none!(progress_stmt.of_source);
543            let id_ts = self.get_catalog_write_ts().await;
544            let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
545            let progress_plan =
546                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
547            let progress_full_name = self
548                .catalog()
549                .resolve_full_name(&progress_plan.plan.name, None);
550            let progress_subsource = ResolvedItemName::Item {
551                id: progress_plan.item_id,
552                qualifiers: progress_plan.plan.name.qualifiers.clone(),
553                full_name: progress_full_name,
554                print_id: true,
555                version: RelationVersionSelector::Latest,
556            };
557
558            create_source_plans.push(progress_plan);
559
560            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
561        }
562
563        let catalog = self.catalog().for_session(ctx.session());
564        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
565
566        let propagated_with_options: Vec<_> = source_stmt
567            .with_options
568            .iter()
569            .filter_map(|opt| match opt.name {
570                CreateSourceOptionName::TimestampInterval => None,
571                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
572                    name: CreateSubsourceOptionName::RetainHistory,
573                    value: opt.value.clone(),
574                }),
575            })
576            .collect();
577
578        // 2. Then plan the main source.
579        let source_plan = match self.plan_statement(
580            ctx.session(),
581            Statement::CreateSource(source_stmt),
582            &params,
583            &resolved_ids,
584        )? {
585            Plan::CreateSource(plan) => plan,
586            p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
587        };
588
589        let id_ts = self.get_catalog_write_ts().await;
590        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
591
592        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
593        let of_source = ResolvedItemName::Item {
594            id: item_id,
595            qualifiers: source_plan.name.qualifiers.clone(),
596            full_name: source_full_name,
597            print_id: true,
598            version: RelationVersionSelector::Latest,
599        };
600
601        // Generate subsource statements
602        let conn_catalog = self.catalog().for_system_session();
603        let pcx = plan::PlanContext::zero();
604        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
605
606        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
607
608        for subsource_stmt in subsource_stmts.iter_mut() {
609            subsource_stmt
610                .with_options
611                .extend(propagated_with_options.iter().cloned())
612        }
613
614        create_source_plans.push(CreateSourcePlanBundle {
615            item_id,
616            global_id,
617            plan: source_plan,
618            resolved_ids: resolved_ids.clone(),
619            available_source_references: Some(available_source_references),
620        });
621
622        // 3. Finally, plan all the subsources
623        let id_ts = self.get_catalog_write_ts().await;
624        let ids = self
625            .catalog()
626            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
627            .await?;
628        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
629            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
630            create_source_plans.push(plan);
631        }
632
633        Ok((
634            Plan::CreateSources(create_source_plans),
635            ResolvedIds::empty(),
636        ))
637    }
638
639    #[instrument]
640    pub(super) async fn sequence_create_source(
641        &mut self,
642        ctx: &mut ExecuteContext,
643        plans: Vec<plan::CreateSourcePlanBundle>,
644    ) -> Result<ExecuteResponse, AdapterError> {
645        let item_ids: Vec<_> = plans.iter().map(|plan| plan.item_id).collect();
646        let CreateSourceInner {
647            ops,
648            sources,
649            if_not_exists_ids,
650        } = self.create_source_inner(ctx.session(), plans).await?;
651
652        let transact_result = self
653            .catalog_transact_with_ddl_transaction(ctx, ops, |coord, ctx| {
654                Box::pin(async move {
655                    // TODO(roshan): This will be unnecessary when we drop support for
656                    // automatically created subsources.
657                    // To improve the performance of creating a source and many
658                    // subsources, we create all of the collections at once. If we
659                    // created each collection independently, we would reschedule
660                    // the primary source's execution for each subsources, causing
661                    // unncessary thrashing.
662                    //
663                    // Note that creating all of the collections at once should
664                    // never be semantic bearing; we should always be able to break
665                    // this apart into creating the collections sequentially.
666                    let mut collections = Vec::with_capacity(sources.len());
667
668                    for (item_id, source) in sources {
669                        let source_status_item_id =
670                            coord.catalog().resolve_builtin_storage_collection(
671                                &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
672                            );
673                        let source_status_collection_id = Some(
674                            coord
675                                .catalog()
676                                .get_entry(&source_status_item_id)
677                                .latest_global_id(),
678                        );
679
680                        let (data_source, status_collection_id) = match source.data_source {
681                            DataSourceDesc::Ingestion { desc, cluster_id } => {
682                                let desc = desc.into_inline_connection(coord.catalog().state());
683                                let item_global_id =
684                                    coord.catalog().get_entry(&item_id).latest_global_id();
685
686                                let ingestion =
687                                    IngestionDescription::new(desc, cluster_id, item_global_id);
688
689                                (
690                                    DataSource::Ingestion(ingestion),
691                                    source_status_collection_id,
692                                )
693                            }
694                            DataSourceDesc::OldSyntaxIngestion {
695                                desc,
696                                progress_subsource,
697                                data_config,
698                                details,
699                                cluster_id,
700                            } => {
701                                let desc = desc.into_inline_connection(coord.catalog().state());
702                                let data_config =
703                                    data_config.into_inline_connection(coord.catalog().state());
704                                // TODO(parkmycar): We should probably check the type here, but I'm not
705                                // sure if this will always be a Source or a Table.
706                                let progress_subsource = coord
707                                    .catalog()
708                                    .get_entry(&progress_subsource)
709                                    .latest_global_id();
710
711                                let mut ingestion =
712                                    IngestionDescription::new(desc, cluster_id, progress_subsource);
713                                let legacy_export = SourceExport {
714                                    storage_metadata: (),
715                                    data_config,
716                                    details,
717                                };
718                                ingestion
719                                    .source_exports
720                                    .insert(source.global_id, legacy_export);
721
722                                (
723                                    DataSource::Ingestion(ingestion),
724                                    source_status_collection_id,
725                                )
726                            }
727                            DataSourceDesc::IngestionExport {
728                                ingestion_id,
729                                external_reference: _,
730                                details,
731                                data_config,
732                            } => {
733                                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
734                                // this will always be a Source or a Table.
735                                let ingestion_id =
736                                    coord.catalog().get_entry(&ingestion_id).latest_global_id();
737                                (
738                                    DataSource::IngestionExport {
739                                        ingestion_id,
740                                        details,
741                                        data_config: data_config
742                                            .into_inline_connection(coord.catalog().state()),
743                                    },
744                                    source_status_collection_id,
745                                )
746                            }
747                            DataSourceDesc::Progress => (DataSource::Progress, None),
748                            DataSourceDesc::Webhook { .. } => {
749                                if let Some(url) =
750                                    coord.catalog().state().try_get_webhook_url(&item_id)
751                                {
752                                    if let Some(ctx) = ctx.as_ref() {
753                                        ctx.session()
754                                            .add_notice(AdapterNotice::WebhookSourceCreated { url })
755                                    }
756                                }
757
758                                (DataSource::Webhook, None)
759                            }
760                            DataSourceDesc::Introspection(_) => {
761                                unreachable!(
762                                    "cannot create sources with introspection data sources"
763                                )
764                            }
765                        };
766
767                        collections.push((
768                            source.global_id,
769                            CollectionDescription::<Timestamp> {
770                                desc: source.desc.clone(),
771                                data_source,
772                                timeline: Some(source.timeline),
773                                since: None,
774                                status_collection_id,
775                            },
776                        ));
777                    }
778
779                    let storage_metadata = coord.catalog.state().storage_metadata();
780
781                    coord
782                        .controller
783                        .storage
784                        .create_collections(storage_metadata, None, collections)
785                        .await
786                        .unwrap_or_terminate("cannot fail to create collections");
787
788                    // It is _very_ important that we only initialize read policies
789                    // after we have created all the sources/collections. Some of
790                    // the sources created in this collection might have
791                    // dependencies on other sources, so the controller must get a
792                    // chance to install read holds before we set a policy that
793                    // might make the since advance.
794                    //
795                    // One instance of this is the remap shard: it presents as a
796                    // SUBSOURCE, and all other SUBSOURCES of a SOURCE will depend
797                    // on it. Both subsources and sources will show up as a `Source`
798                    // in the above.
799                    // Although there should only be one parent source that sequence_create_source is
800                    // ever called with, hedge our bets a bit and collect the compaction windows for
801                    // each id in the bundle (these should all be identical). This is some extra work
802                    // but seems safer.
803                    let read_policies = coord.catalog().state().source_compaction_windows(item_ids);
804                    for (compaction_window, storage_policies) in read_policies {
805                        coord
806                            .initialize_storage_read_policies(storage_policies, compaction_window)
807                            .await;
808                    }
809                })
810            })
811            .await;
812
813        match transact_result {
814            Ok(()) => Ok(ExecuteResponse::CreatedSource),
815            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
816                kind:
817                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
818            })) if if_not_exists_ids.contains_key(&id) => {
819                ctx.session()
820                    .add_notice(AdapterNotice::ObjectAlreadyExists {
821                        name: if_not_exists_ids[&id].item.clone(),
822                        ty: "source",
823                    });
824                Ok(ExecuteResponse::CreatedSource)
825            }
826            Err(err) => Err(err),
827        }
828    }
829
830    #[instrument]
831    pub(super) async fn sequence_create_connection(
832        &mut self,
833        mut ctx: ExecuteContext,
834        plan: plan::CreateConnectionPlan,
835        resolved_ids: ResolvedIds,
836    ) {
837        let id_ts = self.get_catalog_write_ts().await;
838        let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
839            Ok(item_id) => item_id,
840            Err(err) => return ctx.retire(Err(err.into())),
841        };
842
843        match &plan.connection.details {
844            ConnectionDetails::Ssh { key_1, key_2, .. } => {
845                let key_1 = match key_1.as_key_pair() {
846                    Some(key_1) => key_1.clone(),
847                    None => {
848                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
849                            "the PUBLIC KEY 1 option cannot be explicitly specified"
850                        ))));
851                    }
852                };
853
854                let key_2 = match key_2.as_key_pair() {
855                    Some(key_2) => key_2.clone(),
856                    None => {
857                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
858                            "the PUBLIC KEY 2 option cannot be explicitly specified"
859                        ))));
860                    }
861                };
862
863                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
864                let secret = key_set.to_bytes();
865                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
866                    return ctx.retire(Err(err.into()));
867                }
868            }
869            _ => (),
870        };
871
872        if plan.validate {
873            let internal_cmd_tx = self.internal_cmd_tx.clone();
874            let transient_revision = self.catalog().transient_revision();
875            let conn_id = ctx.session().conn_id().clone();
876            let otel_ctx = OpenTelemetryContext::obtain();
877            let role_metadata = ctx.session().role_metadata().clone();
878
879            let connection = plan
880                .connection
881                .details
882                .to_connection()
883                .into_inline_connection(self.catalog().state());
884
885            let current_storage_parameters = self.controller.storage.config().clone();
886            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
887                let result = match connection
888                    .validate(connection_id, &current_storage_parameters)
889                    .await
890                {
891                    Ok(()) => Ok(plan),
892                    Err(err) => Err(err.into()),
893                };
894
895                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
896                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
897                    CreateConnectionValidationReady {
898                        ctx,
899                        result,
900                        connection_id,
901                        connection_gid,
902                        plan_validity: PlanValidity::new(
903                            transient_revision,
904                            resolved_ids.items().copied().collect(),
905                            None,
906                            None,
907                            role_metadata,
908                        ),
909                        otel_ctx,
910                        resolved_ids: resolved_ids.clone(),
911                    },
912                ));
913                if let Err(e) = result {
914                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
915                }
916            });
917        } else {
918            let result = self
919                .sequence_create_connection_stage_finish(
920                    &mut ctx,
921                    connection_id,
922                    connection_gid,
923                    plan,
924                    resolved_ids,
925                )
926                .await;
927            ctx.retire(result);
928        }
929    }
930
931    #[instrument]
932    pub(crate) async fn sequence_create_connection_stage_finish(
933        &mut self,
934        ctx: &mut ExecuteContext,
935        connection_id: CatalogItemId,
936        connection_gid: GlobalId,
937        plan: plan::CreateConnectionPlan,
938        resolved_ids: ResolvedIds,
939    ) -> Result<ExecuteResponse, AdapterError> {
940        let ops = vec![catalog::Op::CreateItem {
941            id: connection_id,
942            name: plan.name.clone(),
943            item: CatalogItem::Connection(Connection {
944                create_sql: plan.connection.create_sql,
945                global_id: connection_gid,
946                details: plan.connection.details.clone(),
947                resolved_ids,
948            }),
949            owner_id: *ctx.session().current_role_id(),
950        }];
951
952        let transact_result = self
953            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
954                Box::pin(async move {
955                    match plan.connection.details {
956                        ConnectionDetails::AwsPrivatelink(ref privatelink) => {
957                            let spec = VpcEndpointConfig {
958                                aws_service_name: privatelink.service_name.to_owned(),
959                                availability_zone_ids: privatelink.availability_zones.to_owned(),
960                            };
961                            let cloud_resource_controller =
962                                match coord.cloud_resource_controller.as_ref().cloned() {
963                                    Some(controller) => controller,
964                                    None => {
965                                        tracing::warn!("AWS PrivateLink connections unsupported");
966                                        return;
967                                    }
968                                };
969                            if let Err(err) = cloud_resource_controller
970                                .ensure_vpc_endpoint(connection_id, spec)
971                                .await
972                            {
973                                tracing::warn!(?err, "failed to ensure vpc endpoint!");
974                            }
975                        }
976                        _ => {}
977                    }
978                })
979            })
980            .await;
981
982        match transact_result {
983            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
984            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
985                kind:
986                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
987            })) if plan.if_not_exists => {
988                ctx.session()
989                    .add_notice(AdapterNotice::ObjectAlreadyExists {
990                        name: plan.name.item,
991                        ty: "connection",
992                    });
993                Ok(ExecuteResponse::CreatedConnection)
994            }
995            Err(err) => Err(err),
996        }
997    }
998
999    #[instrument]
1000    pub(super) async fn sequence_create_database(
1001        &mut self,
1002        session: &mut Session,
1003        plan: plan::CreateDatabasePlan,
1004    ) -> Result<ExecuteResponse, AdapterError> {
1005        let ops = vec![catalog::Op::CreateDatabase {
1006            name: plan.name.clone(),
1007            owner_id: *session.current_role_id(),
1008        }];
1009        match self.catalog_transact(Some(session), ops).await {
1010            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
1011            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1012                kind:
1013                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
1014            })) if plan.if_not_exists => {
1015                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
1016                Ok(ExecuteResponse::CreatedDatabase)
1017            }
1018            Err(err) => Err(err),
1019        }
1020    }
1021
1022    #[instrument]
1023    pub(super) async fn sequence_create_schema(
1024        &mut self,
1025        session: &mut Session,
1026        plan: plan::CreateSchemaPlan,
1027    ) -> Result<ExecuteResponse, AdapterError> {
1028        let op = catalog::Op::CreateSchema {
1029            database_id: plan.database_spec,
1030            schema_name: plan.schema_name.clone(),
1031            owner_id: *session.current_role_id(),
1032        };
1033        match self.catalog_transact(Some(session), vec![op]).await {
1034            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
1035            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1036                kind:
1037                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
1038            })) if plan.if_not_exists => {
1039                session.add_notice(AdapterNotice::SchemaAlreadyExists {
1040                    name: plan.schema_name,
1041                });
1042                Ok(ExecuteResponse::CreatedSchema)
1043            }
1044            Err(err) => Err(err),
1045        }
1046    }
1047
1048    /// Validates the role attributes for a `CREATE ROLE` statement.
1049    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
1050        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
1051            if attributes.superuser.is_some()
1052                || attributes.password.is_some()
1053                || attributes.login.is_some()
1054            {
1055                return Err(AdapterError::UnavailableFeature {
1056                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
1057                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
1058                });
1059            }
1060        }
1061        Ok(())
1062    }
1063
1064    #[instrument]
1065    pub(super) async fn sequence_create_role(
1066        &mut self,
1067        conn_id: Option<&ConnectionId>,
1068        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
1069    ) -> Result<ExecuteResponse, AdapterError> {
1070        self.validate_role_attributes(&attributes.clone())?;
1071        let op = catalog::Op::CreateRole { name, attributes };
1072        self.catalog_transact_conn(conn_id, vec![op])
1073            .await
1074            .map(|_| ExecuteResponse::CreatedRole)
1075    }
1076
1077    #[instrument]
1078    pub(super) async fn sequence_create_network_policy(
1079        &mut self,
1080        session: &Session,
1081        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
1082    ) -> Result<ExecuteResponse, AdapterError> {
1083        let op = catalog::Op::CreateNetworkPolicy {
1084            rules,
1085            name,
1086            owner_id: *session.current_role_id(),
1087        };
1088        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
1089            .await
1090            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
1091    }
1092
1093    #[instrument]
1094    pub(super) async fn sequence_alter_network_policy(
1095        &mut self,
1096        session: &Session,
1097        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
1098    ) -> Result<ExecuteResponse, AdapterError> {
1099        // TODO(network_policy): Consider role based network policies here.
1100        let current_network_policy_name =
1101            self.catalog().system_config().default_network_policy_name();
1102        // Check if the way we're alerting the policy is still valid for the current connection.
1103        if current_network_policy_name == name {
1104            self.validate_alter_network_policy(session, &rules)?;
1105        }
1106
1107        let op = catalog::Op::AlterNetworkPolicy {
1108            id,
1109            rules,
1110            name,
1111            owner_id: *session.current_role_id(),
1112        };
1113        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
1114            .await
1115            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
1116    }
1117
1118    #[instrument]
1119    pub(super) async fn sequence_create_table(
1120        &mut self,
1121        ctx: &mut ExecuteContext,
1122        plan: plan::CreateTablePlan,
1123        resolved_ids: ResolvedIds,
1124    ) -> Result<ExecuteResponse, AdapterError> {
1125        let plan::CreateTablePlan {
1126            name,
1127            table,
1128            if_not_exists,
1129        } = plan;
1130
1131        let conn_id = if table.temporary {
1132            Some(ctx.session().conn_id())
1133        } else {
1134            None
1135        };
1136        let id_ts = self.get_catalog_write_ts().await;
1137        let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
1138        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
1139
1140        let data_source = match table.data_source {
1141            plan::TableDataSource::TableWrites { defaults } => {
1142                TableDataSource::TableWrites { defaults }
1143            }
1144            plan::TableDataSource::DataSource {
1145                desc: data_source_plan,
1146                timeline,
1147            } => match data_source_plan {
1148                plan::DataSourceDesc::IngestionExport {
1149                    ingestion_id,
1150                    external_reference,
1151                    details,
1152                    data_config,
1153                } => TableDataSource::DataSource {
1154                    desc: DataSourceDesc::IngestionExport {
1155                        ingestion_id,
1156                        external_reference,
1157                        details,
1158                        data_config,
1159                    },
1160                    timeline,
1161                },
1162                plan::DataSourceDesc::Webhook {
1163                    validate_using,
1164                    body_format,
1165                    headers,
1166                    cluster_id,
1167                } => TableDataSource::DataSource {
1168                    desc: DataSourceDesc::Webhook {
1169                        validate_using,
1170                        body_format,
1171                        headers,
1172                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
1173                    },
1174                    timeline,
1175                },
1176                o => {
1177                    unreachable!("CREATE TABLE data source got {:?}", o)
1178                }
1179            },
1180        };
1181        let table = Table {
1182            create_sql: Some(table.create_sql),
1183            desc: table.desc,
1184            collections,
1185            conn_id: conn_id.cloned(),
1186            resolved_ids,
1187            custom_logical_compaction_window: table.compaction_window,
1188            is_retained_metrics_object: false,
1189            data_source,
1190        };
1191        let ops = vec![catalog::Op::CreateItem {
1192            id: table_id,
1193            name: name.clone(),
1194            item: CatalogItem::Table(table.clone()),
1195            owner_id: *ctx.session().current_role_id(),
1196        }];
1197
1198        let catalog_result = self
1199            .catalog_transact_with_ddl_transaction(ctx, ops, move |coord, ctx| {
1200                Box::pin(async move {
1201                    // The table data_source determines whether this table will be written to
1202                    // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
1203                    // (e.g. a source-fed table).
1204                    let (collections, register_ts, read_policies) = match table.data_source {
1205                        TableDataSource::TableWrites { defaults: _ } => {
1206                            // Determine the initial validity for the table.
1207                            let register_ts = coord.get_local_write_ts().await.timestamp;
1208
1209                            // After acquiring `register_ts` but before using it, we need to
1210                            // be sure we're still the leader. Otherwise a new generation
1211                            // may also be trying to use `register_ts` for a different
1212                            // purpose.
1213                            //
1214                            // See database-issues#8273.
1215                            coord
1216                                .catalog
1217                                .confirm_leadership()
1218                                .await
1219                                .unwrap_or_terminate("unable to confirm leadership");
1220
1221                            if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1222                                coord.set_statement_execution_timestamp(id, register_ts);
1223                            }
1224
1225                            // When initially creating a table it should only have a single version.
1226                            let relation_version = RelationVersion::root();
1227                            assert_eq!(table.desc.latest_version(), relation_version);
1228                            let relation_desc = table
1229                                .desc
1230                                .at_version(RelationVersionSelector::Specific(relation_version));
1231                            // We assert above we have a single version, and thus we are the primary.
1232                            let collection_desc =
1233                                CollectionDescription::for_table(relation_desc, None);
1234                            let collections = vec![(global_id, collection_desc)];
1235
1236                            let compaction_window = table
1237                                .custom_logical_compaction_window
1238                                .unwrap_or(CompactionWindow::Default);
1239                            let read_policies =
1240                                BTreeMap::from([(compaction_window, btreeset! { table_id })]);
1241
1242                            (collections, Some(register_ts), read_policies)
1243                        }
1244                        TableDataSource::DataSource {
1245                            desc: data_source,
1246                            timeline,
1247                        } => {
1248                            match data_source {
1249                                DataSourceDesc::IngestionExport {
1250                                    ingestion_id,
1251                                    external_reference: _,
1252                                    details,
1253                                    data_config,
1254                                } => {
1255                                    // TODO: It's a little weird that a table will be present in this
1256                                    // source status collection, we might want to split out into a separate
1257                                    // status collection.
1258                                    let source_status_item_id =
1259                                        coord.catalog().resolve_builtin_storage_collection(
1260                                            &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
1261                                        );
1262                                    let status_collection_id = Some(
1263                                        coord
1264                                            .catalog()
1265                                            .get_entry(&source_status_item_id)
1266                                            .latest_global_id(),
1267                                    );
1268                                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
1269                                    // this will always be a Source or a Table.
1270                                    let ingestion_id =
1271                                        coord.catalog().get_entry(&ingestion_id).latest_global_id();
1272                                    // Create the underlying collection with the latest schema from the Table.
1273                                    let collection_desc = CollectionDescription::<Timestamp> {
1274                                        desc: table
1275                                            .desc
1276                                            .at_version(RelationVersionSelector::Latest),
1277                                        data_source: DataSource::IngestionExport {
1278                                            ingestion_id,
1279                                            details,
1280                                            data_config: data_config
1281                                                .into_inline_connection(coord.catalog.state()),
1282                                        },
1283                                        since: None,
1284                                        status_collection_id,
1285                                        timeline: Some(timeline.clone()),
1286                                    };
1287
1288                                    let collections = vec![(global_id, collection_desc)];
1289                                    let read_policies = coord
1290                                        .catalog()
1291                                        .state()
1292                                        .source_compaction_windows(vec![table_id]);
1293
1294                                    (collections, None, read_policies)
1295                                }
1296                                DataSourceDesc::Webhook { .. } => {
1297                                    if let Some(url) =
1298                                        coord.catalog().state().try_get_webhook_url(&table_id)
1299                                    {
1300                                        if let Some(ctx) = ctx.as_ref() {
1301                                            ctx.session().add_notice(
1302                                                AdapterNotice::WebhookSourceCreated { url },
1303                                            )
1304                                        }
1305                                    }
1306
1307                                    // Create the underlying collection with the latest schema from the Table.
1308                                    assert_eq!(
1309                                        table.desc.latest_version(),
1310                                        RelationVersion::root(),
1311                                        "found webhook with more than 1 relation version, {:?}",
1312                                        table.desc
1313                                    );
1314                                    let desc = table.desc.latest();
1315
1316                                    let collection_desc = CollectionDescription {
1317                                        desc,
1318                                        data_source: DataSource::Webhook,
1319                                        since: None,
1320                                        status_collection_id: None,
1321                                        timeline: Some(timeline.clone()),
1322                                    };
1323                                    let collections = vec![(global_id, collection_desc)];
1324                                    let read_policies = coord
1325                                        .catalog()
1326                                        .state()
1327                                        .source_compaction_windows(vec![table_id]);
1328
1329                                    (collections, None, read_policies)
1330                                }
1331                                _ => unreachable!("CREATE TABLE data source got {:?}", data_source),
1332                            }
1333                        }
1334                    };
1335
1336                    // Create the collections.
1337                    let storage_metadata = coord.catalog.state().storage_metadata();
1338                    coord
1339                        .controller
1340                        .storage
1341                        .create_collections(storage_metadata, register_ts, collections)
1342                        .await
1343                        .unwrap_or_terminate("cannot fail to create collections");
1344
1345                    // Mark the register timestamp as completed.
1346                    if let Some(register_ts) = register_ts {
1347                        coord.apply_local_write(register_ts).await;
1348                    }
1349
1350                    // Initialize the Read Policies.
1351                    for (compaction_window, storage_policies) in read_policies {
1352                        coord
1353                            .initialize_storage_read_policies(storage_policies, compaction_window)
1354                            .await;
1355                    }
1356                })
1357            })
1358            .await;
1359
1360        match catalog_result {
1361            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1362            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1363                kind:
1364                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1365            })) if if_not_exists => {
1366                ctx.session_mut()
1367                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1368                        name: name.item,
1369                        ty: "table",
1370                    });
1371                Ok(ExecuteResponse::CreatedTable)
1372            }
1373            Err(err) => Err(err),
1374        }
1375    }
1376
1377    #[instrument]
1378    pub(super) async fn sequence_create_sink(
1379        &mut self,
1380        ctx: ExecuteContext,
1381        plan: plan::CreateSinkPlan,
1382        resolved_ids: ResolvedIds,
1383    ) {
1384        let plan::CreateSinkPlan {
1385            name,
1386            sink,
1387            with_snapshot,
1388            if_not_exists,
1389            in_cluster,
1390        } = plan;
1391
1392        // First try to allocate an ID and an OID. If either fails, we're done.
1393        let id_ts = self.get_catalog_write_ts().await;
1394        let (item_id, global_id) =
1395            return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);
1396
1397        let catalog_sink = Sink {
1398            create_sql: sink.create_sql,
1399            global_id,
1400            from: sink.from,
1401            connection: sink.connection,
1402            envelope: sink.envelope,
1403            version: sink.version,
1404            with_snapshot,
1405            resolved_ids,
1406            cluster_id: in_cluster,
1407        };
1408
1409        let ops = vec![catalog::Op::CreateItem {
1410            id: item_id,
1411            name: name.clone(),
1412            item: CatalogItem::Sink(catalog_sink.clone()),
1413            owner_id: *ctx.session().current_role_id(),
1414        }];
1415
1416        let from = self.catalog().get_entry_by_global_id(&catalog_sink.from);
1417        if let Err(e) = self
1418            .controller
1419            .storage
1420            .check_exists(sink.from)
1421            .map_err(|e| match e {
1422                StorageError::IdentifierMissing(_) => AdapterError::Unstructured(anyhow!(
1423                    "{} is a {}, which cannot be exported as a sink",
1424                    from.name().item.clone(),
1425                    from.item().typ(),
1426                )),
1427                e => AdapterError::Storage(e),
1428            })
1429        {
1430            ctx.retire(Err(e));
1431            return;
1432        }
1433
1434        let result = self.catalog_transact(Some(ctx.session()), ops).await;
1435
1436        match result {
1437            Ok(()) => {}
1438            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1439                kind:
1440                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1441            })) if if_not_exists => {
1442                ctx.session()
1443                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1444                        name: name.item,
1445                        ty: "sink",
1446                    });
1447                ctx.retire(Ok(ExecuteResponse::CreatedSink));
1448                return;
1449            }
1450            Err(e) => {
1451                ctx.retire(Err(e));
1452                return;
1453            }
1454        };
1455
1456        self.create_storage_export(global_id, &catalog_sink)
1457            .await
1458            .unwrap_or_terminate("cannot fail to create exports");
1459
1460        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1461            .await;
1462
1463        ctx.retire(Ok(ExecuteResponse::CreatedSink))
1464    }
1465
1466    /// Validates that a view definition does not contain any expressions that may lead to
1467    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1468    ///
1469    /// We prevent these expressions so that we can add columns to system tables without
1470    /// changing the definition of the view.
1471    ///
1472    /// Here is a bit of a hand wavy proof as to why we only need to check the
1473    /// immediate view definition for system objects and ambiguous column
1474    /// references, and not the entire dependency tree:
1475    ///
1476    ///   - A view with no object references cannot have any ambiguous column
1477    ///   references to a system object, because it has no system objects.
1478    ///   - A view with a direct reference to a system object and a * or
1479    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1480    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1481    ///   any ambiguous column references to a system object, because all column
1482    ///   references are explicitly named.
1483    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1484    ///   system object cannot have any ambiguous column references to a system
1485    ///   object, because there are no system objects in the top level view and
1486    ///   all sub-views are guaranteed to have no ambiguous column references to
1487    ///   system objects.
1488    pub(super) fn validate_system_column_references(
1489        &self,
1490        uses_ambiguous_columns: bool,
1491        depends_on: &BTreeSet<GlobalId>,
1492    ) -> Result<(), AdapterError> {
1493        if uses_ambiguous_columns
1494            && depends_on
1495                .iter()
1496                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1497        {
1498            Err(AdapterError::AmbiguousSystemColumnReference)
1499        } else {
1500            Ok(())
1501        }
1502    }
1503
1504    #[instrument]
1505    pub(super) async fn sequence_create_type(
1506        &mut self,
1507        session: &Session,
1508        plan: plan::CreateTypePlan,
1509        resolved_ids: ResolvedIds,
1510    ) -> Result<ExecuteResponse, AdapterError> {
1511        let id_ts = self.get_catalog_write_ts().await;
1512        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
1513        let typ = Type {
1514            create_sql: Some(plan.typ.create_sql),
1515            global_id,
1516            desc: plan.typ.inner.desc(&self.catalog().for_session(session))?,
1517            details: CatalogTypeDetails {
1518                array_id: None,
1519                typ: plan.typ.inner,
1520                pg_metadata: None,
1521            },
1522            resolved_ids,
1523        };
1524        let op = catalog::Op::CreateItem {
1525            id: item_id,
1526            name: plan.name,
1527            item: CatalogItem::Type(typ),
1528            owner_id: *session.current_role_id(),
1529        };
1530        match self.catalog_transact(Some(session), vec![op]).await {
1531            Ok(()) => Ok(ExecuteResponse::CreatedType),
1532            Err(err) => Err(err),
1533        }
1534    }
1535
1536    #[instrument]
1537    pub(super) async fn sequence_comment_on(
1538        &mut self,
1539        session: &Session,
1540        plan: plan::CommentPlan,
1541    ) -> Result<ExecuteResponse, AdapterError> {
1542        let op = catalog::Op::Comment {
1543            object_id: plan.object_id,
1544            sub_component: plan.sub_component,
1545            comment: plan.comment,
1546        };
1547        self.catalog_transact(Some(session), vec![op]).await?;
1548        Ok(ExecuteResponse::Comment)
1549    }
1550
1551    #[instrument]
1552    pub(super) async fn sequence_drop_objects(
1553        &mut self,
1554        session: &Session,
1555        plan::DropObjectsPlan {
1556            drop_ids,
1557            object_type,
1558            referenced_ids,
1559        }: plan::DropObjectsPlan,
1560    ) -> Result<ExecuteResponse, AdapterError> {
1561        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1562        let mut objects = Vec::new();
1563        for obj_id in &drop_ids {
1564            if !referenced_ids_hashset.contains(obj_id) {
1565                let object_info = ErrorMessageObjectDescription::from_id(
1566                    obj_id,
1567                    &self.catalog().for_session(session),
1568                )
1569                .to_string();
1570                objects.push(object_info);
1571            }
1572        }
1573
1574        if !objects.is_empty() {
1575            session.add_notice(AdapterNotice::CascadeDroppedObject { objects });
1576        }
1577
1578        let DropOps {
1579            ops,
1580            dropped_active_db,
1581            dropped_active_cluster,
1582            dropped_in_use_indexes,
1583        } = self.sequence_drop_common(session, drop_ids)?;
1584
1585        self.catalog_transact(Some(session), ops).await?;
1586
1587        fail::fail_point!("after_sequencer_drop_replica");
1588
1589        if dropped_active_db {
1590            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1591                name: session.vars().database().to_string(),
1592            });
1593        }
1594        if dropped_active_cluster {
1595            session.add_notice(AdapterNotice::DroppedActiveCluster {
1596                name: session.vars().cluster().to_string(),
1597            });
1598        }
1599        for dropped_in_use_index in dropped_in_use_indexes {
1600            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1601            self.metrics
1602                .optimization_notices
1603                .with_label_values(&["DroppedInUseIndex"])
1604                .inc_by(1);
1605        }
1606        Ok(ExecuteResponse::DroppedObject(object_type))
1607    }
1608
1609    fn validate_dropped_role_ownership(
1610        &self,
1611        session: &Session,
1612        dropped_roles: &BTreeMap<RoleId, &str>,
1613    ) -> Result<(), AdapterError> {
1614        fn privilege_check(
1615            privileges: &PrivilegeMap,
1616            dropped_roles: &BTreeMap<RoleId, &str>,
1617            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1618            object_id: &SystemObjectId,
1619            catalog: &ConnCatalog,
1620        ) {
1621            for privilege in privileges.all_values() {
1622                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1623                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1624                    let object_description =
1625                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1626                    dependent_objects
1627                        .entry(role_name.to_string())
1628                        .or_default()
1629                        .push(format!(
1630                            "privileges on {object_description} granted by {grantor_name}",
1631                        ));
1632                }
1633                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1634                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1635                    let object_description =
1636                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1637                    dependent_objects
1638                        .entry(role_name.to_string())
1639                        .or_default()
1640                        .push(format!(
1641                            "privileges granted on {object_description} to {grantee_name}"
1642                        ));
1643                }
1644            }
1645        }
1646
1647        let catalog = self.catalog().for_session(session);
1648        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1649        for entry in self.catalog.entries() {
1650            let id = SystemObjectId::Object(entry.id().into());
1651            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1652                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1653                dependent_objects
1654                    .entry(role_name.to_string())
1655                    .or_default()
1656                    .push(format!("owner of {object_description}"));
1657            }
1658            privilege_check(
1659                entry.privileges(),
1660                dropped_roles,
1661                &mut dependent_objects,
1662                &id,
1663                &catalog,
1664            );
1665        }
1666        for database in self.catalog.databases() {
1667            let database_id = SystemObjectId::Object(database.id().into());
1668            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1669                let object_description =
1670                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1671                dependent_objects
1672                    .entry(role_name.to_string())
1673                    .or_default()
1674                    .push(format!("owner of {object_description}"));
1675            }
1676            privilege_check(
1677                &database.privileges,
1678                dropped_roles,
1679                &mut dependent_objects,
1680                &database_id,
1681                &catalog,
1682            );
1683            for schema in database.schemas_by_id.values() {
1684                let schema_id = SystemObjectId::Object(
1685                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1686                );
1687                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1688                    let object_description =
1689                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1690                    dependent_objects
1691                        .entry(role_name.to_string())
1692                        .or_default()
1693                        .push(format!("owner of {object_description}"));
1694                }
1695                privilege_check(
1696                    &schema.privileges,
1697                    dropped_roles,
1698                    &mut dependent_objects,
1699                    &schema_id,
1700                    &catalog,
1701                );
1702            }
1703        }
1704        for cluster in self.catalog.clusters() {
1705            let cluster_id = SystemObjectId::Object(cluster.id().into());
1706            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1707                let object_description =
1708                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1709                dependent_objects
1710                    .entry(role_name.to_string())
1711                    .or_default()
1712                    .push(format!("owner of {object_description}"));
1713            }
1714            privilege_check(
1715                &cluster.privileges,
1716                dropped_roles,
1717                &mut dependent_objects,
1718                &cluster_id,
1719                &catalog,
1720            );
1721            for replica in cluster.replicas() {
1722                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1723                    let replica_id =
1724                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1725                    let object_description =
1726                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1727                    dependent_objects
1728                        .entry(role_name.to_string())
1729                        .or_default()
1730                        .push(format!("owner of {object_description}"));
1731                }
1732            }
1733        }
1734        privilege_check(
1735            self.catalog().system_privileges(),
1736            dropped_roles,
1737            &mut dependent_objects,
1738            &SystemObjectId::System,
1739            &catalog,
1740        );
1741        for (default_privilege_object, default_privilege_acl_items) in
1742            self.catalog.default_privileges()
1743        {
1744            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1745                dependent_objects
1746                    .entry(role_name.to_string())
1747                    .or_default()
1748                    .push(format!(
1749                        "default privileges on {}S created by {}",
1750                        default_privilege_object.object_type, role_name
1751                    ));
1752            }
1753            for default_privilege_acl_item in default_privilege_acl_items {
1754                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1755                    dependent_objects
1756                        .entry(role_name.to_string())
1757                        .or_default()
1758                        .push(format!(
1759                            "default privileges on {}S granted to {}",
1760                            default_privilege_object.object_type, role_name
1761                        ));
1762                }
1763            }
1764        }
1765
1766        if !dependent_objects.is_empty() {
1767            Err(AdapterError::DependentObject(dependent_objects))
1768        } else {
1769            Ok(())
1770        }
1771    }
1772
1773    #[instrument]
1774    pub(super) async fn sequence_drop_owned(
1775        &mut self,
1776        session: &Session,
1777        plan: plan::DropOwnedPlan,
1778    ) -> Result<ExecuteResponse, AdapterError> {
1779        for role_id in &plan.role_ids {
1780            self.catalog().ensure_not_reserved_role(role_id)?;
1781        }
1782
1783        let mut privilege_revokes = plan.privilege_revokes;
1784
1785        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1786        let session_catalog = self.catalog().for_session(session);
1787        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1788            && !session.is_superuser()
1789        {
1790            // Obtain all roles that the current session is a member of.
1791            let role_membership =
1792                session_catalog.collect_role_membership(session.current_role_id());
1793            let invalid_revokes: BTreeSet<_> = privilege_revokes
1794                .extract_if(.., |(_, privilege)| {
1795                    !role_membership.contains(&privilege.grantor)
1796                })
1797                .map(|(object_id, _)| object_id)
1798                .collect();
1799            for invalid_revoke in invalid_revokes {
1800                let object_description =
1801                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1802                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1803            }
1804        }
1805
1806        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1807            catalog::Op::UpdatePrivilege {
1808                target_id: object_id,
1809                privilege,
1810                variant: UpdatePrivilegeVariant::Revoke,
1811            }
1812        });
1813        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1814            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1815                privilege_object,
1816                privilege_acl_item,
1817                variant: UpdatePrivilegeVariant::Revoke,
1818            },
1819        );
1820        let DropOps {
1821            ops: drop_ops,
1822            dropped_active_db,
1823            dropped_active_cluster,
1824            dropped_in_use_indexes,
1825        } = self.sequence_drop_common(session, plan.drop_ids)?;
1826
1827        let ops = privilege_revoke_ops
1828            .chain(default_privilege_revoke_ops)
1829            .chain(drop_ops.into_iter())
1830            .collect();
1831
1832        self.catalog_transact(Some(session), ops).await?;
1833
1834        if dropped_active_db {
1835            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1836                name: session.vars().database().to_string(),
1837            });
1838        }
1839        if dropped_active_cluster {
1840            session.add_notice(AdapterNotice::DroppedActiveCluster {
1841                name: session.vars().cluster().to_string(),
1842            });
1843        }
1844        for dropped_in_use_index in dropped_in_use_indexes {
1845            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1846        }
1847        Ok(ExecuteResponse::DroppedOwned)
1848    }
1849
1850    fn sequence_drop_common(
1851        &self,
1852        session: &Session,
1853        ids: Vec<ObjectId>,
1854    ) -> Result<DropOps, AdapterError> {
1855        let mut dropped_active_db = false;
1856        let mut dropped_active_cluster = false;
1857        let mut dropped_in_use_indexes = Vec::new();
1858        let mut dropped_roles = BTreeMap::new();
1859        let mut dropped_databases = BTreeSet::new();
1860        let mut dropped_schemas = BTreeSet::new();
1861        // Dropping either the group role or the member role of a role membership will trigger a
1862        // revoke role. We use a Set for the revokes to avoid trying to attempt to revoke the same
1863        // role membership twice.
1864        let mut role_revokes = BTreeSet::new();
1865        // Dropping a database or a schema will revoke all default roles associated with that
1866        // database or schema.
1867        let mut default_privilege_revokes = BTreeSet::new();
1868
1869        // Clusters we're dropping
1870        let mut clusters_to_drop = BTreeSet::new();
1871
1872        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1873        for id in &ids {
1874            match id {
1875                ObjectId::Database(id) => {
1876                    let name = self.catalog().get_database(id).name();
1877                    if name == session.vars().database() {
1878                        dropped_active_db = true;
1879                    }
1880                    dropped_databases.insert(id);
1881                }
1882                ObjectId::Schema((_, spec)) => {
1883                    if let SchemaSpecifier::Id(id) = spec {
1884                        dropped_schemas.insert(id);
1885                    }
1886                }
1887                ObjectId::Cluster(id) => {
1888                    clusters_to_drop.insert(*id);
1889                    if let Some(active_id) = self
1890                        .catalog()
1891                        .active_cluster(session)
1892                        .ok()
1893                        .map(|cluster| cluster.id())
1894                    {
1895                        if id == &active_id {
1896                            dropped_active_cluster = true;
1897                        }
1898                    }
1899                }
1900                ObjectId::Role(id) => {
1901                    let role = self.catalog().get_role(id);
1902                    let name = role.name();
1903                    dropped_roles.insert(*id, name);
1904                    // We must revoke all role memberships that the dropped roles belongs to.
1905                    for (group_id, grantor_id) in &role.membership.map {
1906                        role_revokes.insert((*group_id, *id, *grantor_id));
1907                    }
1908                }
1909                ObjectId::Item(id) => {
1910                    if let Some(index) = self.catalog().get_entry(id).index() {
1911                        let humanizer = self.catalog().for_session(session);
1912                        let dependants = self
1913                            .controller
1914                            .compute
1915                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1916                            .ok()
1917                            .into_iter()
1918                            .flatten()
1919                            .filter(|dependant_id| {
1920                                // Transient Ids belong to Peeks. We are not interested for now in
1921                                // peeks depending on a dropped index.
1922                                // TODO: show a different notice in this case. Something like
1923                                // "There is an in-progress ad hoc SELECT that uses the dropped
1924                                // index. The resources used by the index will be freed when all
1925                                // such SELECTs complete."
1926                                if dependant_id.is_transient() {
1927                                    return false;
1928                                }
1929                                // The item should exist, but don't panic if it doesn't.
1930                                let Some(dependent_id) = humanizer
1931                                    .try_get_item_by_global_id(dependant_id)
1932                                    .map(|item| item.id())
1933                                else {
1934                                    return false;
1935                                };
1936                                // If the dependent object is also being dropped, then there is no
1937                                // problem, so we don't want a notice.
1938                                !ids_set.contains(&ObjectId::Item(dependent_id))
1939                            })
1940                            .flat_map(|dependant_id| {
1941                                // If we are not able to find a name for this ID it probably means
1942                                // we have already dropped the compute collection, in which case we
1943                                // can ignore it.
1944                                humanizer.humanize_id(dependant_id)
1945                            })
1946                            .collect_vec();
1947                        if !dependants.is_empty() {
1948                            dropped_in_use_indexes.push(DroppedInUseIndex {
1949                                index_name: humanizer
1950                                    .humanize_id(index.global_id())
1951                                    .unwrap_or_else(|| id.to_string()),
1952                                dependant_objects: dependants,
1953                            });
1954                        }
1955                    }
1956                }
1957                _ => {}
1958            }
1959        }
1960
1961        for id in &ids {
1962            match id {
1963                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1964                // unless they are internal replicas, which exist outside the scope
1965                // of managed clusters.
1966                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1967                    if !clusters_to_drop.contains(cluster_id) {
1968                        let cluster = self.catalog.get_cluster(*cluster_id);
1969                        if cluster.is_managed() {
1970                            let replica =
1971                                cluster.replica(*replica_id).expect("Catalog out of sync");
1972                            if !replica.config.location.internal() {
1973                                coord_bail!("cannot drop replica of managed cluster");
1974                            }
1975                        }
1976                    }
1977                }
1978                _ => {}
1979            }
1980        }
1981
1982        for role_id in dropped_roles.keys() {
1983            self.catalog().ensure_not_reserved_role(role_id)?;
1984        }
1985        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1986        // If any role is a member of a dropped role, then we must revoke that membership.
1987        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1988        for role in self.catalog().user_roles() {
1989            for dropped_role_id in
1990                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1991            {
1992                role_revokes.insert((
1993                    **dropped_role_id,
1994                    role.id(),
1995                    *role
1996                        .membership
1997                        .map
1998                        .get(*dropped_role_id)
1999                        .expect("included in keys above"),
2000                ));
2001            }
2002        }
2003
2004        for (default_privilege_object, default_privilege_acls) in
2005            self.catalog().default_privileges()
2006        {
2007            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
2008                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
2009            {
2010                for default_privilege_acl in default_privilege_acls {
2011                    default_privilege_revokes.insert((
2012                        default_privilege_object.clone(),
2013                        default_privilege_acl.clone(),
2014                    ));
2015                }
2016            }
2017        }
2018
2019        let ops = role_revokes
2020            .into_iter()
2021            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
2022                role_id,
2023                member_id,
2024                grantor_id,
2025            })
2026            .chain(default_privilege_revokes.into_iter().map(
2027                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
2028                    privilege_object,
2029                    privilege_acl_item,
2030                    variant: UpdatePrivilegeVariant::Revoke,
2031                },
2032            ))
2033            .chain(iter::once(catalog::Op::DropObjects(
2034                ids.into_iter()
2035                    .map(DropObjectInfo::manual_drop_from_object_id)
2036                    .collect(),
2037            )))
2038            .collect();
2039
2040        Ok(DropOps {
2041            ops,
2042            dropped_active_db,
2043            dropped_active_cluster,
2044            dropped_in_use_indexes,
2045        })
2046    }
2047
2048    pub(super) fn sequence_explain_schema(
2049        &self,
2050        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
2051    ) -> Result<ExecuteResponse, AdapterError> {
2052        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
2053            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
2054        })?;
2055
2056        let json_string = json_string(&json_value);
2057        let row = Row::pack_slice(&[Datum::String(&json_string)]);
2058        Ok(Self::send_immediate_rows(row))
2059    }
2060
2061    pub(super) fn sequence_show_all_variables(
2062        &self,
2063        session: &Session,
2064    ) -> Result<ExecuteResponse, AdapterError> {
2065        let mut rows = viewable_variables(self.catalog().state(), session)
2066            .map(|v| (v.name(), v.value(), v.description()))
2067            .collect::<Vec<_>>();
2068        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
2069
2070        // TODO(parkmycar): Pack all of these into a single RowCollection.
2071        let rows: Vec<_> = rows
2072            .into_iter()
2073            .map(|(name, val, desc)| {
2074                Row::pack_slice(&[
2075                    Datum::String(name),
2076                    Datum::String(&val),
2077                    Datum::String(desc),
2078                ])
2079            })
2080            .collect();
2081        Ok(Self::send_immediate_rows(rows))
2082    }
2083
2084    pub(super) fn sequence_show_variable(
2085        &self,
2086        session: &Session,
2087        plan: plan::ShowVariablePlan,
2088    ) -> Result<ExecuteResponse, AdapterError> {
2089        if &plan.name == SCHEMA_ALIAS {
2090            let schemas = self.catalog.resolve_search_path(session);
2091            let schema = schemas.first();
2092            return match schema {
2093                Some((database_spec, schema_spec)) => {
2094                    let schema_name = &self
2095                        .catalog
2096                        .get_schema(database_spec, schema_spec, session.conn_id())
2097                        .name()
2098                        .schema;
2099                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
2100                    Ok(Self::send_immediate_rows(row))
2101                }
2102                None => {
2103                    if session.vars().current_object_missing_warnings() {
2104                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
2105                            search_path: session
2106                                .vars()
2107                                .search_path()
2108                                .into_iter()
2109                                .map(|schema| schema.to_string())
2110                                .collect(),
2111                        });
2112                    }
2113                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
2114                }
2115            };
2116        }
2117
2118        let variable = session
2119            .vars()
2120            .get(self.catalog().system_config(), &plan.name)
2121            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
2122
2123        // In lieu of plumbing the user to all system config functions, just check that the var is
2124        // visible.
2125        variable.visible(session.user(), self.catalog().system_config())?;
2126
2127        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
2128        if variable.name() == vars::DATABASE.name()
2129            && matches!(
2130                self.catalog().resolve_database(&variable.value()),
2131                Err(CatalogError::UnknownDatabase(_))
2132            )
2133            && session.vars().current_object_missing_warnings()
2134        {
2135            let name = variable.value();
2136            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2137        } else if variable.name() == vars::CLUSTER.name()
2138            && matches!(
2139                self.catalog().resolve_cluster(&variable.value()),
2140                Err(CatalogError::UnknownCluster(_))
2141            )
2142            && session.vars().current_object_missing_warnings()
2143        {
2144            let name = variable.value();
2145            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2146        }
2147        Ok(Self::send_immediate_rows(row))
2148    }
2149
2150    #[instrument]
2151    pub(super) async fn sequence_inspect_shard(
2152        &self,
2153        session: &Session,
2154        plan: plan::InspectShardPlan,
2155    ) -> Result<ExecuteResponse, AdapterError> {
2156        // TODO: Not thrilled about this rbac special case here, but probably
2157        // sufficient for now.
2158        if !session.user().is_internal() {
2159            return Err(AdapterError::Unauthorized(
2160                rbac::UnauthorizedError::MzSystem {
2161                    action: "inspect".into(),
2162                },
2163            ));
2164        }
2165        let state = self
2166            .controller
2167            .storage
2168            .inspect_persist_state(plan.id)
2169            .await?;
2170        let jsonb = Jsonb::from_serde_json(state)?;
2171        Ok(Self::send_immediate_rows(jsonb.into_row()))
2172    }
2173
2174    #[instrument]
2175    pub(super) fn sequence_set_variable(
2176        &self,
2177        session: &mut Session,
2178        plan: plan::SetVariablePlan,
2179    ) -> Result<ExecuteResponse, AdapterError> {
2180        let (name, local) = (plan.name, plan.local);
2181        if &name == TRANSACTION_ISOLATION_VAR_NAME {
2182            self.validate_set_isolation_level(session)?;
2183        }
2184        if &name == vars::CLUSTER.name() {
2185            self.validate_set_cluster(session)?;
2186        }
2187
2188        let vars = session.vars_mut();
2189        let values = match plan.value {
2190            plan::VariableValue::Default => None,
2191            plan::VariableValue::Values(values) => Some(values),
2192        };
2193
2194        match values {
2195            Some(values) => {
2196                vars.set(
2197                    self.catalog().system_config(),
2198                    &name,
2199                    VarInput::SqlSet(&values),
2200                    local,
2201                )?;
2202
2203                let vars = session.vars();
2204
2205                // Emit a warning when deprecated variables are used.
2206                // TODO(database-issues#8069) remove this after sufficient time has passed
2207                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
2208                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
2209                } else if name == vars::CLUSTER.name()
2210                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
2211                {
2212                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
2213                }
2214
2215                // Database or cluster value does not correspond to a catalog item.
2216                if name.as_str() == vars::DATABASE.name()
2217                    && matches!(
2218                        self.catalog().resolve_database(vars.database()),
2219                        Err(CatalogError::UnknownDatabase(_))
2220                    )
2221                    && session.vars().current_object_missing_warnings()
2222                {
2223                    let name = vars.database().to_string();
2224                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2225                } else if name.as_str() == vars::CLUSTER.name()
2226                    && matches!(
2227                        self.catalog().resolve_cluster(vars.cluster()),
2228                        Err(CatalogError::UnknownCluster(_))
2229                    )
2230                    && session.vars().current_object_missing_warnings()
2231                {
2232                    let name = vars.cluster().to_string();
2233                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2234                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
2235                    let v = values.into_first().to_lowercase();
2236                    if v == IsolationLevel::ReadUncommitted.as_str()
2237                        || v == IsolationLevel::ReadCommitted.as_str()
2238                        || v == IsolationLevel::RepeatableRead.as_str()
2239                    {
2240                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
2241                            isolation_level: v,
2242                        });
2243                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
2244                        session.add_notice(AdapterNotice::StrongSessionSerializable);
2245                    }
2246                }
2247            }
2248            None => vars.reset(self.catalog().system_config(), &name, local)?,
2249        }
2250
2251        Ok(ExecuteResponse::SetVariable { name, reset: false })
2252    }
2253
2254    pub(super) fn sequence_reset_variable(
2255        &self,
2256        session: &mut Session,
2257        plan: plan::ResetVariablePlan,
2258    ) -> Result<ExecuteResponse, AdapterError> {
2259        let name = plan.name;
2260        if &name == TRANSACTION_ISOLATION_VAR_NAME {
2261            self.validate_set_isolation_level(session)?;
2262        }
2263        if &name == vars::CLUSTER.name() {
2264            self.validate_set_cluster(session)?;
2265        }
2266        session
2267            .vars_mut()
2268            .reset(self.catalog().system_config(), &name, false)?;
2269        Ok(ExecuteResponse::SetVariable { name, reset: true })
2270    }
2271
2272    pub(super) fn sequence_set_transaction(
2273        &self,
2274        session: &mut Session,
2275        plan: plan::SetTransactionPlan,
2276    ) -> Result<ExecuteResponse, AdapterError> {
2277        // TODO(jkosh44) Only supports isolation levels for now.
2278        for mode in plan.modes {
2279            match mode {
2280                TransactionMode::AccessMode(_) => {
2281                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
2282                }
2283                TransactionMode::IsolationLevel(isolation_level) => {
2284                    self.validate_set_isolation_level(session)?;
2285
2286                    session.vars_mut().set(
2287                        self.catalog().system_config(),
2288                        TRANSACTION_ISOLATION_VAR_NAME,
2289                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
2290                        plan.local,
2291                    )?
2292                }
2293            }
2294        }
2295        Ok(ExecuteResponse::SetVariable {
2296            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2297            reset: false,
2298        })
2299    }
2300
2301    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2302        if session.transaction().contains_ops() {
2303            Err(AdapterError::InvalidSetIsolationLevel)
2304        } else {
2305            Ok(())
2306        }
2307    }
2308
2309    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2310        if session.transaction().contains_ops() {
2311            Err(AdapterError::InvalidSetCluster)
2312        } else {
2313            Ok(())
2314        }
2315    }
2316
2317    #[instrument]
2318    pub(super) async fn sequence_end_transaction(
2319        &mut self,
2320        mut ctx: ExecuteContext,
2321        mut action: EndTransactionAction,
2322    ) {
2323        // If the transaction has failed, we can only rollback.
2324        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2325            (&action, ctx.session().transaction())
2326        {
2327            action = EndTransactionAction::Rollback;
2328        }
2329        let response = match action {
2330            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2331                params: BTreeMap::new(),
2332            }),
2333            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2334                params: BTreeMap::new(),
2335            }),
2336        };
2337
2338        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2339
2340        let (response, action) = match result {
2341            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2342                (response, action)
2343            }
2344            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2345                // Make sure we have the correct set of write locks for this transaction.
2346                // Aggressively dropping partial sets of locks to prevent deadlocking separate
2347                // transactions.
2348                let validated_locks = match write_lock_guards {
2349                    None => None,
2350                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2351                        Ok(locks) => Some(locks),
2352                        Err(missing) => {
2353                            tracing::error!(?missing, "programming error, missing write locks");
2354                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2355                        }
2356                    },
2357                };
2358
2359                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2360                for WriteOp { id, rows } in writes {
2361                    let total_rows = collected_writes.entry(id).or_default();
2362                    total_rows.push(rows);
2363                }
2364
2365                self.submit_write(PendingWriteTxn::User {
2366                    span: Span::current(),
2367                    writes: collected_writes,
2368                    write_locks: validated_locks,
2369                    pending_txn: PendingTxn {
2370                        ctx,
2371                        response,
2372                        action,
2373                    },
2374                });
2375                return;
2376            }
2377            Ok((
2378                Some(TransactionOps::Peeks {
2379                    determination,
2380                    requires_linearization: RequireLinearization::Required,
2381                    ..
2382                }),
2383                _,
2384            )) if ctx.session().vars().transaction_isolation()
2385                == &IsolationLevel::StrictSerializable =>
2386            {
2387                let conn_id = ctx.session().conn_id().clone();
2388                let pending_read_txn = PendingReadTxn {
2389                    txn: PendingRead::Read {
2390                        txn: PendingTxn {
2391                            ctx,
2392                            response,
2393                            action,
2394                        },
2395                    },
2396                    timestamp_context: determination.timestamp_context,
2397                    created: Instant::now(),
2398                    num_requeues: 0,
2399                    otel_ctx: OpenTelemetryContext::obtain(),
2400                };
2401                self.strict_serializable_reads_tx
2402                    .send((conn_id, pending_read_txn))
2403                    .expect("sending to strict_serializable_reads_tx cannot fail");
2404                return;
2405            }
2406            Ok((
2407                Some(TransactionOps::Peeks {
2408                    determination,
2409                    requires_linearization: RequireLinearization::Required,
2410                    ..
2411                }),
2412                _,
2413            )) if ctx.session().vars().transaction_isolation()
2414                == &IsolationLevel::StrongSessionSerializable =>
2415            {
2416                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2417                    ctx.session_mut()
2418                        .ensure_timestamp_oracle(timeline.clone())
2419                        .apply_write(*ts);
2420                }
2421                (response, action)
2422            }
2423            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2424                self.internal_cmd_tx
2425                    .send(Message::ExecuteSingleStatementTransaction {
2426                        ctx,
2427                        otel_ctx: OpenTelemetryContext::obtain(),
2428                        stmt,
2429                        params,
2430                    })
2431                    .expect("must send");
2432                return;
2433            }
2434            Ok((_, _)) => (response, action),
2435            Err(err) => (Err(err), EndTransactionAction::Rollback),
2436        };
2437        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2438        // Append any parameters that changed to the response.
2439        let response = response.map(|mut r| {
2440            r.extend_params(changed);
2441            ExecuteResponse::from(r)
2442        });
2443
2444        ctx.retire(response);
2445    }
2446
2447    #[instrument]
2448    async fn sequence_end_transaction_inner(
2449        &mut self,
2450        ctx: &mut ExecuteContext,
2451        action: EndTransactionAction,
2452    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2453        let txn = self.clear_transaction(ctx.session_mut()).await;
2454
2455        if let EndTransactionAction::Commit = action {
2456            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2457                match &mut ops {
2458                    TransactionOps::Writes(writes) => {
2459                        for WriteOp { id, .. } in &mut writes.iter() {
2460                            // Re-verify this id exists.
2461                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2462                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2463                                    kind: mz_catalog::memory::error::ErrorKind::Sql(
2464                                        CatalogError::UnknownItem(id.to_string()),
2465                                    ),
2466                                })
2467                            })?;
2468                        }
2469
2470                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2471                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2472                    }
2473                    TransactionOps::DDL {
2474                        ops,
2475                        state: _,
2476                        side_effects,
2477                        revision,
2478                    } => {
2479                        // Make sure our catalog hasn't changed.
2480                        if *revision != self.catalog().transient_revision() {
2481                            return Err(AdapterError::DDLTransactionRace);
2482                        }
2483                        // Commit all of our queued ops.
2484                        let ops = std::mem::take(ops);
2485                        let side_effects = std::mem::take(side_effects);
2486                        self.catalog_transact_with_side_effects(
2487                            Some(ctx),
2488                            ops,
2489                            move |a, mut ctx| {
2490                                Box::pin(async move {
2491                                    for side_effect in side_effects {
2492                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2493                                    }
2494                                })
2495                            },
2496                        )
2497                        .await?;
2498                    }
2499                    _ => (),
2500                }
2501                return Ok((Some(ops), write_lock_guards));
2502            }
2503        }
2504
2505        Ok((None, None))
2506    }
2507
2508    pub(super) async fn sequence_side_effecting_func(
2509        &mut self,
2510        ctx: ExecuteContext,
2511        plan: SideEffectingFunc,
2512    ) {
2513        match plan {
2514            SideEffectingFunc::PgCancelBackend { connection_id } => {
2515                if ctx.session().conn_id().unhandled() == connection_id {
2516                    // As a special case, if we're canceling ourselves, we send
2517                    // back a canceled resposne to the client issuing the query,
2518                    // and so we need to do no further processing of the cancel.
2519                    ctx.retire(Err(AdapterError::Canceled));
2520                    return;
2521                }
2522
2523                let res = if let Some((id_handle, _conn_meta)) =
2524                    self.active_conns.get_key_value(&connection_id)
2525                {
2526                    // check_plan already verified role membership.
2527                    self.handle_privileged_cancel(id_handle.clone()).await;
2528                    Datum::True
2529                } else {
2530                    Datum::False
2531                };
2532                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2533            }
2534        }
2535    }
2536
2537    /// Checks to see if the session needs a real time recency timestamp and if so returns
2538    /// a future that will return the timestamp.
2539    pub(super) async fn determine_real_time_recent_timestamp(
2540        &self,
2541        session: &Session,
2542        source_ids: impl Iterator<Item = CatalogItemId>,
2543    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2544    {
2545        let vars = session.vars();
2546
2547        // Ideally this logic belongs inside of
2548        // `mz-adapter::coord::timestamp_selection::determine_timestamp`. However, including the
2549        // logic in there would make it extremely difficult and inconvenient to pull the waiting off
2550        // of the main coord thread.
2551        let r = if vars.real_time_recency()
2552            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2553            && !session.contains_read_timestamp()
2554        {
2555            // Find all dependencies transitively because we need to ensure that
2556            // RTR queries determine the timestamp from the sources' (i.e.
2557            // storage objects that ingest data from external systems) remap
2558            // data. We "cheat" a little bit and filter out any IDs that aren't
2559            // user objects because we know they are not a RTR source.
2560            let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
2561            // If none of the sources are user objects, we don't need to provide
2562            // a RTR timestamp.
2563            if to_visit.is_empty() {
2564                return Ok(None);
2565            }
2566
2567            let mut timestamp_objects = BTreeSet::new();
2568
2569            while let Some(id) = to_visit.pop_front() {
2570                timestamp_objects.insert(id);
2571                to_visit.extend(
2572                    self.catalog()
2573                        .get_entry(&id)
2574                        .uses()
2575                        .into_iter()
2576                        .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2577                );
2578            }
2579            let timestamp_objects = timestamp_objects
2580                .into_iter()
2581                .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2582                .collect();
2583
2584            let r = self
2585                .controller
2586                .determine_real_time_recent_timestamp(
2587                    timestamp_objects,
2588                    *vars.real_time_recency_timeout(),
2589                )
2590                .await?;
2591
2592            Some(r)
2593        } else {
2594            None
2595        };
2596
2597        Ok(r)
2598    }
2599
2600    #[instrument]
2601    pub(super) async fn sequence_explain_plan(
2602        &mut self,
2603        ctx: ExecuteContext,
2604        plan: plan::ExplainPlanPlan,
2605        target_cluster: TargetCluster,
2606    ) {
2607        match &plan.explainee {
2608            plan::Explainee::Statement(stmt) => match stmt {
2609                plan::ExplaineeStatement::CreateView { .. } => {
2610                    self.explain_create_view(ctx, plan).await;
2611                }
2612                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2613                    self.explain_create_materialized_view(ctx, plan).await;
2614                }
2615                plan::ExplaineeStatement::CreateIndex { .. } => {
2616                    self.explain_create_index(ctx, plan).await;
2617                }
2618                plan::ExplaineeStatement::Select { .. } => {
2619                    self.explain_peek(ctx, plan, target_cluster).await;
2620                }
2621            },
2622            plan::Explainee::View(_) => {
2623                let result = self.explain_view(&ctx, plan);
2624                ctx.retire(result);
2625            }
2626            plan::Explainee::MaterializedView(_) => {
2627                let result = self.explain_materialized_view(&ctx, plan);
2628                ctx.retire(result);
2629            }
2630            plan::Explainee::Index(_) => {
2631                let result = self.explain_index(&ctx, plan);
2632                ctx.retire(result);
2633            }
2634            plan::Explainee::ReplanView(_) => {
2635                self.explain_replan_view(ctx, plan).await;
2636            }
2637            plan::Explainee::ReplanMaterializedView(_) => {
2638                self.explain_replan_materialized_view(ctx, plan).await;
2639            }
2640            plan::Explainee::ReplanIndex(_) => {
2641                self.explain_replan_index(ctx, plan).await;
2642            }
2643        };
2644    }
2645
2646    pub(super) async fn sequence_explain_pushdown(
2647        &mut self,
2648        ctx: ExecuteContext,
2649        plan: plan::ExplainPushdownPlan,
2650        target_cluster: TargetCluster,
2651    ) {
2652        match plan.explainee {
2653            Explainee::Statement(ExplaineeStatement::Select {
2654                broken: false,
2655                plan,
2656                desc: _,
2657            }) => {
2658                let stage = return_if_err!(
2659                    self.peek_validate(
2660                        ctx.session(),
2661                        plan,
2662                        target_cluster,
2663                        None,
2664                        ExplainContext::Pushdown,
2665                        Some(ctx.session().vars().max_query_result_size()),
2666                    ),
2667                    ctx
2668                );
2669                self.sequence_staged(ctx, Span::current(), stage).await;
2670            }
2671            Explainee::MaterializedView(item_id) => {
2672                self.explain_pushdown_materialized_view(ctx, item_id).await;
2673            }
2674            _ => {
2675                ctx.retire(Err(AdapterError::Unsupported(
2676                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2677                )));
2678            }
2679        };
2680    }
2681
2682    async fn render_explain_pushdown(
2683        &self,
2684        ctx: ExecuteContext,
2685        as_of: Antichain<Timestamp>,
2686        mz_now: ResultSpec<'static>,
2687        read_holds: Option<ReadHolds<Timestamp>>,
2688        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2689    ) {
2690        let fut = self
2691            .render_explain_pushdown_prepare(ctx.session(), as_of, mz_now, imports)
2692            .await;
2693        task::spawn(|| "render explain pushdown", async move {
2694            // Transfer the necessary read holds over to the background task
2695            let _read_holds = read_holds;
2696            let res = fut.await;
2697            ctx.retire(res);
2698        });
2699    }
2700
2701    async fn render_explain_pushdown_prepare<
2702        I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
2703    >(
2704        &self,
2705        session: &Session,
2706        as_of: Antichain<Timestamp>,
2707        mz_now: ResultSpec<'static>,
2708        imports: I,
2709    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2710        let explain_timeout = *session.vars().statement_timeout();
2711        let mut futures = FuturesOrdered::new();
2712        for (id, mfp) in imports {
2713            let catalog_entry = self.catalog.get_entry_by_global_id(&id);
2714            let full_name = self
2715                .catalog
2716                .for_session(session)
2717                .resolve_full_name(&catalog_entry.name);
2718            let name = format!("{}", full_name);
2719            let relation_desc = catalog_entry
2720                .desc_opt()
2721                .expect("source should have a proper desc")
2722                .into_owned();
2723            let stats_future = self
2724                .controller
2725                .storage_collections
2726                .snapshot_parts_stats(id, as_of.clone())
2727                .await;
2728
2729            let mz_now = mz_now.clone();
2730            // These futures may block if the source is not yet readable at the as-of;
2731            // stash them in `futures` and only block on them in a separate task.
2732            futures.push_back(async move {
2733                let snapshot_stats = match stats_future.await {
2734                    Ok(stats) => stats,
2735                    Err(e) => return Err(e),
2736                };
2737                let mut total_bytes = 0;
2738                let mut total_parts = 0;
2739                let mut selected_bytes = 0;
2740                let mut selected_parts = 0;
2741                for SnapshotPartStats {
2742                    encoded_size_bytes: bytes,
2743                    stats,
2744                } in &snapshot_stats.parts
2745                {
2746                    let bytes = u64::cast_from(*bytes);
2747                    total_bytes += bytes;
2748                    total_parts += 1u64;
2749                    let selected = match stats {
2750                        None => true,
2751                        Some(stats) => {
2752                            let stats = stats.decode();
2753                            let stats = RelationPartStats::new(
2754                                name.as_str(),
2755                                &snapshot_stats.metrics.pushdown.part_stats,
2756                                &relation_desc,
2757                                &stats,
2758                            );
2759                            stats.may_match_mfp(mz_now.clone(), &mfp)
2760                        }
2761                    };
2762
2763                    if selected {
2764                        selected_bytes += bytes;
2765                        selected_parts += 1u64;
2766                    }
2767                }
2768                Ok(Row::pack_slice(&[
2769                    name.as_str().into(),
2770                    total_bytes.into(),
2771                    selected_bytes.into(),
2772                    total_parts.into(),
2773                    selected_parts.into(),
2774                ]))
2775            });
2776        }
2777
2778        let fut = async move {
2779            match tokio::time::timeout(
2780                explain_timeout,
2781                futures::TryStreamExt::try_collect::<Vec<_>>(futures),
2782            )
2783            .await
2784            {
2785                Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
2786                    rows: Box::new(rows.into_row_iter()),
2787                }),
2788                Ok(Err(err)) => Err(err.into()),
2789                Err(_) => Err(AdapterError::StatementTimeout),
2790            }
2791        };
2792        fut
2793    }
2794
2795    #[instrument]
2796    pub(super) async fn sequence_insert(
2797        &mut self,
2798        mut ctx: ExecuteContext,
2799        plan: plan::InsertPlan,
2800    ) {
2801        // Normally, this would get checked when trying to add "write ops" to
2802        // the transaction but we go down diverging paths below, based on
2803        // whether the INSERT is only constant values or not.
2804        //
2805        // For the non-constant case we sequence an implicit read-then-write,
2806        // which messes with the transaction ops and would allow an implicit
2807        // read-then-write to sneak into a read-only transaction.
2808        if !ctx.session_mut().transaction().allows_writes() {
2809            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2810            return;
2811        }
2812
2813        // The structure of this code originates from a time where
2814        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2815        // optimized `MirRelationExpr`.
2816        //
2817        // Ideally, we would like to make the `selection.as_const().is_some()`
2818        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2819        // are planned as a Wrap($n, $vals) call, so until we can reduce
2820        // HirRelationExpr this will always returns false.
2821        //
2822        // Unfortunately, hitting the default path of the match below also
2823        // causes a lot of tests to fail, so we opted to go with the extra
2824        // `plan.values.clone()` statements when producing the `optimized_mir`
2825        // and re-optimize the values in the `sequence_read_then_write` call.
2826        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2827            // We don't perform any optimizations on an expression that is already
2828            // a constant for writes, as we want to maximize bulk-insert throughput.
2829            let expr = return_if_err!(
2830                plan.values
2831                    .clone()
2832                    .lower(self.catalog().system_config(), None),
2833                ctx
2834            );
2835            OptimizedMirRelationExpr(expr)
2836        } else {
2837            // Collect optimizer parameters.
2838            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2839
2840            // (`optimize::view::Optimizer` has a special case for constant queries.)
2841            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2842
2843            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2844            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2845        };
2846
2847        match optimized_mir.into_inner() {
2848            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2849                let catalog = self.owned_catalog();
2850                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2851                    let result =
2852                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2853                    ctx.retire(result);
2854                });
2855            }
2856            // All non-constant values must be planned as read-then-writes.
2857            _ => {
2858                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2859                    Some(table) => {
2860                        let fullname = self
2861                            .catalog()
2862                            .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2863                        // Inserts always occur at the latest version of the table.
2864                        table
2865                            .desc_latest(&fullname)
2866                            .expect("desc called on table")
2867                            .arity()
2868                    }
2869                    None => {
2870                        ctx.retire(Err(AdapterError::Catalog(
2871                            mz_catalog::memory::error::Error {
2872                                kind: mz_catalog::memory::error::ErrorKind::Sql(
2873                                    CatalogError::UnknownItem(plan.id.to_string()),
2874                                ),
2875                            },
2876                        )));
2877                        return;
2878                    }
2879                };
2880
2881                let finishing = RowSetFinishing {
2882                    order_by: vec![],
2883                    limit: None,
2884                    offset: 0,
2885                    project: (0..desc_arity).collect(),
2886                };
2887
2888                let read_then_write_plan = plan::ReadThenWritePlan {
2889                    id: plan.id,
2890                    selection: plan.values,
2891                    finishing,
2892                    assignments: BTreeMap::new(),
2893                    kind: MutationKind::Insert,
2894                    returning: plan.returning,
2895                };
2896
2897                self.sequence_read_then_write(ctx, read_then_write_plan)
2898                    .await;
2899            }
2900        }
2901    }
2902
2903    /// ReadThenWrite is a plan whose writes depend on the results of a
2904    /// read. This works by doing a Peek then queuing a SendDiffs. No writes
2905    /// or read-then-writes can occur between the Peek and SendDiff otherwise a
2906    /// serializability violation could occur.
2907    #[instrument]
2908    pub(super) async fn sequence_read_then_write(
2909        &mut self,
2910        mut ctx: ExecuteContext,
2911        plan: plan::ReadThenWritePlan,
2912    ) {
2913        let mut source_ids: BTreeSet<_> = plan
2914            .selection
2915            .depends_on()
2916            .into_iter()
2917            .map(|gid| self.catalog().resolve_item_id(&gid))
2918            .collect();
2919        source_ids.insert(plan.id);
2920
2921        // If the transaction doesn't already have write locks, acquire them.
2922        if ctx.session().transaction().write_locks().is_none() {
2923            // Pre-define all of the locks we need.
2924            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2925
2926            // Try acquiring all of our locks.
2927            for id in &source_ids {
2928                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2929                    write_locks.insert_lock(*id, lock);
2930                }
2931            }
2932
2933            // See if we acquired all of the neccessary locks.
2934            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2935                Ok(locks) => locks,
2936                Err(missing) => {
2937                    // Defer our write if we couldn't acquire all of the locks.
2938                    let role_metadata = ctx.session().role_metadata().clone();
2939                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2940                    let plan = DeferredPlan {
2941                        ctx,
2942                        plan: Plan::ReadThenWrite(plan),
2943                        validity: PlanValidity::new(
2944                            self.catalog.transient_revision(),
2945                            source_ids.clone(),
2946                            None,
2947                            None,
2948                            role_metadata,
2949                        ),
2950                        requires_locks: source_ids,
2951                    };
2952                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2953                }
2954            };
2955
2956            ctx.session_mut()
2957                .try_grant_write_locks(write_locks)
2958                .expect("session has already been granted write locks");
2959        }
2960
2961        let plan::ReadThenWritePlan {
2962            id,
2963            kind,
2964            selection,
2965            mut assignments,
2966            finishing,
2967            mut returning,
2968        } = plan;
2969
2970        // Read then writes can be queued, so re-verify the id exists.
2971        let desc = match self.catalog().try_get_entry(&id) {
2972            Some(table) => {
2973                let full_name = self
2974                    .catalog()
2975                    .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2976                // Inserts always occur at the latest version of the table.
2977                table
2978                    .desc_latest(&full_name)
2979                    .expect("desc called on table")
2980                    .into_owned()
2981            }
2982            None => {
2983                ctx.retire(Err(AdapterError::Catalog(
2984                    mz_catalog::memory::error::Error {
2985                        kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2986                            id.to_string(),
2987                        )),
2988                    },
2989                )));
2990                return;
2991            }
2992        };
2993
2994        // Disallow mz_now in any position because read time and write time differ.
2995        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2996            || assignments.values().any(|e| e.contains_temporal())
2997            || returning.iter().any(|e| e.contains_temporal());
2998        if contains_temporal {
2999            ctx.retire(Err(AdapterError::Unsupported(
3000                "calls to mz_now in write statements",
3001            )));
3002            return;
3003        }
3004
3005        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
3006        //
3007        // - They do not refer to any objects whose notion of time moves differently than that of
3008        // user tables. This limitation is meant to ensure no writes occur between this read and the
3009        // subsequent write.
3010        // - They do not use mz_now(), whose time produced during read will differ from the write
3011        //   timestamp.
3012        fn validate_read_dependencies(
3013            catalog: &Catalog,
3014            id: &CatalogItemId,
3015        ) -> Result<(), AdapterError> {
3016            use CatalogItemType::*;
3017            use mz_catalog::memory::objects;
3018            let mut ids_to_check = Vec::new();
3019            let valid = match catalog.try_get_entry(id) {
3020                Some(entry) => {
3021                    if let CatalogItem::View(objects::View { optimized_expr, .. })
3022                    | CatalogItem::MaterializedView(objects::MaterializedView {
3023                        optimized_expr,
3024                        ..
3025                    }) = entry.item()
3026                    {
3027                        if optimized_expr.contains_temporal() {
3028                            return Err(AdapterError::Unsupported(
3029                                "calls to mz_now in write statements",
3030                            ));
3031                        }
3032                    }
3033                    match entry.item().typ() {
3034                        typ @ (Func | View | MaterializedView | ContinualTask) => {
3035                            ids_to_check.extend(entry.uses());
3036                            let valid_id = id.is_user() || matches!(typ, Func);
3037                            valid_id
3038                        }
3039                        Source | Secret | Connection => false,
3040                        // Cannot select from sinks or indexes.
3041                        Sink | Index => unreachable!(),
3042                        Table => {
3043                            if !id.is_user() {
3044                                // We can't read from non-user tables
3045                                false
3046                            } else {
3047                                // We can't read from tables that are source-exports
3048                                entry.source_export_details().is_none()
3049                            }
3050                        }
3051                        Type => true,
3052                    }
3053                }
3054                None => false,
3055            };
3056            if !valid {
3057                return Err(AdapterError::InvalidTableMutationSelection);
3058            }
3059            for id in ids_to_check {
3060                validate_read_dependencies(catalog, &id)?;
3061            }
3062            Ok(())
3063        }
3064
3065        for gid in selection.depends_on() {
3066            let item_id = self.catalog().resolve_item_id(&gid);
3067            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
3068                ctx.retire(Err(err));
3069                return;
3070            }
3071        }
3072
3073        let (peek_tx, peek_rx) = oneshot::channel();
3074        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
3075        let (tx, _, session, extra) = ctx.into_parts();
3076        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
3077        // execution context, because this peek does not directly correspond to an execute,
3078        // and so we don't need to take any action on its retirement.
3079        // TODO[btv]: we might consider extending statement logging to log the inner
3080        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
3081        // and mint a new "real" execution context here. We'd also have to add some logic to
3082        // make sure such "sub-statements" are always sampled when the top-level statement is
3083        //
3084        // It's debatable whether this makes sense conceptually,
3085        // because the inner fragment here is not actually a
3086        // "statement" in its own right.
3087        let peek_ctx = ExecuteContext::from_parts(
3088            peek_client_tx,
3089            self.internal_cmd_tx.clone(),
3090            session,
3091            Default::default(),
3092        );
3093
3094        self.sequence_peek(
3095            peek_ctx,
3096            plan::SelectPlan {
3097                select: None,
3098                source: selection,
3099                when: QueryWhen::FreshestTableWrite,
3100                finishing,
3101                copy_to: None,
3102            },
3103            TargetCluster::Active,
3104            None,
3105        )
3106        .await;
3107
3108        let internal_cmd_tx = self.internal_cmd_tx.clone();
3109        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
3110        let catalog = self.owned_catalog();
3111        let max_result_size = self.catalog().system_config().max_result_size();
3112
3113        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
3114            let (peek_response, session) = match peek_rx.await {
3115                Ok(Response {
3116                    result: Ok(resp),
3117                    session,
3118                    otel_ctx,
3119                }) => {
3120                    otel_ctx.attach_as_parent();
3121                    (resp, session)
3122                }
3123                Ok(Response {
3124                    result: Err(e),
3125                    session,
3126                    otel_ctx,
3127                }) => {
3128                    let ctx =
3129                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3130                    otel_ctx.attach_as_parent();
3131                    ctx.retire(Err(e));
3132                    return;
3133                }
3134                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
3135                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
3136            };
3137            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3138            let mut timeout_dur = *ctx.session().vars().statement_timeout();
3139
3140            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
3141            if timeout_dur == Duration::ZERO {
3142                timeout_dur = Duration::MAX;
3143            }
3144
3145            let style = ExprPrepStyle::OneShot {
3146                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
3147                session: ctx.session(),
3148                catalog_state: catalog.state(),
3149            };
3150            for expr in assignments.values_mut().chain(returning.iter_mut()) {
3151                return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
3152            }
3153
3154            let make_diffs =
3155                move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
3156                    let arena = RowArena::new();
3157                    let mut diffs = Vec::new();
3158                    let mut datum_vec = mz_repr::DatumVec::new();
3159
3160                    while let Some(row) = rows.next() {
3161                        if !assignments.is_empty() {
3162                            assert!(
3163                                matches!(kind, MutationKind::Update),
3164                                "only updates support assignments"
3165                            );
3166                            let mut datums = datum_vec.borrow_with(row);
3167                            let mut updates = vec![];
3168                            for (idx, expr) in &assignments {
3169                                let updated = match expr.eval(&datums, &arena) {
3170                                    Ok(updated) => updated,
3171                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
3172                                };
3173                                updates.push((*idx, updated));
3174                            }
3175                            for (idx, new_value) in updates {
3176                                datums[idx] = new_value;
3177                            }
3178                            let updated = Row::pack_slice(&datums);
3179                            diffs.push((updated, Diff::ONE));
3180                        }
3181                        match kind {
3182                            // Updates and deletes always remove the
3183                            // current row. Updates will also add an
3184                            // updated value.
3185                            MutationKind::Update | MutationKind::Delete => {
3186                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
3187                            }
3188                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
3189                        }
3190                    }
3191
3192                    // Sum of all the rows' byte size, for checking if we go
3193                    // above the max_result_size threshold.
3194                    let mut byte_size: u64 = 0;
3195                    for (row, diff) in &diffs {
3196                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
3197                        if diff.is_positive() {
3198                            for (idx, datum) in row.iter().enumerate() {
3199                                desc.constraints_met(idx, &datum)?;
3200                            }
3201                        }
3202                    }
3203                    Ok((diffs, byte_size))
3204                };
3205
3206            let diffs = match peek_response {
3207                ExecuteResponse::SendingRowsStreaming {
3208                    rows: mut rows_stream,
3209                    ..
3210                } => {
3211                    let mut byte_size: u64 = 0;
3212                    let mut diffs = Vec::new();
3213                    let result = loop {
3214                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
3215                            Ok(Some(res)) => match res {
3216                                PeekResponseUnary::Rows(new_rows) => {
3217                                    match make_diffs(new_rows) {
3218                                        Ok((mut new_diffs, new_byte_size)) => {
3219                                            byte_size = byte_size.saturating_add(new_byte_size);
3220                                            if byte_size > max_result_size {
3221                                                break Err(AdapterError::ResultSize(format!(
3222                                                    "result exceeds max size of {max_result_size}"
3223                                                )));
3224                                            }
3225                                            diffs.append(&mut new_diffs)
3226                                        }
3227                                        Err(e) => break Err(e),
3228                                    };
3229                                }
3230                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
3231                                PeekResponseUnary::Error(e) => {
3232                                    break Err(AdapterError::Unstructured(anyhow!(e)));
3233                                }
3234                            },
3235                            Ok(None) => break Ok(diffs),
3236                            Err(_) => {
3237                                // We timed out, so remove the pending peek. This is
3238                                // best-effort and doesn't guarantee we won't
3239                                // receive a response.
3240                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
3241                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
3242                                    conn_id: ctx.session().conn_id().clone(),
3243                                });
3244                                if let Err(e) = result {
3245                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3246                                }
3247                                break Err(AdapterError::StatementTimeout);
3248                            }
3249                        }
3250                    };
3251
3252                    result
3253                }
3254                ExecuteResponse::SendingRowsImmediate { rows } => {
3255                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
3256                }
3257                resp => Err(AdapterError::Unstructured(anyhow!(
3258                    "unexpected peek response: {resp:?}"
3259                ))),
3260            };
3261
3262            let mut returning_rows = Vec::new();
3263            let mut diff_err: Option<AdapterError> = None;
3264            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
3265                let arena = RowArena::new();
3266                for (row, diff) in diffs {
3267                    if !diff.is_positive() {
3268                        continue;
3269                    }
3270                    let mut returning_row = Row::with_capacity(returning.len());
3271                    let mut packer = returning_row.packer();
3272                    for expr in &returning {
3273                        let datums: Vec<_> = row.iter().collect();
3274                        match expr.eval(&datums, &arena) {
3275                            Ok(datum) => {
3276                                packer.push(datum);
3277                            }
3278                            Err(err) => {
3279                                diff_err = Some(err.into());
3280                                break;
3281                            }
3282                        }
3283                    }
3284                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3285                    let diff = match NonZeroUsize::try_from(diff) {
3286                        Ok(diff) => diff,
3287                        Err(err) => {
3288                            diff_err = Some(err.into());
3289                            break;
3290                        }
3291                    };
3292                    returning_rows.push((returning_row, diff));
3293                    if diff_err.is_some() {
3294                        break;
3295                    }
3296                }
3297            }
3298            let diffs = if let Some(err) = diff_err {
3299                Err(err)
3300            } else {
3301                diffs
3302            };
3303
3304            // We need to clear out the timestamp context so the write doesn't fail due to a
3305            // read only transaction.
3306            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3307            // No matter what isolation level the client is using, we must linearize this
3308            // read. The write will be performed right after this, as part of a single
3309            // transaction, so the write must have a timestamp greater than or equal to the
3310            // read.
3311            //
3312            // Note: It's only OK for the write to have a greater timestamp than the read
3313            // because the write lock prevents any other writes from happening in between
3314            // the read and write.
3315            if let Some(timestamp_context) = timestamp_context {
3316                let (tx, rx) = tokio::sync::oneshot::channel();
3317                let conn_id = ctx.session().conn_id().clone();
3318                let pending_read_txn = PendingReadTxn {
3319                    txn: PendingRead::ReadThenWrite { ctx, tx },
3320                    timestamp_context,
3321                    created: Instant::now(),
3322                    num_requeues: 0,
3323                    otel_ctx: OpenTelemetryContext::obtain(),
3324                };
3325                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3326                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3327                if let Err(e) = result {
3328                    warn!(
3329                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3330                        e
3331                    );
3332                    return;
3333                }
3334                let result = rx.await;
3335                // It is not an error for these results to be ready after `tx` has been dropped.
3336                ctx = match result {
3337                    Ok(Some(ctx)) => ctx,
3338                    Ok(None) => {
3339                        // Coordinator took our context and will handle responding to the client.
3340                        // This usually indicates that our transaction was aborted.
3341                        return;
3342                    }
3343                    Err(e) => {
3344                        warn!(
3345                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3346                            e
3347                        );
3348                        return;
3349                    }
3350                };
3351            }
3352
3353            match diffs {
3354                Ok(diffs) => {
3355                    let result = Self::send_diffs(
3356                        ctx.session_mut(),
3357                        plan::SendDiffsPlan {
3358                            id,
3359                            updates: diffs,
3360                            kind,
3361                            returning: returning_rows,
3362                            max_result_size,
3363                        },
3364                    );
3365                    ctx.retire(result);
3366                }
3367                Err(e) => {
3368                    ctx.retire(Err(e));
3369                }
3370            }
3371        });
3372    }
3373
3374    #[instrument]
3375    pub(super) async fn sequence_alter_item_rename(
3376        &mut self,
3377        ctx: &mut ExecuteContext,
3378        plan: plan::AlterItemRenamePlan,
3379    ) -> Result<ExecuteResponse, AdapterError> {
3380        let op = catalog::Op::RenameItem {
3381            id: plan.id,
3382            current_full_name: plan.current_full_name,
3383            to_name: plan.to_name,
3384        };
3385        match self
3386            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3387            .await
3388        {
3389            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3390            Err(err) => Err(err),
3391        }
3392    }
3393
3394    #[instrument]
3395    pub(super) async fn sequence_alter_retain_history(
3396        &mut self,
3397        ctx: &mut ExecuteContext,
3398        plan: plan::AlterRetainHistoryPlan,
3399    ) -> Result<ExecuteResponse, AdapterError> {
3400        let ops = vec![catalog::Op::AlterRetainHistory {
3401            id: plan.id,
3402            value: plan.value,
3403            window: plan.window,
3404        }];
3405        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3406            Box::pin(async move {
3407                let catalog_item = coord.catalog().get_entry(&plan.id).item();
3408                let cluster = match catalog_item {
3409                    CatalogItem::Table(_)
3410                    | CatalogItem::MaterializedView(_)
3411                    | CatalogItem::Source(_)
3412                    | CatalogItem::ContinualTask(_) => None,
3413                    CatalogItem::Index(index) => Some(index.cluster_id),
3414                    CatalogItem::Log(_)
3415                    | CatalogItem::View(_)
3416                    | CatalogItem::Sink(_)
3417                    | CatalogItem::Type(_)
3418                    | CatalogItem::Func(_)
3419                    | CatalogItem::Secret(_)
3420                    | CatalogItem::Connection(_) => unreachable!(),
3421                };
3422                match cluster {
3423                    Some(cluster) => {
3424                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3425                    }
3426                    None => {
3427                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3428                    }
3429                }
3430            })
3431        })
3432        .await?;
3433        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3434    }
3435
3436    #[instrument]
3437    pub(super) async fn sequence_alter_schema_rename(
3438        &mut self,
3439        ctx: &mut ExecuteContext,
3440        plan: plan::AlterSchemaRenamePlan,
3441    ) -> Result<ExecuteResponse, AdapterError> {
3442        let (database_spec, schema_spec) = plan.cur_schema_spec;
3443        let op = catalog::Op::RenameSchema {
3444            database_spec,
3445            schema_spec,
3446            new_name: plan.new_schema_name,
3447            check_reserved_names: true,
3448        };
3449        match self
3450            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3451            .await
3452        {
3453            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3454            Err(err) => Err(err),
3455        }
3456    }
3457
3458    #[instrument]
3459    pub(super) async fn sequence_alter_schema_swap(
3460        &mut self,
3461        ctx: &mut ExecuteContext,
3462        plan: plan::AlterSchemaSwapPlan,
3463    ) -> Result<ExecuteResponse, AdapterError> {
3464        let plan::AlterSchemaSwapPlan {
3465            schema_a_spec: (schema_a_db, schema_a),
3466            schema_a_name,
3467            schema_b_spec: (schema_b_db, schema_b),
3468            schema_b_name,
3469            name_temp,
3470        } = plan;
3471
3472        let op_a = catalog::Op::RenameSchema {
3473            database_spec: schema_a_db,
3474            schema_spec: schema_a,
3475            new_name: name_temp,
3476            check_reserved_names: false,
3477        };
3478        let op_b = catalog::Op::RenameSchema {
3479            database_spec: schema_b_db,
3480            schema_spec: schema_b,
3481            new_name: schema_a_name,
3482            check_reserved_names: false,
3483        };
3484        let op_c = catalog::Op::RenameSchema {
3485            database_spec: schema_a_db,
3486            schema_spec: schema_a,
3487            new_name: schema_b_name,
3488            check_reserved_names: false,
3489        };
3490
3491        match self
3492            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3493                Box::pin(async {})
3494            })
3495            .await
3496        {
3497            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3498            Err(err) => Err(err),
3499        }
3500    }
3501
3502    #[instrument]
3503    pub(super) async fn sequence_alter_role(
3504        &mut self,
3505        session: &Session,
3506        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3507    ) -> Result<ExecuteResponse, AdapterError> {
3508        let catalog = self.catalog().for_session(session);
3509        let role = catalog.get_role(&id);
3510
3511        // We'll send these notices to the user, if the operation is successful.
3512        let mut notices = vec![];
3513
3514        // Get the attributes and variables from the role, as they currently are.
3515        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3516        let mut vars = role.vars().clone();
3517
3518        // Whether to set the password to NULL. This is a special case since the existing
3519        // password is not stored in the role attributes.
3520        let mut nopassword = false;
3521
3522        // Apply our updates.
3523        match option {
3524            PlannedAlterRoleOption::Attributes(attrs) => {
3525                self.validate_role_attributes(&attrs.clone().into())?;
3526
3527                if let Some(inherit) = attrs.inherit {
3528                    attributes.inherit = inherit;
3529                }
3530
3531                if let Some(password) = attrs.password {
3532                    attributes.password = Some(password);
3533                }
3534
3535                if let Some(superuser) = attrs.superuser {
3536                    attributes.superuser = Some(superuser);
3537                }
3538
3539                if let Some(login) = attrs.login {
3540                    attributes.login = Some(login);
3541                }
3542
3543                if attrs.nopassword.unwrap_or(false) {
3544                    nopassword = true;
3545                }
3546
3547                if let Some(notice) = self.should_emit_rbac_notice(session) {
3548                    notices.push(notice);
3549                }
3550            }
3551            PlannedAlterRoleOption::Variable(variable) => {
3552                // Get the variable to make sure it's valid and visible.
3553                let session_var = session.vars().inspect(variable.name())?;
3554                // Return early if it's not visible.
3555                session_var.visible(session.user(), catalog.system_vars())?;
3556
3557                // Emit a warning when deprecated variables are used.
3558                // TODO(database-issues#8069) remove this after sufficient time has passed
3559                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3560                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3561                } else if let PlannedRoleVariable::Set {
3562                    name,
3563                    value: VariableValue::Values(vals),
3564                } = &variable
3565                {
3566                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3567                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3568                    }
3569                }
3570
3571                let var_name = match variable {
3572                    PlannedRoleVariable::Set { name, value } => {
3573                        // Update our persisted set.
3574                        match &value {
3575                            VariableValue::Default => {
3576                                vars.remove(&name);
3577                            }
3578                            VariableValue::Values(vals) => {
3579                                let var = match &vals[..] {
3580                                    [val] => OwnedVarInput::Flat(val.clone()),
3581                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3582                                };
3583                                // Make sure the input is valid.
3584                                session_var.check(var.borrow())?;
3585
3586                                vars.insert(name.clone(), var);
3587                            }
3588                        };
3589                        name
3590                    }
3591                    PlannedRoleVariable::Reset { name } => {
3592                        // Remove it from our persisted values.
3593                        vars.remove(&name);
3594                        name
3595                    }
3596                };
3597
3598                // Emit a notice that they need to reconnect to see the change take effect.
3599                notices.push(AdapterNotice::VarDefaultUpdated {
3600                    role: Some(name.clone()),
3601                    var_name: Some(var_name),
3602                });
3603            }
3604        }
3605
3606        let op = catalog::Op::AlterRole {
3607            id,
3608            name,
3609            attributes,
3610            nopassword,
3611            vars: RoleVars { map: vars },
3612        };
3613        let response = self
3614            .catalog_transact(Some(session), vec![op])
3615            .await
3616            .map(|_| ExecuteResponse::AlteredRole)?;
3617
3618        // Send all of our queued notices.
3619        session.add_notices(notices);
3620
3621        Ok(response)
3622    }
3623
3624    #[instrument]
3625    pub(super) async fn sequence_alter_sink_prepare(
3626        &mut self,
3627        ctx: ExecuteContext,
3628        plan: plan::AlterSinkPlan,
3629    ) {
3630        // Put a read hold on the new relation
3631        let id_bundle = crate::CollectionIdBundle {
3632            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3633            compute_ids: BTreeMap::new(),
3634        };
3635        let read_hold = self.acquire_read_holds(&id_bundle);
3636
3637        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3638            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3639            return;
3640        };
3641
3642        let otel_ctx = OpenTelemetryContext::obtain();
3643        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3644
3645        let plan_validity = PlanValidity::new(
3646            self.catalog().transient_revision(),
3647            BTreeSet::from_iter([plan.item_id, from_item_id]),
3648            Some(plan.in_cluster),
3649            None,
3650            ctx.session().role_metadata().clone(),
3651        );
3652
3653        info!(
3654            "preparing alter sink for {}: frontiers={:?} export={:?}",
3655            plan.global_id,
3656            self.controller
3657                .storage_collections
3658                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3659            self.controller.storage.export(plan.global_id)
3660        );
3661
3662        // Now we must wait for the sink to make enough progress such that there is overlap between
3663        // the new `from` collection's read hold and the sink's write frontier.
3664        //
3665        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3666        // the watch set never completes and neither does the `ALTER SINK` command.
3667        self.install_storage_watch_set(
3668            ctx.session().conn_id().clone(),
3669            BTreeSet::from_iter([plan.global_id]),
3670            read_ts,
3671            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3672                ctx: Some(ctx),
3673                otel_ctx,
3674                plan,
3675                plan_validity,
3676                read_hold,
3677            }),
3678        );
3679    }
3680
3681    #[instrument]
3682    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3683        ctx.otel_ctx.attach_as_parent();
3684
3685        let plan::AlterSinkPlan {
3686            item_id,
3687            global_id,
3688            sink: sink_plan,
3689            with_snapshot,
3690            in_cluster,
3691        } = ctx.plan.clone();
3692
3693        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3694        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3695        // arbitrarily changed since we performed planning, and we must re-assert that it still
3696        // matches our requirements.
3697        //
3698        // The `PlanValidity` check ensures that both the sink and the new source relation still
3699        // exist. Apart from that we have to ensure that nobody else altered the sink in the mean
3700        // time, which we do by comparing the catalog sink version to the one in the plan.
3701        match ctx.plan_validity.check(self.catalog()) {
3702            Ok(()) => {}
3703            Err(err) => {
3704                ctx.retire(Err(err));
3705                return;
3706            }
3707        }
3708
3709        let entry = self.catalog().get_entry(&item_id);
3710        let CatalogItem::Sink(old_sink) = entry.item() else {
3711            panic!("invalid item kind for `AlterSinkPlan`");
3712        };
3713
3714        if sink_plan.version != old_sink.version + 1 {
3715            ctx.retire(Err(AdapterError::ChangedPlan(
3716                "sink was altered concurrently".into(),
3717            )));
3718            return;
3719        }
3720
3721        info!(
3722            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3723            self.controller
3724                .storage_collections
3725                .collections_frontiers(vec![global_id, sink_plan.from]),
3726            self.controller.storage.export(global_id),
3727        );
3728
3729        // Assert that we can recover the updates that happened at the timestamps of the write
3730        // frontier. This must be true in this call.
3731        let write_frontier = &self
3732            .controller
3733            .storage
3734            .export(global_id)
3735            .expect("sink known to exist")
3736            .write_frontier;
3737        let as_of = ctx.read_hold.least_valid_read();
3738        assert!(
3739            write_frontier.iter().all(|t| as_of.less_than(t)),
3740            "{:?} should be strictly less than {:?}",
3741            &*as_of,
3742            &**write_frontier
3743        );
3744
3745        // Parse the `create_sql` so we can update it to the new sink definition.
3746        //
3747        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3748        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3749        // names in the `create_sql` may have changed, for example due to a schema swap.
3750        let create_sql = &old_sink.create_sql;
3751        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3752        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3753            unreachable!("invalid statement kind for sink");
3754        };
3755
3756        // Update the sink version.
3757        stmt.with_options
3758            .retain(|o| o.name != CreateSinkOptionName::Version);
3759        stmt.with_options.push(CreateSinkOption {
3760            name: CreateSinkOptionName::Version,
3761            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3762                sink_plan.version.to_string(),
3763            ))),
3764        });
3765
3766        let conn_catalog = self.catalog().for_system_session();
3767        let (mut stmt, resolved_ids) =
3768            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3769
3770        // Update the `from` relation.
3771        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3772        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3773        stmt.from = ResolvedItemName::Item {
3774            id: from_entry.id(),
3775            qualifiers: from_entry.name.qualifiers.clone(),
3776            full_name,
3777            print_id: true,
3778            version: from_entry.version,
3779        };
3780
3781        let new_sink = Sink {
3782            create_sql: stmt.to_ast_string_stable(),
3783            global_id,
3784            from: sink_plan.from,
3785            connection: sink_plan.connection.clone(),
3786            envelope: sink_plan.envelope,
3787            version: sink_plan.version,
3788            with_snapshot,
3789            resolved_ids: resolved_ids.clone(),
3790            cluster_id: in_cluster,
3791        };
3792
3793        let ops = vec![catalog::Op::UpdateItem {
3794            id: item_id,
3795            name: entry.name().clone(),
3796            to_item: CatalogItem::Sink(new_sink),
3797        }];
3798
3799        match self
3800            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3801            .await
3802        {
3803            Ok(()) => {}
3804            Err(err) => {
3805                ctx.retire(Err(err));
3806                return;
3807            }
3808        }
3809
3810        let storage_sink_desc = StorageSinkDesc {
3811            from: sink_plan.from,
3812            from_desc: from_entry
3813                .desc_opt()
3814                .expect("sinks can only be built on items with descs")
3815                .into_owned(),
3816            connection: sink_plan
3817                .connection
3818                .clone()
3819                .into_inline_connection(self.catalog().state()),
3820            envelope: sink_plan.envelope,
3821            as_of,
3822            with_snapshot,
3823            version: sink_plan.version,
3824            from_storage_metadata: (),
3825            to_storage_metadata: (),
3826        };
3827
3828        self.controller
3829            .storage
3830            .alter_export(
3831                global_id,
3832                ExportDescription {
3833                    sink: storage_sink_desc,
3834                    instance_id: in_cluster,
3835                },
3836            )
3837            .await
3838            .unwrap_or_terminate("cannot fail to alter source desc");
3839
3840        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3841    }
3842
3843    #[instrument]
3844    pub(super) async fn sequence_alter_connection(
3845        &mut self,
3846        ctx: ExecuteContext,
3847        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3848    ) {
3849        match action {
3850            AlterConnectionAction::RotateKeys => {
3851                self.sequence_rotate_keys(ctx, id).await;
3852            }
3853            AlterConnectionAction::AlterOptions {
3854                set_options,
3855                drop_options,
3856                validate,
3857            } => {
3858                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3859                    .await
3860            }
3861        }
3862    }
3863
3864    #[instrument]
3865    async fn sequence_alter_connection_options(
3866        &mut self,
3867        mut ctx: ExecuteContext,
3868        id: CatalogItemId,
3869        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3870        drop_options: BTreeSet<ConnectionOptionName>,
3871        validate: bool,
3872    ) {
3873        let cur_entry = self.catalog().get_entry(&id);
3874        let cur_conn = cur_entry.connection().expect("known to be connection");
3875        let connection_gid = cur_conn.global_id();
3876
3877        let inner = || -> Result<Connection, AdapterError> {
3878            // Parse statement.
3879            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3880                .expect("invalid create sql persisted to catalog")
3881                .into_element()
3882                .ast
3883            {
3884                Statement::CreateConnection(stmt) => stmt,
3885                _ => unreachable!("proved type is source"),
3886            };
3887
3888            let catalog = self.catalog().for_system_session();
3889
3890            // Resolve items in statement
3891            let (mut create_conn_stmt, resolved_ids) =
3892                mz_sql::names::resolve(&catalog, create_conn_stmt)
3893                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3894
3895            // Retain options that are neither set nor dropped.
3896            create_conn_stmt
3897                .values
3898                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3899
3900            // Set new values
3901            create_conn_stmt.values.extend(
3902                set_options
3903                    .into_iter()
3904                    .map(|(name, value)| ConnectionOption { name, value }),
3905            );
3906
3907            // Open a new catalog, which we will use to re-plan our
3908            // statement with the desired config.
3909            let mut catalog = self.catalog().for_system_session();
3910            catalog.mark_id_unresolvable_for_replanning(id);
3911
3912            // Re-define our source in terms of the amended statement
3913            let plan = match mz_sql::plan::plan(
3914                None,
3915                &catalog,
3916                Statement::CreateConnection(create_conn_stmt),
3917                &Params::empty(),
3918                &resolved_ids,
3919            )
3920            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3921            {
3922                Plan::CreateConnection(plan) => plan,
3923                _ => unreachable!("create source plan is only valid response"),
3924            };
3925
3926            // Parse statement.
3927            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3928                .expect("invalid create sql persisted to catalog")
3929                .into_element()
3930                .ast
3931            {
3932                Statement::CreateConnection(stmt) => stmt,
3933                _ => unreachable!("proved type is source"),
3934            };
3935
3936            let catalog = self.catalog().for_system_session();
3937
3938            // Resolve items in statement
3939            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3940                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3941
3942            Ok(Connection {
3943                create_sql: plan.connection.create_sql,
3944                global_id: cur_conn.global_id,
3945                details: plan.connection.details,
3946                resolved_ids: new_deps,
3947            })
3948        };
3949
3950        let conn = match inner() {
3951            Ok(conn) => conn,
3952            Err(e) => {
3953                return ctx.retire(Err(e));
3954            }
3955        };
3956
3957        if validate {
3958            let connection = conn
3959                .details
3960                .to_connection()
3961                .into_inline_connection(self.catalog().state());
3962
3963            let internal_cmd_tx = self.internal_cmd_tx.clone();
3964            let transient_revision = self.catalog().transient_revision();
3965            let conn_id = ctx.session().conn_id().clone();
3966            let otel_ctx = OpenTelemetryContext::obtain();
3967            let role_metadata = ctx.session().role_metadata().clone();
3968            let current_storage_parameters = self.controller.storage.config().clone();
3969
3970            task::spawn(
3971                || format!("validate_alter_connection:{conn_id}"),
3972                async move {
3973                    let resolved_ids = conn.resolved_ids.clone();
3974                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3975                    let result = match connection.validate(id, &current_storage_parameters).await {
3976                        Ok(()) => Ok(conn),
3977                        Err(err) => Err(err.into()),
3978                    };
3979
3980                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3981                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3982                        AlterConnectionValidationReady {
3983                            ctx,
3984                            result,
3985                            connection_id: id,
3986                            connection_gid,
3987                            plan_validity: PlanValidity::new(
3988                                transient_revision,
3989                                dependency_ids.clone(),
3990                                None,
3991                                None,
3992                                role_metadata,
3993                            ),
3994                            otel_ctx,
3995                            resolved_ids,
3996                        },
3997                    ));
3998                    if let Err(e) = result {
3999                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
4000                    }
4001                },
4002            );
4003        } else {
4004            let result = self
4005                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
4006                .await;
4007            ctx.retire(result);
4008        }
4009    }
4010
4011    #[instrument]
4012    pub(crate) async fn sequence_alter_connection_stage_finish(
4013        &mut self,
4014        session: &mut Session,
4015        id: CatalogItemId,
4016        connection: Connection,
4017    ) -> Result<ExecuteResponse, AdapterError> {
4018        match self.catalog.get_entry(&id).item() {
4019            CatalogItem::Connection(curr_conn) => {
4020                curr_conn
4021                    .details
4022                    .to_connection()
4023                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
4024                    .map_err(StorageError::from)?;
4025            }
4026            _ => unreachable!("known to be a connection"),
4027        };
4028
4029        let ops = vec![catalog::Op::UpdateItem {
4030            id,
4031            name: self.catalog.get_entry(&id).name().clone(),
4032            to_item: CatalogItem::Connection(connection.clone()),
4033        }];
4034
4035        self.catalog_transact(Some(session), ops).await?;
4036
4037        match connection.details {
4038            ConnectionDetails::AwsPrivatelink(ref privatelink) => {
4039                let spec = VpcEndpointConfig {
4040                    aws_service_name: privatelink.service_name.to_owned(),
4041                    availability_zone_ids: privatelink.availability_zones.to_owned(),
4042                };
4043                self.cloud_resource_controller
4044                    .as_ref()
4045                    .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
4046                    .ensure_vpc_endpoint(id, spec)
4047                    .await?;
4048            }
4049            _ => {}
4050        };
4051
4052        let entry = self.catalog().get_entry(&id);
4053
4054        let mut connections = VecDeque::new();
4055        connections.push_front(entry.id());
4056
4057        let mut source_connections = BTreeMap::new();
4058        let mut sink_connections = BTreeMap::new();
4059        let mut source_export_data_configs = BTreeMap::new();
4060
4061        while let Some(id) = connections.pop_front() {
4062            for id in self.catalog.get_entry(&id).used_by() {
4063                let entry = self.catalog.get_entry(id);
4064                match entry.item() {
4065                    CatalogItem::Connection(_) => connections.push_back(*id),
4066                    CatalogItem::Source(source) => {
4067                        let desc = match &entry.source().expect("known to be source").data_source {
4068                            DataSourceDesc::Ingestion { desc, .. }
4069                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4070                                desc.clone().into_inline_connection(self.catalog().state())
4071                            }
4072                            _ => unreachable!("only ingestions reference connections"),
4073                        };
4074
4075                        source_connections.insert(source.global_id, desc.connection);
4076                    }
4077                    CatalogItem::Sink(sink) => {
4078                        let export = entry.sink().expect("known to be sink");
4079                        sink_connections.insert(
4080                            sink.global_id,
4081                            export
4082                                .connection
4083                                .clone()
4084                                .into_inline_connection(self.catalog().state()),
4085                        );
4086                    }
4087                    CatalogItem::Table(table) => {
4088                        // This is a source-fed table that reference a schema registry
4089                        // connection as a part of its encoding / data config
4090                        if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
4091                            let data_config = export_data_config.clone();
4092                            source_export_data_configs.insert(
4093                                table.global_id_writes(),
4094                                data_config.into_inline_connection(self.catalog().state()),
4095                            );
4096                        }
4097                    }
4098                    t => unreachable!("connection dependency not expected on {:?}", t),
4099                }
4100            }
4101        }
4102
4103        if !source_connections.is_empty() {
4104            self.controller
4105                .storage
4106                .alter_ingestion_connections(source_connections)
4107                .await
4108                .unwrap_or_terminate("cannot fail to alter ingestion connection");
4109        }
4110
4111        if !sink_connections.is_empty() {
4112            self.controller
4113                .storage
4114                .alter_export_connections(sink_connections)
4115                .await
4116                .unwrap_or_terminate("altering exports after txn must succeed");
4117        }
4118
4119        if !source_export_data_configs.is_empty() {
4120            self.controller
4121                .storage
4122                .alter_ingestion_export_data_configs(source_export_data_configs)
4123                .await
4124                .unwrap_or_terminate("altering source export data configs after txn must succeed");
4125        }
4126
4127        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
4128    }
4129
4130    #[instrument]
4131    pub(super) async fn sequence_alter_source(
4132        &mut self,
4133        session: &Session,
4134        plan::AlterSourcePlan {
4135            item_id,
4136            ingestion_id,
4137            action,
4138        }: plan::AlterSourcePlan,
4139    ) -> Result<ExecuteResponse, AdapterError> {
4140        let cur_entry = self.catalog().get_entry(&item_id);
4141        let cur_source = cur_entry.source().expect("known to be source");
4142
4143        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
4144            // Parse statement.
4145            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
4146                .expect("invalid create sql persisted to catalog")
4147                .into_element()
4148                .ast
4149            {
4150                Statement::CreateSource(stmt) => stmt,
4151                _ => unreachable!("proved type is source"),
4152            };
4153
4154            let catalog = coord.catalog().for_system_session();
4155
4156            // Resolve items in statement
4157            mz_sql::names::resolve(&catalog, create_source_stmt)
4158                .map_err(|e| AdapterError::internal(err_cx, e))
4159        };
4160
4161        match action {
4162            plan::AlterSourceAction::AddSubsourceExports {
4163                subsources,
4164                options,
4165            } => {
4166                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
4167
4168                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
4169                    text_columns: mut new_text_columns,
4170                    exclude_columns: mut new_exclude_columns,
4171                    ..
4172                } = options.try_into()?;
4173
4174                // Resolve items in statement
4175                let (mut create_source_stmt, resolved_ids) =
4176                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
4177
4178                // Get all currently referred-to items
4179                let catalog = self.catalog();
4180                let curr_references: BTreeSet<_> = catalog
4181                    .get_entry(&item_id)
4182                    .used_by()
4183                    .into_iter()
4184                    .filter_map(|subsource| {
4185                        catalog
4186                            .get_entry(subsource)
4187                            .subsource_details()
4188                            .map(|(_id, reference, _details)| reference)
4189                    })
4190                    .collect();
4191
4192                // We are doing a lot of unwrapping, so just make an error to reference; all of
4193                // these invariants are guaranteed to be true because of how we plan subsources.
4194                let purification_err =
4195                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
4196
4197                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
4198                // we remove support for implicitly created subsources from a `CREATE SOURCE`
4199                // statement.
4200                match &mut create_source_stmt.connection {
4201                    CreateSourceConnection::Postgres {
4202                        options: curr_options,
4203                        ..
4204                    } => {
4205                        let mz_sql::plan::PgConfigOptionExtracted {
4206                            mut text_columns, ..
4207                        } = curr_options.clone().try_into()?;
4208
4209                        // Drop text columns; we will add them back in
4210                        // as appropriate below.
4211                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
4212
4213                        // Drop all text columns that are not currently referred to.
4214                        text_columns.retain(|column_qualified_reference| {
4215                            mz_ore::soft_assert_eq_or_log!(
4216                                column_qualified_reference.0.len(),
4217                                4,
4218                                "all TEXT COLUMNS values must be column-qualified references"
4219                            );
4220                            let mut table = column_qualified_reference.clone();
4221                            table.0.truncate(3);
4222                            curr_references.contains(&table)
4223                        });
4224
4225                        // Merge the current text columns into the new text columns.
4226                        new_text_columns.extend(text_columns);
4227
4228                        // If we have text columns, add them to the options.
4229                        if !new_text_columns.is_empty() {
4230                            new_text_columns.sort();
4231                            let new_text_columns = new_text_columns
4232                                .into_iter()
4233                                .map(WithOptionValue::UnresolvedItemName)
4234                                .collect();
4235
4236                            curr_options.push(PgConfigOption {
4237                                name: PgConfigOptionName::TextColumns,
4238                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4239                            });
4240                        }
4241                    }
4242                    CreateSourceConnection::MySql {
4243                        options: curr_options,
4244                        ..
4245                    } => {
4246                        let mz_sql::plan::MySqlConfigOptionExtracted {
4247                            mut text_columns,
4248                            mut exclude_columns,
4249                            ..
4250                        } = curr_options.clone().try_into()?;
4251
4252                        // Drop both ignore and text columns; we will add them back in
4253                        // as appropriate below.
4254                        curr_options.retain(|o| {
4255                            !matches!(
4256                                o.name,
4257                                MySqlConfigOptionName::TextColumns
4258                                    | MySqlConfigOptionName::ExcludeColumns
4259                            )
4260                        });
4261
4262                        // Drop all text / exclude columns that are not currently referred to.
4263                        let column_referenced =
4264                            |column_qualified_reference: &UnresolvedItemName| {
4265                                mz_ore::soft_assert_eq_or_log!(
4266                                    column_qualified_reference.0.len(),
4267                                    3,
4268                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4269                                );
4270                                let mut table = column_qualified_reference.clone();
4271                                table.0.truncate(2);
4272                                curr_references.contains(&table)
4273                            };
4274                        text_columns.retain(column_referenced);
4275                        exclude_columns.retain(column_referenced);
4276
4277                        // Merge the current text / exclude columns into the new text / exclude columns.
4278                        new_text_columns.extend(text_columns);
4279                        new_exclude_columns.extend(exclude_columns);
4280
4281                        // If we have text columns, add them to the options.
4282                        if !new_text_columns.is_empty() {
4283                            new_text_columns.sort();
4284                            let new_text_columns = new_text_columns
4285                                .into_iter()
4286                                .map(WithOptionValue::UnresolvedItemName)
4287                                .collect();
4288
4289                            curr_options.push(MySqlConfigOption {
4290                                name: MySqlConfigOptionName::TextColumns,
4291                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4292                            });
4293                        }
4294                        // If we have exclude columns, add them to the options.
4295                        if !new_exclude_columns.is_empty() {
4296                            new_exclude_columns.sort();
4297                            let new_exclude_columns = new_exclude_columns
4298                                .into_iter()
4299                                .map(WithOptionValue::UnresolvedItemName)
4300                                .collect();
4301
4302                            curr_options.push(MySqlConfigOption {
4303                                name: MySqlConfigOptionName::ExcludeColumns,
4304                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4305                            });
4306                        }
4307                    }
4308                    CreateSourceConnection::SqlServer {
4309                        options: curr_options,
4310                        ..
4311                    } => {
4312                        let mz_sql::plan::SqlServerConfigOptionExtracted {
4313                            mut text_columns,
4314                            mut exclude_columns,
4315                            ..
4316                        } = curr_options.clone().try_into()?;
4317
4318                        // Drop both ignore and text columns; we will add them back in
4319                        // as appropriate below.
4320                        curr_options.retain(|o| {
4321                            !matches!(
4322                                o.name,
4323                                SqlServerConfigOptionName::TextColumns
4324                                    | SqlServerConfigOptionName::ExcludeColumns
4325                            )
4326                        });
4327
4328                        // Drop all text / exclude columns that are not currently referred to.
4329                        let column_referenced =
4330                            |column_qualified_reference: &UnresolvedItemName| {
4331                                mz_ore::soft_assert_eq_or_log!(
4332                                    column_qualified_reference.0.len(),
4333                                    3,
4334                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4335                                );
4336                                let mut table = column_qualified_reference.clone();
4337                                table.0.truncate(2);
4338                                curr_references.contains(&table)
4339                            };
4340                        text_columns.retain(column_referenced);
4341                        exclude_columns.retain(column_referenced);
4342
4343                        // Merge the current text / exclude columns into the new text / exclude columns.
4344                        new_text_columns.extend(text_columns);
4345                        new_exclude_columns.extend(exclude_columns);
4346
4347                        // If we have text columns, add them to the options.
4348                        if !new_text_columns.is_empty() {
4349                            new_text_columns.sort();
4350                            let new_text_columns = new_text_columns
4351                                .into_iter()
4352                                .map(WithOptionValue::UnresolvedItemName)
4353                                .collect();
4354
4355                            curr_options.push(SqlServerConfigOption {
4356                                name: SqlServerConfigOptionName::TextColumns,
4357                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4358                            });
4359                        }
4360                        // If we have exclude columns, add them to the options.
4361                        if !new_exclude_columns.is_empty() {
4362                            new_exclude_columns.sort();
4363                            let new_exclude_columns = new_exclude_columns
4364                                .into_iter()
4365                                .map(WithOptionValue::UnresolvedItemName)
4366                                .collect();
4367
4368                            curr_options.push(SqlServerConfigOption {
4369                                name: SqlServerConfigOptionName::ExcludeColumns,
4370                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4371                            });
4372                        }
4373                    }
4374                    _ => return Err(purification_err()),
4375                };
4376
4377                let mut catalog = self.catalog().for_system_session();
4378                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4379
4380                // Re-define our source in terms of the amended statement
4381                let plan = match mz_sql::plan::plan(
4382                    None,
4383                    &catalog,
4384                    Statement::CreateSource(create_source_stmt),
4385                    &Params::empty(),
4386                    &resolved_ids,
4387                )
4388                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4389                {
4390                    Plan::CreateSource(plan) => plan,
4391                    _ => unreachable!("create source plan is only valid response"),
4392                };
4393
4394                // Asserting that we've done the right thing with dependencies
4395                // here requires mocking out objects in the catalog, which is a
4396                // large task for an operation we have to cover in tests anyway.
4397                let source = Source::new(
4398                    plan,
4399                    cur_source.global_id,
4400                    resolved_ids,
4401                    cur_source.custom_logical_compaction_window,
4402                    cur_source.is_retained_metrics_object,
4403                );
4404
4405                let source_compaction_window = source.custom_logical_compaction_window;
4406
4407                // Get new ingestion description for storage.
4408                let desc = match &source.data_source {
4409                    DataSourceDesc::Ingestion { desc, .. }
4410                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4411                        desc.clone().into_inline_connection(self.catalog().state())
4412                    }
4413                    _ => unreachable!("already verified of type ingestion"),
4414                };
4415
4416                self.controller
4417                    .storage
4418                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
4419                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4420
4421                // Redefine source. This must be done before we create any new
4422                // subsources so that it has the right ingestion.
4423                let mut ops = vec![catalog::Op::UpdateItem {
4424                    id: item_id,
4425                    // Look this up again so we don't have to hold an immutable reference to the
4426                    // entry for so long.
4427                    name: self.catalog.get_entry(&item_id).name().clone(),
4428                    to_item: CatalogItem::Source(source),
4429                }];
4430
4431                let CreateSourceInner {
4432                    ops: new_ops,
4433                    sources,
4434                    if_not_exists_ids,
4435                } = self.create_source_inner(session, subsources).await?;
4436
4437                ops.extend(new_ops.into_iter());
4438
4439                assert!(
4440                    if_not_exists_ids.is_empty(),
4441                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4442                );
4443
4444                self.catalog_transact(Some(session), ops).await?;
4445
4446                let mut item_ids = BTreeSet::new();
4447                let mut collections = Vec::with_capacity(sources.len());
4448                for (item_id, source) in sources {
4449                    let status_id = self.catalog().resolve_builtin_storage_collection(
4450                        &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
4451                    );
4452                    let source_status_collection_id =
4453                        Some(self.catalog().get_entry(&status_id).latest_global_id());
4454
4455                    let (data_source, status_collection_id) = match source.data_source {
4456                        // Subsources use source statuses.
4457                        DataSourceDesc::IngestionExport {
4458                            ingestion_id,
4459                            external_reference: _,
4460                            details,
4461                            data_config,
4462                        } => {
4463                            // TODO(parkmycar): We should probably check the type here, but I'm not sure if
4464                            // this will always be a Source or a Table.
4465                            let ingestion_id =
4466                                self.catalog().get_entry(&ingestion_id).latest_global_id();
4467                            (
4468                                DataSource::IngestionExport {
4469                                    ingestion_id,
4470                                    details,
4471                                    data_config: data_config
4472                                        .into_inline_connection(self.catalog().state()),
4473                                },
4474                                source_status_collection_id,
4475                            )
4476                        }
4477                        o => {
4478                            unreachable!(
4479                                "ALTER SOURCE...ADD SUBSOURCE only creates SourceExport but got {:?}",
4480                                o
4481                            )
4482                        }
4483                    };
4484
4485                    collections.push((
4486                        source.global_id,
4487                        CollectionDescription {
4488                            desc: source.desc.clone(),
4489                            data_source,
4490                            since: None,
4491                            status_collection_id,
4492                            timeline: Some(source.timeline.clone()),
4493                        },
4494                    ));
4495
4496                    item_ids.insert(item_id);
4497                }
4498
4499                let storage_metadata = self.catalog.state().storage_metadata();
4500
4501                self.controller
4502                    .storage
4503                    .create_collections(storage_metadata, None, collections)
4504                    .await
4505                    .unwrap_or_terminate("cannot fail to create collections");
4506
4507                self.initialize_storage_read_policies(
4508                    item_ids,
4509                    source_compaction_window.unwrap_or(CompactionWindow::Default),
4510                )
4511                .await;
4512            }
4513            plan::AlterSourceAction::RefreshReferences { references } => {
4514                self.catalog_transact(
4515                    Some(session),
4516                    vec![catalog::Op::UpdateSourceReferences {
4517                        source_id: item_id,
4518                        references: references.into(),
4519                    }],
4520                )
4521                .await?;
4522            }
4523        }
4524
4525        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4526    }
4527
4528    #[instrument]
4529    pub(super) async fn sequence_alter_system_set(
4530        &mut self,
4531        session: &Session,
4532        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4533    ) -> Result<ExecuteResponse, AdapterError> {
4534        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4535        // We want to ensure that the network policy we're switching too actually exists.
4536        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4537            self.validate_alter_system_network_policy(session, &value)?;
4538        }
4539
4540        let op = match value {
4541            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4542                name: name.clone(),
4543                value: OwnedVarInput::SqlSet(values),
4544            },
4545            plan::VariableValue::Default => {
4546                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4547            }
4548        };
4549        self.catalog_transact(Some(session), vec![op]).await?;
4550
4551        session.add_notice(AdapterNotice::VarDefaultUpdated {
4552            role: None,
4553            var_name: Some(name),
4554        });
4555        Ok(ExecuteResponse::AlteredSystemConfiguration)
4556    }
4557
4558    #[instrument]
4559    pub(super) async fn sequence_alter_system_reset(
4560        &mut self,
4561        session: &Session,
4562        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4563    ) -> Result<ExecuteResponse, AdapterError> {
4564        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4565        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4566        self.catalog_transact(Some(session), vec![op]).await?;
4567        session.add_notice(AdapterNotice::VarDefaultUpdated {
4568            role: None,
4569            var_name: Some(name),
4570        });
4571        Ok(ExecuteResponse::AlteredSystemConfiguration)
4572    }
4573
4574    #[instrument]
4575    pub(super) async fn sequence_alter_system_reset_all(
4576        &mut self,
4577        session: &Session,
4578        _: plan::AlterSystemResetAllPlan,
4579    ) -> Result<ExecuteResponse, AdapterError> {
4580        self.is_user_allowed_to_alter_system(session, None)?;
4581        let op = catalog::Op::ResetAllSystemConfiguration;
4582        self.catalog_transact(Some(session), vec![op]).await?;
4583        session.add_notice(AdapterNotice::VarDefaultUpdated {
4584            role: None,
4585            var_name: None,
4586        });
4587        Ok(ExecuteResponse::AlteredSystemConfiguration)
4588    }
4589
4590    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
4591    fn is_user_allowed_to_alter_system(
4592        &self,
4593        session: &Session,
4594        var_name: Option<&str>,
4595    ) -> Result<(), AdapterError> {
4596        match (session.user().kind(), var_name) {
4597            // Only internal superusers can reset all system variables.
4598            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4599            // Whether or not a variable can be modified depends if we're an internal superuser.
4600            (UserKind::Superuser, Some(name))
4601                if session.user().is_internal()
4602                    || self.catalog().system_config().user_modifiable(name) =>
4603            {
4604                // In lieu of plumbing the user to all system config functions, just check that
4605                // the var is visible.
4606                let var = self.catalog().system_config().get(name)?;
4607                var.visible(session.user(), self.catalog().system_config())?;
4608                Ok(())
4609            }
4610            // If we're not a superuser, but the variable is user modifiable, indicate they can use
4611            // session variables.
4612            (UserKind::Regular, Some(name))
4613                if self.catalog().system_config().user_modifiable(name) =>
4614            {
4615                Err(AdapterError::Unauthorized(
4616                    rbac::UnauthorizedError::Superuser {
4617                        action: format!("toggle the '{name}' system configuration parameter"),
4618                    },
4619                ))
4620            }
4621            _ => Err(AdapterError::Unauthorized(
4622                rbac::UnauthorizedError::MzSystem {
4623                    action: "alter system".into(),
4624                },
4625            )),
4626        }
4627    }
4628
4629    fn validate_alter_system_network_policy(
4630        &self,
4631        session: &Session,
4632        policy_value: &plan::VariableValue,
4633    ) -> Result<(), AdapterError> {
4634        let policy_name = match &policy_value {
4635            // Make sure the compiled in default still exists.
4636            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4637            plan::VariableValue::Values(values) if values.len() == 1 => {
4638                values.iter().next().cloned()
4639            }
4640            plan::VariableValue::Values(values) => {
4641                tracing::warn!(?values, "can't set multiple network policies at once");
4642                None
4643            }
4644        };
4645        let maybe_network_policy = policy_name
4646            .as_ref()
4647            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4648        let Some(network_policy) = maybe_network_policy else {
4649            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4650                VarError::InvalidParameterValue {
4651                    name: NETWORK_POLICY.name(),
4652                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4653                    reason: "no network policy with such name exists".to_string(),
4654                },
4655            )));
4656        };
4657        self.validate_alter_network_policy(session, &network_policy.rules)
4658    }
4659
4660    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4661    ///
4662    /// This helps prevent users from modifying network policies in a way that would lock out their
4663    /// current connection.
4664    fn validate_alter_network_policy(
4665        &self,
4666        session: &Session,
4667        policy_rules: &Vec<NetworkPolicyRule>,
4668    ) -> Result<(), AdapterError> {
4669        // If the user is not an internal user attempt to protect them from
4670        // blocking themselves.
4671        if session.user().is_internal() {
4672            return Ok(());
4673        }
4674        if let Some(ip) = session.meta().client_ip() {
4675            validate_ip_with_policy_rules(ip, policy_rules)
4676                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4677        } else {
4678            // Sessions without IPs are only temporarily constructed for default values
4679            // they should not be permitted here.
4680            return Err(AdapterError::NetworkPolicyDenied(
4681                NetworkPolicyError::MissingIp,
4682            ));
4683        }
4684        Ok(())
4685    }
4686
4687    // Returns the name of the portal to execute.
4688    #[instrument]
4689    pub(super) fn sequence_execute(
4690        &mut self,
4691        session: &mut Session,
4692        plan: plan::ExecutePlan,
4693    ) -> Result<String, AdapterError> {
4694        // Verify the stmt is still valid.
4695        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4696        let ps = session
4697            .get_prepared_statement_unverified(&plan.name)
4698            .expect("known to exist");
4699        let stmt = ps.stmt().cloned();
4700        let desc = ps.desc().clone();
4701        let state_revision = ps.state_revision;
4702        let logging = Arc::clone(ps.logging());
4703        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4704    }
4705
4706    #[instrument]
4707    pub(super) async fn sequence_grant_privileges(
4708        &mut self,
4709        session: &Session,
4710        plan::GrantPrivilegesPlan {
4711            update_privileges,
4712            grantees,
4713        }: plan::GrantPrivilegesPlan,
4714    ) -> Result<ExecuteResponse, AdapterError> {
4715        self.sequence_update_privileges(
4716            session,
4717            update_privileges,
4718            grantees,
4719            UpdatePrivilegeVariant::Grant,
4720        )
4721        .await
4722    }
4723
4724    #[instrument]
4725    pub(super) async fn sequence_revoke_privileges(
4726        &mut self,
4727        session: &Session,
4728        plan::RevokePrivilegesPlan {
4729            update_privileges,
4730            revokees,
4731        }: plan::RevokePrivilegesPlan,
4732    ) -> Result<ExecuteResponse, AdapterError> {
4733        self.sequence_update_privileges(
4734            session,
4735            update_privileges,
4736            revokees,
4737            UpdatePrivilegeVariant::Revoke,
4738        )
4739        .await
4740    }
4741
4742    #[instrument]
4743    async fn sequence_update_privileges(
4744        &mut self,
4745        session: &Session,
4746        update_privileges: Vec<UpdatePrivilege>,
4747        grantees: Vec<RoleId>,
4748        variant: UpdatePrivilegeVariant,
4749    ) -> Result<ExecuteResponse, AdapterError> {
4750        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4751        let mut warnings = Vec::new();
4752        let catalog = self.catalog().for_session(session);
4753
4754        for UpdatePrivilege {
4755            acl_mode,
4756            target_id,
4757            grantor,
4758        } in update_privileges
4759        {
4760            let actual_object_type = catalog.get_system_object_type(&target_id);
4761            // For all relations we allow all applicable table privileges, but send a warning if the
4762            // privilege isn't actually applicable to the object type.
4763            if actual_object_type.is_relation() {
4764                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4765                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4766                if !non_applicable_privileges.is_empty() {
4767                    let object_description =
4768                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4769                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4770                        non_applicable_privileges,
4771                        object_description,
4772                    })
4773                }
4774            }
4775
4776            if let SystemObjectId::Object(object_id) = &target_id {
4777                self.catalog()
4778                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4779            }
4780
4781            let privileges = self
4782                .catalog()
4783                .get_privileges(&target_id, session.conn_id())
4784                // Should be unreachable since the parser will refuse to parse grant/revoke
4785                // statements on objects without privileges.
4786                .ok_or(AdapterError::Unsupported(
4787                    "GRANTs/REVOKEs on an object type with no privileges",
4788                ))?;
4789
4790            for grantee in &grantees {
4791                self.catalog().ensure_not_system_role(grantee)?;
4792                self.catalog().ensure_not_predefined_role(grantee)?;
4793                let existing_privilege = privileges
4794                    .get_acl_item(grantee, &grantor)
4795                    .map(Cow::Borrowed)
4796                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4797
4798                match variant {
4799                    UpdatePrivilegeVariant::Grant
4800                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4801                    {
4802                        ops.push(catalog::Op::UpdatePrivilege {
4803                            target_id: target_id.clone(),
4804                            privilege: MzAclItem {
4805                                grantee: *grantee,
4806                                grantor,
4807                                acl_mode,
4808                            },
4809                            variant,
4810                        });
4811                    }
4812                    UpdatePrivilegeVariant::Revoke
4813                        if !existing_privilege
4814                            .acl_mode
4815                            .intersection(acl_mode)
4816                            .is_empty() =>
4817                    {
4818                        ops.push(catalog::Op::UpdatePrivilege {
4819                            target_id: target_id.clone(),
4820                            privilege: MzAclItem {
4821                                grantee: *grantee,
4822                                grantor,
4823                                acl_mode,
4824                            },
4825                            variant,
4826                        });
4827                    }
4828                    // no-op
4829                    _ => {}
4830                }
4831            }
4832        }
4833
4834        if ops.is_empty() {
4835            session.add_notices(warnings);
4836            return Ok(variant.into());
4837        }
4838
4839        let res = self
4840            .catalog_transact(Some(session), ops)
4841            .await
4842            .map(|_| match variant {
4843                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4844                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4845            });
4846        if res.is_ok() {
4847            session.add_notices(warnings);
4848        }
4849        res
4850    }
4851
4852    #[instrument]
4853    pub(super) async fn sequence_alter_default_privileges(
4854        &mut self,
4855        session: &Session,
4856        plan::AlterDefaultPrivilegesPlan {
4857            privilege_objects,
4858            privilege_acl_items,
4859            is_grant,
4860        }: plan::AlterDefaultPrivilegesPlan,
4861    ) -> Result<ExecuteResponse, AdapterError> {
4862        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4863        let variant = if is_grant {
4864            UpdatePrivilegeVariant::Grant
4865        } else {
4866            UpdatePrivilegeVariant::Revoke
4867        };
4868        for privilege_object in &privilege_objects {
4869            self.catalog()
4870                .ensure_not_system_role(&privilege_object.role_id)?;
4871            self.catalog()
4872                .ensure_not_predefined_role(&privilege_object.role_id)?;
4873            if let Some(database_id) = privilege_object.database_id {
4874                self.catalog()
4875                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4876            }
4877            if let Some(schema_id) = privilege_object.schema_id {
4878                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4879                let schema_spec: SchemaSpecifier = schema_id.into();
4880
4881                self.catalog().ensure_not_reserved_object(
4882                    &(database_spec, schema_spec).into(),
4883                    session.conn_id(),
4884                )?;
4885            }
4886            for privilege_acl_item in &privilege_acl_items {
4887                self.catalog()
4888                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4889                self.catalog()
4890                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4891                ops.push(catalog::Op::UpdateDefaultPrivilege {
4892                    privilege_object: privilege_object.clone(),
4893                    privilege_acl_item: privilege_acl_item.clone(),
4894                    variant,
4895                })
4896            }
4897        }
4898
4899        self.catalog_transact(Some(session), ops).await?;
4900        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4901    }
4902
4903    #[instrument]
4904    pub(super) async fn sequence_grant_role(
4905        &mut self,
4906        session: &Session,
4907        plan::GrantRolePlan {
4908            role_ids,
4909            member_ids,
4910            grantor_id,
4911        }: plan::GrantRolePlan,
4912    ) -> Result<ExecuteResponse, AdapterError> {
4913        let catalog = self.catalog();
4914        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4915        for role_id in role_ids {
4916            for member_id in &member_ids {
4917                let member_membership: BTreeSet<_> =
4918                    catalog.get_role(member_id).membership().keys().collect();
4919                if member_membership.contains(&role_id) {
4920                    let role_name = catalog.get_role(&role_id).name().to_string();
4921                    let member_name = catalog.get_role(member_id).name().to_string();
4922                    // We need this check so we don't accidentally return a success on a reserved role.
4923                    catalog.ensure_not_reserved_role(member_id)?;
4924                    catalog.ensure_grantable_role(&role_id)?;
4925                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4926                        role_name,
4927                        member_name,
4928                    });
4929                } else {
4930                    ops.push(catalog::Op::GrantRole {
4931                        role_id,
4932                        member_id: *member_id,
4933                        grantor_id,
4934                    });
4935                }
4936            }
4937        }
4938
4939        if ops.is_empty() {
4940            return Ok(ExecuteResponse::GrantedRole);
4941        }
4942
4943        self.catalog_transact(Some(session), ops)
4944            .await
4945            .map(|_| ExecuteResponse::GrantedRole)
4946    }
4947
4948    #[instrument]
4949    pub(super) async fn sequence_revoke_role(
4950        &mut self,
4951        session: &Session,
4952        plan::RevokeRolePlan {
4953            role_ids,
4954            member_ids,
4955            grantor_id,
4956        }: plan::RevokeRolePlan,
4957    ) -> Result<ExecuteResponse, AdapterError> {
4958        let catalog = self.catalog();
4959        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4960        for role_id in role_ids {
4961            for member_id in &member_ids {
4962                let member_membership: BTreeSet<_> =
4963                    catalog.get_role(member_id).membership().keys().collect();
4964                if !member_membership.contains(&role_id) {
4965                    let role_name = catalog.get_role(&role_id).name().to_string();
4966                    let member_name = catalog.get_role(member_id).name().to_string();
4967                    // We need this check so we don't accidentally return a success on a reserved role.
4968                    catalog.ensure_not_reserved_role(member_id)?;
4969                    catalog.ensure_grantable_role(&role_id)?;
4970                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4971                        role_name,
4972                        member_name,
4973                    });
4974                } else {
4975                    ops.push(catalog::Op::RevokeRole {
4976                        role_id,
4977                        member_id: *member_id,
4978                        grantor_id,
4979                    });
4980                }
4981            }
4982        }
4983
4984        if ops.is_empty() {
4985            return Ok(ExecuteResponse::RevokedRole);
4986        }
4987
4988        self.catalog_transact(Some(session), ops)
4989            .await
4990            .map(|_| ExecuteResponse::RevokedRole)
4991    }
4992
4993    #[instrument]
4994    pub(super) async fn sequence_alter_owner(
4995        &mut self,
4996        session: &Session,
4997        plan::AlterOwnerPlan {
4998            id,
4999            object_type,
5000            new_owner,
5001        }: plan::AlterOwnerPlan,
5002    ) -> Result<ExecuteResponse, AdapterError> {
5003        let mut ops = vec![catalog::Op::UpdateOwner {
5004            id: id.clone(),
5005            new_owner,
5006        }];
5007
5008        match &id {
5009            ObjectId::Item(global_id) => {
5010                let entry = self.catalog().get_entry(global_id);
5011
5012                // Cannot directly change the owner of an index.
5013                if entry.is_index() {
5014                    let name = self
5015                        .catalog()
5016                        .resolve_full_name(entry.name(), Some(session.conn_id()))
5017                        .to_string();
5018                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
5019                    return Ok(ExecuteResponse::AlteredObject(object_type));
5020                }
5021
5022                // Alter owner cascades down to dependent indexes.
5023                let dependent_index_ops = entry
5024                    .used_by()
5025                    .into_iter()
5026                    .filter(|id| self.catalog().get_entry(id).is_index())
5027                    .map(|id| catalog::Op::UpdateOwner {
5028                        id: ObjectId::Item(*id),
5029                        new_owner,
5030                    });
5031                ops.extend(dependent_index_ops);
5032
5033                // Alter owner cascades down to progress collections.
5034                let dependent_subsources =
5035                    entry
5036                        .progress_id()
5037                        .into_iter()
5038                        .map(|item_id| catalog::Op::UpdateOwner {
5039                            id: ObjectId::Item(item_id),
5040                            new_owner,
5041                        });
5042                ops.extend(dependent_subsources);
5043            }
5044            ObjectId::Cluster(cluster_id) => {
5045                let cluster = self.catalog().get_cluster(*cluster_id);
5046                // Alter owner cascades down to cluster replicas.
5047                let managed_cluster_replica_ops =
5048                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
5049                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
5050                        new_owner,
5051                    });
5052                ops.extend(managed_cluster_replica_ops);
5053            }
5054            _ => {}
5055        }
5056
5057        self.catalog_transact(Some(session), ops)
5058            .await
5059            .map(|_| ExecuteResponse::AlteredObject(object_type))
5060    }
5061
5062    #[instrument]
5063    pub(super) async fn sequence_reassign_owned(
5064        &mut self,
5065        session: &Session,
5066        plan::ReassignOwnedPlan {
5067            old_roles,
5068            new_role,
5069            reassign_ids,
5070        }: plan::ReassignOwnedPlan,
5071    ) -> Result<ExecuteResponse, AdapterError> {
5072        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
5073            self.catalog().ensure_not_reserved_role(role_id)?;
5074        }
5075
5076        let ops = reassign_ids
5077            .into_iter()
5078            .map(|id| catalog::Op::UpdateOwner {
5079                id,
5080                new_owner: new_role,
5081            })
5082            .collect();
5083
5084        self.catalog_transact(Some(session), ops)
5085            .await
5086            .map(|_| ExecuteResponse::ReassignOwned)
5087    }
5088
5089    #[instrument]
5090    pub(crate) async fn handle_deferred_statement(&mut self) {
5091        // It is possible Message::DeferredStatementReady was sent but then a session cancellation
5092        // was processed, removing the single element from deferred_statements, so it is expected
5093        // that this is sometimes empty.
5094        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
5095            return;
5096        };
5097        match ps {
5098            crate::coord::PlanStatement::Statement { stmt, params } => {
5099                self.handle_execute_inner(stmt, params, ctx).await;
5100            }
5101            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
5102                self.sequence_plan(ctx, plan, resolved_ids).await;
5103            }
5104        }
5105    }
5106
5107    #[instrument]
5108    // TODO(parkmycar): Remove this once we have an actual implementation.
5109    #[allow(clippy::unused_async)]
5110    pub(super) async fn sequence_alter_table(
5111        &mut self,
5112        ctx: &mut ExecuteContext,
5113        plan: plan::AlterTablePlan,
5114    ) -> Result<ExecuteResponse, AdapterError> {
5115        let plan::AlterTablePlan {
5116            relation_id,
5117            column_name,
5118            column_type,
5119            raw_sql_type,
5120        } = plan;
5121
5122        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
5123        let id_ts = self.get_catalog_write_ts().await;
5124        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
5125        let ops = vec![catalog::Op::AlterAddColumn {
5126            id: relation_id,
5127            new_global_id,
5128            name: column_name,
5129            typ: column_type,
5130            sql: raw_sql_type,
5131        }];
5132
5133        let entry = self.catalog().get_entry(&relation_id);
5134        let CatalogItem::Table(table) = &entry.item else {
5135            let err = format!("expected table, found {:?}", entry.item);
5136            return Err(AdapterError::Internal(err));
5137        };
5138        // Expected schema version, before altering the table.
5139        let expected_version = table.desc.latest_version();
5140        let existing_global_id = table.global_id_writes();
5141
5142        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
5143            Box::pin(async move {
5144                let entry = coord.catalog().get_entry(&relation_id);
5145                let CatalogItem::Table(table) = &entry.item else {
5146                    panic!("programming error, expected table found {:?}", entry.item);
5147                };
5148                let table = table.clone();
5149
5150                // Acquire a read hold on the original table for the duration of
5151                // the alter to prevent the since of the original table from
5152                // getting advanced, while the ALTER is running.
5153                let existing_table = crate::CollectionIdBundle {
5154                    storage_ids: btreeset![existing_global_id],
5155                    compute_ids: BTreeMap::new(),
5156                };
5157                let existing_table_read_hold = coord.acquire_read_holds(&existing_table);
5158
5159                let new_version = table.desc.latest_version();
5160                let new_desc = table
5161                    .desc
5162                    .at_version(RelationVersionSelector::Specific(new_version));
5163                let register_ts = coord.get_local_write_ts().await.timestamp;
5164
5165                // Alter the table description, creating a "new" collection.
5166                coord
5167                    .controller
5168                    .storage
5169                    .alter_table_desc(
5170                        existing_global_id,
5171                        new_global_id,
5172                        new_desc,
5173                        expected_version,
5174                        register_ts,
5175                    )
5176                    .await
5177                    .expect("failed to alter desc of table");
5178
5179                // Initialize the ReadPolicy which ensures we have the correct read holds.
5180                let compaction_window = table
5181                    .custom_logical_compaction_window
5182                    .unwrap_or(CompactionWindow::Default);
5183                coord
5184                    .initialize_read_policies(
5185                        &crate::CollectionIdBundle {
5186                            storage_ids: btreeset![new_global_id],
5187                            compute_ids: BTreeMap::new(),
5188                        },
5189                        compaction_window,
5190                    )
5191                    .await;
5192                coord.apply_local_write(register_ts).await;
5193
5194                // Alter is complete! We can drop our read hold.
5195                drop(existing_table_read_hold);
5196            })
5197        })
5198        .await?;
5199
5200        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
5201    }
5202}
5203
5204#[derive(Debug)]
5205struct CachedStatisticsOracle {
5206    cache: BTreeMap<GlobalId, usize>,
5207}
5208
5209impl CachedStatisticsOracle {
5210    pub async fn new<T: TimelyTimestamp>(
5211        ids: &BTreeSet<GlobalId>,
5212        as_of: &Antichain<T>,
5213        storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
5214    ) -> Result<Self, StorageError<T>> {
5215        let mut cache = BTreeMap::new();
5216
5217        for id in ids {
5218            let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
5219
5220            match stats {
5221                Ok(stats) => {
5222                    cache.insert(*id, stats.num_updates);
5223                }
5224                Err(StorageError::IdentifierMissing(id)) => {
5225                    ::tracing::debug!("no statistics for {id}")
5226                }
5227                Err(e) => return Err(e),
5228            }
5229        }
5230
5231        Ok(Self { cache })
5232    }
5233}
5234
5235impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
5236    fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
5237        self.cache.get(&id).map(|estimate| *estimate)
5238    }
5239
5240    fn as_map(&self) -> BTreeMap<GlobalId, usize> {
5241        self.cache.clone()
5242    }
5243}
5244
5245impl Coordinator {
5246    pub(super) async fn statistics_oracle(
5247        &self,
5248        session: &Session,
5249        source_ids: &BTreeSet<GlobalId>,
5250        query_as_of: &Antichain<Timestamp>,
5251        is_oneshot: bool,
5252    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
5253        if !session.vars().enable_session_cardinality_estimates() {
5254            return Ok(Box::new(EmptyStatisticsOracle));
5255        }
5256
5257        let timeout = if is_oneshot {
5258            // TODO(mgree): ideally, we would shorten the timeout even more if we think the query could take the fast path
5259            self.catalog()
5260                .system_config()
5261                .optimizer_oneshot_stats_timeout()
5262        } else {
5263            self.catalog().system_config().optimizer_stats_timeout()
5264        };
5265
5266        let cached_stats = mz_ore::future::timeout(
5267            timeout,
5268            CachedStatisticsOracle::new(
5269                source_ids,
5270                query_as_of,
5271                self.controller.storage_collections.as_ref(),
5272            ),
5273        )
5274        .await;
5275
5276        match cached_stats {
5277            Ok(stats) => Ok(Box::new(stats)),
5278            Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
5279                warn!(
5280                    is_oneshot = is_oneshot,
5281                    "optimizer statistics collection timed out after {}ms",
5282                    timeout.as_millis()
5283                );
5284
5285                Ok(Box::new(EmptyStatisticsOracle))
5286            }
5287            Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
5288        }
5289    }
5290}
5291
5292/// Checks whether we should emit diagnostic
5293/// information associated with reading per-replica sources.
5294///
5295/// If an unrecoverable error is found (today: an untargeted read on a
5296/// cluster with a non-1 number of replicas), return that.  Otherwise,
5297/// return a list of associated notices (today: we always emit exactly
5298/// one notice if there are any per-replica log dependencies and if
5299/// `emit_introspection_query_notice` is set, and none otherwise.)
5300pub(super) fn check_log_reads(
5301    catalog: &Catalog,
5302    cluster: &Cluster,
5303    source_ids: &BTreeSet<GlobalId>,
5304    target_replica: &mut Option<ReplicaId>,
5305    vars: &SessionVars,
5306) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
5307where
5308{
5309    let log_names = source_ids
5310        .iter()
5311        .map(|gid| catalog.resolve_item_id(gid))
5312        .flat_map(|item_id| catalog.introspection_dependencies(item_id))
5313        .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
5314        .collect::<Vec<_>>();
5315
5316    if log_names.is_empty() {
5317        return Ok(None);
5318    }
5319
5320    // Reading from log sources on replicated clusters is only allowed if a
5321    // target replica is selected. Otherwise, we have no way of knowing which
5322    // replica we read the introspection data from.
5323    let num_replicas = cluster.replicas().count();
5324    if target_replica.is_none() {
5325        if num_replicas == 1 {
5326            *target_replica = cluster.replicas().map(|r| r.replica_id).next();
5327        } else {
5328            return Err(AdapterError::UntargetedLogRead { log_names });
5329        }
5330    }
5331
5332    // Ensure that logging is initialized for the target replica, lest
5333    // we try to read from a non-existing arrangement.
5334    let replica_id = target_replica.expect("set to `Some` above");
5335    let replica = &cluster.replica(replica_id).expect("Replica must exist");
5336    if !replica.config.compute.logging.enabled() {
5337        return Err(AdapterError::IntrospectionDisabled { log_names });
5338    }
5339
5340    Ok(vars
5341        .emit_introspection_query_notice()
5342        .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
5343}
5344
5345impl Coordinator {
5346    /// Forward notices that we got from the optimizer.
5347    fn emit_optimizer_notices(&self, session: &Session, notices: &Vec<RawOptimizerNotice>) {
5348        // `for_session` below is expensive, so return early if there's nothing to do.
5349        if notices.is_empty() {
5350            return;
5351        }
5352        let humanizer = self.catalog.for_session(session);
5353        let system_vars = self.catalog.system_config();
5354        for notice in notices {
5355            let kind = OptimizerNoticeKind::from(notice);
5356            let notice_enabled = match kind {
5357                OptimizerNoticeKind::IndexAlreadyExists => {
5358                    system_vars.enable_notices_for_index_already_exists()
5359                }
5360                OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
5361                    system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
5362                }
5363                OptimizerNoticeKind::IndexKeyEmpty => {
5364                    system_vars.enable_notices_for_index_empty_key()
5365                }
5366            };
5367            if notice_enabled {
5368                // We don't need to redact the notice parts because
5369                // `emit_optimizer_notices` is only called by the `sequence_~`
5370                // method for the statement that produces that notice.
5371                session.add_notice(AdapterNotice::OptimizerNotice {
5372                    notice: notice.message(&humanizer, false).to_string(),
5373                    hint: notice.hint(&humanizer, false).to_string(),
5374                });
5375            }
5376            self.metrics
5377                .optimization_notices
5378                .with_label_values(&[kind.metric_label()])
5379                .inc_by(1);
5380        }
5381    }
5382
5383    /// Process the metainfo from a newly created non-transient dataflow.
5384    async fn process_dataflow_metainfo(
5385        &mut self,
5386        df_meta: DataflowMetainfo,
5387        export_id: GlobalId,
5388        ctx: Option<&mut ExecuteContext>,
5389        notice_ids: Vec<GlobalId>,
5390    ) -> Option<BuiltinTableAppendNotify> {
5391        // Emit raw notices to the user.
5392        if let Some(ctx) = ctx {
5393            self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
5394        }
5395
5396        // Create a metainfo with rendered notices.
5397        let df_meta = self
5398            .catalog()
5399            .render_notices(df_meta, notice_ids, Some(export_id));
5400
5401        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
5402        // in-memory state.
5403        if self.catalog().state().system_config().enable_mz_notices()
5404            && !df_meta.optimizer_notices.is_empty()
5405        {
5406            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
5407            self.catalog().state().pack_optimizer_notices(
5408                &mut builtin_table_updates,
5409                df_meta.optimizer_notices.iter(),
5410                Diff::ONE,
5411            );
5412
5413            // Save the metainfo.
5414            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
5415
5416            Some(
5417                self.builtin_table_update()
5418                    .execute(builtin_table_updates)
5419                    .await
5420                    .0,
5421            )
5422        } else {
5423            // Save the metainfo.
5424            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
5425
5426            None
5427        }
5428    }
5429}