mz_adapter/coord/sequencer/inner.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::FuturesOrdered;
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::objects::{
    CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_controller_types::ReplicaId;
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_persist_client::stats::SnapshotPartStats;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
    RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
    SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceExport};
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS, SessionVars,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_storage_types::stats::RelationPartStats;
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
use smallvec::SmallVec;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::{
    AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
    CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
    Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
    PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
    validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;

/// Attempts to evaluate an expression. If evaluation returns an error, the error
/// is sent to the client (by retiring the context) and the enclosing function
/// returns.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;

struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}

/// A bundle of values returned from `create_source_inner`.
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
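        // Drive the plan one stage at a time: `Immediate` results loop back around on
        // the coordinator thread, `Handle`/`HandleRetire` results await off-thread work
        // in a spawned task, and `Response` retires the context with the final result.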
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
                    // when cancel_enabled may have been true on an earlier stage.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }

    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
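        // Build a future that resolves once this connection is canceled. If there is no
        // session or no cancellation channel was registered, fall back to a future that
        // never resolves, so the `select!` below can only complete via the join handle.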
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // Wait for true or dropped sender.
                let _ = rx.wait_for(|v| *v).await;
                ()
            })
        } else {
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = match res {
                        Ok(next) => return_if_err!(next, ctx),
                        Err(err) => {
                            tracing::error!("sequence_staged join error {err}");
                            ctx.retire(Err(AdapterError::Internal(
                                "sequence_staged join error".into(),
                            )));
                            return;
                        }
                    };
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }

    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

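        // Record the IDs of `IF NOT EXISTS` sources so that a later "item already
        // exists" error for one of them can be downgraded to a notice.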
        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref desc)
                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                            .get(self.catalog().system_config().dyncfgs());

                        if !enable_multi_replica_sources {
                            if cluster.replica_ids().len() > 1 {
                                return Err(AdapterError::Unsupported(
                                    "webhook sources in clusters with >1 replicas",
                                ));
                            }
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

            // Attempt to reduce the `CHECK` expression; we time out if this takes too long.
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            // If this source contained a set of available source references, update the
            // source references catalog table.
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            // These operations must be executed after the source is added to the catalog.
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }

    /// Subsources are planned differently from other statements because they
    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
    /// Because of this, we have usually "missed" the opportunity to plan them
    /// through the normal statement execution life cycle (the exception being
    /// during bootstrapping).
    ///
    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let plan = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }

    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog_mut()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip_eq(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource, if any.
        if let Some(progress_stmt) = progress_stmt {
            // The primary source depends on this subsource because the primary
            // source needs to know its shard ID, and the easiest way of
            // guaranteeing that the shard ID is discoverable is to create this
            // collection first.
            assert_none!(progress_stmt.of_source);
            let id_ts = self.get_catalog_write_ts().await;
            let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

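        // Propagate the source-level `RETAIN HISTORY` option to each subsource;
        // `TIMESTAMP INTERVAL` applies only to the primary source, so it is not propagated.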
        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("plan must be CreateSourcePlan but got {:?}", p),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources.
        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog_mut()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let item_ids: Vec<_> = plans.iter().map(|plan| plan.item_id).collect();
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

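        // Apply the catalog ops and, as a side effect of the same DDL transaction,
        // create the storage collections for every new source in one batch.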
        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |coord, ctx| {
                Box::pin(async move {
                    // TODO(roshan): This will be unnecessary when we drop support for
                    // automatically created subsources.
                    // To improve the performance of creating a source and many
                    // subsources, we create all of the collections at once. If we
                    // created each collection independently, we would reschedule
                    // the primary source's execution for each subsource, causing
                    // unnecessary thrashing.
                    //
                    // Note that creating all of the collections at once should
                    // never be semantically load-bearing; we should always be able
                    // to break this apart into creating the collections sequentially.
                    let mut collections = Vec::with_capacity(sources.len());

                    for (item_id, source) in sources {
                        let source_status_item_id =
                            coord.catalog().resolve_builtin_storage_collection(
                                &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
                            );
                        let source_status_collection_id = Some(
                            coord
                                .catalog()
                                .get_entry(&source_status_item_id)
                                .latest_global_id(),
                        );

                        let (data_source, status_collection_id) = match source.data_source {
                            DataSourceDesc::Ingestion { desc, cluster_id } => {
                                let desc = desc.into_inline_connection(coord.catalog().state());
                                let item_global_id =
                                    coord.catalog().get_entry(&item_id).latest_global_id();

                                let ingestion =
                                    IngestionDescription::new(desc, cluster_id, item_global_id);

                                (
                                    DataSource::Ingestion(ingestion),
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::OldSyntaxIngestion {
                                desc,
                                progress_subsource,
                                data_config,
                                details,
                                cluster_id,
                            } => {
                                let desc = desc.into_inline_connection(coord.catalog().state());
                                let data_config =
                                    data_config.into_inline_connection(coord.catalog().state());
                                // TODO(parkmycar): We should probably check the type here, but I'm not
                                // sure if this will always be a Source or a Table.
                                let progress_subsource = coord
                                    .catalog()
                                    .get_entry(&progress_subsource)
                                    .latest_global_id();

                                let mut ingestion =
                                    IngestionDescription::new(desc, cluster_id, progress_subsource);
                                let legacy_export = SourceExport {
                                    storage_metadata: (),
                                    data_config,
                                    details,
                                };
                                ingestion
                                    .source_exports
                                    .insert(source.global_id, legacy_export);

                                (
                                    DataSource::Ingestion(ingestion),
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference: _,
                                details,
                                data_config,
                            } => {
                                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                                // this will always be a Source or a Table.
                                let ingestion_id =
                                    coord.catalog().get_entry(&ingestion_id).latest_global_id();
                                (
                                    DataSource::IngestionExport {
                                        ingestion_id,
                                        details,
                                        data_config: data_config
                                            .into_inline_connection(coord.catalog().state()),
                                    },
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::Progress => (DataSource::Progress, None),
                            DataSourceDesc::Webhook { .. } => {
                                if let Some(url) =
                                    coord.catalog().state().try_get_webhook_url(&item_id)
                                {
                                    if let Some(ctx) = ctx.as_ref() {
                                        ctx.session()
                                            .add_notice(AdapterNotice::WebhookSourceCreated { url })
                                    }
                                }

                                (DataSource::Webhook, None)
                            }
                            DataSourceDesc::Introspection(_) => {
                                unreachable!(
                                    "cannot create sources with introspection data sources"
                                )
                            }
                        };

                        collections.push((
                            source.global_id,
                            CollectionDescription::<Timestamp> {
                                desc: source.desc.clone(),
                                data_source,
                                timeline: Some(source.timeline),
                                since: None,
                                status_collection_id,
                            },
                        ));
                    }

                    let storage_metadata = coord.catalog.state().storage_metadata();

                    coord
                        .controller
                        .storage
                        .create_collections(storage_metadata, None, collections)
                        .await
                        .unwrap_or_terminate("cannot fail to create collections");

                    // It is _very_ important that we only initialize read policies
                    // after we have created all the sources/collections. Some of
                    // the sources created in this collection might have
                    // dependencies on other sources, so the controller must get a
                    // chance to install read holds before we set a policy that
                    // might make the since advance.
                    //
                    // One instance of this is the remap shard: it presents as a
                    // SUBSOURCE, and all other SUBSOURCES of a SOURCE will depend
                    // on it. Both subsources and sources will show up as a `Source`
                    // in the above.
                    //
                    // Although there should only be one parent source that
                    // sequence_create_source is ever called with, hedge our bets a
                    // bit and collect the compaction windows for each id in the
                    // bundle (these should all be identical). This is some extra
                    // work but seems safer.
                    let read_policies = coord.catalog().state().source_compaction_windows(item_ids);
                    for (compaction_window, storage_policies) in read_policies {
                        coord
                            .initialize_storage_read_policies(storage_policies, compaction_window)
                            .await;
                    }
                })
            })
            .await;

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog_mut().allocate_user_id(id_ts).await
        {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
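            // SSH connections must carry system-generated key pairs; a public key that
            // was explicitly specified by the user is rejected. The key pair set is then
            // stored in the secrets controller under the connection's ID.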
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

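        // If requested, validate the connection off the coordinator thread: a spawned
        // task performs the (potentially slow) validation and then sends a
        // `CreateConnectionValidationReady` message back to finish sequencing.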
        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    match plan.connection.details {
                        ConnectionDetails::AwsPrivatelink(ref privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            let cloud_resource_controller =
                                match coord.cloud_resource_controller.as_ref().cloned() {
                                    Some(controller) => controller,
                                    None => {
                                        tracing::warn!("AWS PrivateLink connections unsupported");
                                        return;
                                    }
                                };
                            if let Err(err) = cloud_resource_controller
                                .ensure_vpc_endpoint(connection_id, spec)
                                .await
                            {
                                tracing::warn!(?err, "failed to ensure vpc endpoint!");
                            }
                        }
                        _ => {}
                    }
                })
            })
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &mut Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &mut Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

    /// Validates the role attributes for a `CREATE ROLE` statement.
    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some()
                || attributes.password.is_some()
                || attributes.login.is_some()
            {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes.clone())?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_conn(conn_id, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO(network_policy): Consider role based network policies here.
        let current_network_policy_name = self
            .owned_catalog()
            .system_config()
            .default_network_policy_name();
        // Check that the way we're altering the policy is still valid for the current connection.
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

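        // Temporary tables are scoped to the creating connection, so record the
        // connection ID on the catalog item; permanent tables carry no connection ID.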
1134        let conn_id = if table.temporary {
1135            Some(ctx.session().conn_id())
1136        } else {
1137            None
1138        };
1139        let id_ts = self.get_catalog_write_ts().await;
1140        let (table_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
1141        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
1142
1143        let data_source = match table.data_source {
1144            plan::TableDataSource::TableWrites { defaults } => {
1145                TableDataSource::TableWrites { defaults }
1146            }
1147            plan::TableDataSource::DataSource {
1148                desc: data_source_plan,
1149                timeline,
1150            } => match data_source_plan {
1151                plan::DataSourceDesc::IngestionExport {
1152                    ingestion_id,
1153                    external_reference,
1154                    details,
1155                    data_config,
1156                } => TableDataSource::DataSource {
1157                    desc: DataSourceDesc::IngestionExport {
1158                        ingestion_id,
1159                        external_reference,
1160                        details,
1161                        data_config,
1162                    },
1163                    timeline,
1164                },
1165                plan::DataSourceDesc::Webhook {
1166                    validate_using,
1167                    body_format,
1168                    headers,
1169                    cluster_id,
1170                } => TableDataSource::DataSource {
1171                    desc: DataSourceDesc::Webhook {
1172                        validate_using,
1173                        body_format,
1174                        headers,
1175                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
1176                    },
1177                    timeline,
1178                },
1179                o => {
1180                    unreachable!("CREATE TABLE data source got {:?}", o)
1181                }
1182            },
1183        };
1184        let table = Table {
1185            create_sql: Some(table.create_sql),
1186            desc: table.desc,
1187            collections,
1188            conn_id: conn_id.cloned(),
1189            resolved_ids,
1190            custom_logical_compaction_window: table.compaction_window,
1191            is_retained_metrics_object: false,
1192            data_source,
1193        };
1194        let ops = vec![catalog::Op::CreateItem {
1195            id: table_id,
1196            name: name.clone(),
1197            item: CatalogItem::Table(table.clone()),
1198            owner_id: *ctx.session().current_role_id(),
1199        }];
1200
1201        let catalog_result = self
1202            .catalog_transact_with_ddl_transaction(ctx, ops, move |coord, ctx| {
1203                Box::pin(async move {
1204                    // The table data_source determines whether this table will be written to
1205                    // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
1206                    // (e.g. a source-fed table).
1207                    let (collections, register_ts, read_policies) = match table.data_source {
1208                        TableDataSource::TableWrites { defaults: _ } => {
1209                            // Determine the initial validity for the table.
1210                            let register_ts = coord.get_local_write_ts().await.timestamp;
1211
1212                            // After acquiring `register_ts` but before using it, we need to
1213                            // be sure we're still the leader. Otherwise a new generation
1214                            // may also be trying to use `register_ts` for a different
1215                            // purpose.
1216                            //
1217                            // See database-issues#8273.
1218                            coord
1219                                .catalog
1220                                .confirm_leadership()
1221                                .await
1222                                .unwrap_or_terminate("unable to confirm leadership");
1223
1224                            if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1225                                coord.set_statement_execution_timestamp(id, register_ts);
1226                            }
1227
1228                            // When initially creating a table it should only have a single version.
1229                            let relation_version = RelationVersion::root();
1230                            assert_eq!(table.desc.latest_version(), relation_version);
1231                            let relation_desc = table
1232                                .desc
1233                                .at_version(RelationVersionSelector::Specific(relation_version));
1234                            // We assert above we have a single version, and thus we are the primary.
1235                            let collection_desc =
1236                                CollectionDescription::for_table(relation_desc, None);
1237                            let collections = vec![(global_id, collection_desc)];
1238
1239                            let compaction_window = table
1240                                .custom_logical_compaction_window
1241                                .unwrap_or(CompactionWindow::Default);
1242                            let read_policies =
1243                                BTreeMap::from([(compaction_window, btreeset! { table_id })]);
1244
1245                            (collections, Some(register_ts), read_policies)
1246                        }
1247                        TableDataSource::DataSource {
1248                            desc: data_source,
1249                            timeline,
1250                        } => {
1251                            match data_source {
1252                                DataSourceDesc::IngestionExport {
1253                                    ingestion_id,
1254                                    external_reference: _,
1255                                    details,
1256                                    data_config,
1257                                } => {
1258                                    // TODO: It's a little weird that a table will be present in this
1259                                    // source status collection; we might want to split tables out into
1260                                    // a separate status collection.
1261                                    let source_status_item_id =
1262                                        coord.catalog().resolve_builtin_storage_collection(
1263                                            &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
1264                                        );
1265                                    let status_collection_id = Some(
1266                                        coord
1267                                            .catalog()
1268                                            .get_entry(&source_status_item_id)
1269                                            .latest_global_id(),
1270                                    );
1271                                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
1272                                    // this will always be a Source or a Table.
1273                                    let ingestion_id =
1274                                        coord.catalog().get_entry(&ingestion_id).latest_global_id();
1275                                    // Create the underlying collection with the latest schema from the Table.
1276                                    let collection_desc = CollectionDescription::<Timestamp> {
1277                                        desc: table
1278                                            .desc
1279                                            .at_version(RelationVersionSelector::Latest),
1280                                        data_source: DataSource::IngestionExport {
1281                                            ingestion_id,
1282                                            details,
1283                                            data_config: data_config
1284                                                .into_inline_connection(coord.catalog.state()),
1285                                        },
1286                                        since: None,
1287                                        status_collection_id,
1288                                        timeline: Some(timeline.clone()),
1289                                    };
1290
1291                                    let collections = vec![(global_id, collection_desc)];
1292                                    let read_policies = coord
1293                                        .catalog()
1294                                        .state()
1295                                        .source_compaction_windows(vec![table_id]);
1296
1297                                    (collections, None, read_policies)
1298                                }
1299                                DataSourceDesc::Webhook { .. } => {
1300                                    if let Some(url) =
1301                                        coord.catalog().state().try_get_webhook_url(&table_id)
1302                                    {
1303                                        if let Some(ctx) = ctx.as_ref() {
1304                                            ctx.session().add_notice(
1305                                                AdapterNotice::WebhookSourceCreated { url },
1306                                            )
1307                                        }
1308                                    }
1309
1310                                    // Create the underlying collection with the latest schema from the Table.
1311                                    assert_eq!(
1312                                        table.desc.latest_version(),
1313                                        RelationVersion::root(),
1314                                        "found webhook with more than 1 relation version, {:?}",
1315                                        table.desc
1316                                    );
1317                                    let desc = table.desc.latest();
1318
1319                                    let collection_desc = CollectionDescription {
1320                                        desc,
1321                                        data_source: DataSource::Webhook,
1322                                        since: None,
1323                                        status_collection_id: None,
1324                                        timeline: Some(timeline.clone()),
1325                                    };
1326                                    let collections = vec![(global_id, collection_desc)];
1327                                    let read_policies = coord
1328                                        .catalog()
1329                                        .state()
1330                                        .source_compaction_windows(vec![table_id]);
1331
1332                                    (collections, None, read_policies)
1333                                }
1334                                _ => unreachable!("CREATE TABLE data source got {:?}", data_source),
1335                            }
1336                        }
1337                    };
1338
1339                    // Create the collections.
1340                    let storage_metadata = coord.catalog.state().storage_metadata();
1341                    coord
1342                        .controller
1343                        .storage
1344                        .create_collections(storage_metadata, register_ts, collections)
1345                        .await
1346                        .unwrap_or_terminate("cannot fail to create collections");
1347
1348                    // Mark the register timestamp as completed.
1349                    if let Some(register_ts) = register_ts {
1350                        coord.apply_local_write(register_ts).await;
1351                    }
1352
1353                    // Initialize the Read Policies.
1354                    for (compaction_window, storage_policies) in read_policies {
1355                        coord
1356                            .initialize_storage_read_policies(storage_policies, compaction_window)
1357                            .await;
1358                    }
1359                })
1360            })
1361            .await;
1362
1363        match catalog_result {
1364            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1365            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1366                kind:
1367                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1368            })) if if_not_exists => {
1369                ctx.session_mut()
1370                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1371                        name: name.item,
1372                        ty: "table",
1373                    });
1374                Ok(ExecuteResponse::CreatedTable)
1375            }
1376            Err(err) => Err(err),
1377        }
1378    }
1379
1380    #[instrument]
1381    pub(super) async fn sequence_create_sink(
1382        &mut self,
1383        ctx: ExecuteContext,
1384        plan: plan::CreateSinkPlan,
1385        resolved_ids: ResolvedIds,
1386    ) {
1387        let plan::CreateSinkPlan {
1388            name,
1389            sink,
1390            with_snapshot,
1391            if_not_exists,
1392            in_cluster,
1393        } = plan;
1394
1395        // First try to allocate an item ID and a global ID. If allocation fails, we're done.
1396        let id_ts = self.get_catalog_write_ts().await;
1397        let (item_id, global_id) =
1398            return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
1399
1400        let catalog_sink = Sink {
1401            create_sql: sink.create_sql,
1402            global_id,
1403            from: sink.from,
1404            connection: sink.connection,
1405            envelope: sink.envelope,
1406            version: sink.version,
1407            with_snapshot,
1408            resolved_ids,
1409            cluster_id: in_cluster,
1410        };
1411
1412        let ops = vec![catalog::Op::CreateItem {
1413            id: item_id,
1414            name: name.clone(),
1415            item: CatalogItem::Sink(catalog_sink.clone()),
1416            owner_id: *ctx.session().current_role_id(),
1417        }];
1418
1419        let from = self.catalog().get_entry_by_global_id(&catalog_sink.from);
1420        if let Err(e) = self
1421            .controller
1422            .storage
1423            .check_exists(sink.from)
1424            .map_err(|e| match e {
1425                StorageError::IdentifierMissing(_) => AdapterError::Unstructured(anyhow!(
1426                    "{} is a {}, which cannot be exported as a sink",
1427                    from.name().item.clone(),
1428                    from.item().typ(),
1429                )),
1430                e => AdapterError::Storage(e),
1431            })
1432        {
1433            ctx.retire(Err(e));
1434            return;
1435        }
1436
1437        let result = self.catalog_transact(Some(ctx.session()), ops).await;
1438
1439        match result {
1440            Ok(()) => {}
1441            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1442                kind:
1443                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1444            })) if if_not_exists => {
1445                ctx.session()
1446                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1447                        name: name.item,
1448                        ty: "sink",
1449                    });
1450                ctx.retire(Ok(ExecuteResponse::CreatedSink));
1451                return;
1452            }
1453            Err(e) => {
1454                ctx.retire(Err(e));
1455                return;
1456            }
1457        };
1458
1459        self.create_storage_export(global_id, &catalog_sink)
1460            .await
1461            .unwrap_or_terminate("cannot fail to create exports");
1462
1463        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1464            .await;
1465
1466        ctx.retire(Ok(ExecuteResponse::CreatedSink))
1467    }
1468
1469    /// Validates that a view definition does not contain any expressions that may lead to
1470    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1471    ///
1472    /// We prevent these expressions so that we can add columns to system tables without
1473    /// changing the definition of the view.
1474    ///
1475    /// Here is a bit of a hand-wavy proof as to why we only need to check the
1476    /// immediate view definition for system objects and ambiguous column
1477    /// references, and not the entire dependency tree:
1478    ///
1479    ///   - A view with no object references cannot have any ambiguous column
1480    ///   references to a system object, because it references no system objects.
1481    ///   - A view with a direct reference to a system object and a * or
1482    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1483    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1484    ///   any ambiguous column references to a system object, because all column
1485    ///   references are explicitly named.
1486    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1487    ///   system object cannot have any ambiguous column references to a system
1488    ///   object, because there are no system objects in the top level view and
1489    ///   all sub-views are guaranteed to have no ambiguous column references to
1490    ///   system objects.
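    ///
    /// For illustration only (a hypothetical example, not an exhaustive rule): a view
    /// defined as `SELECT * FROM mz_tables` would be rejected by this check, because
    /// adding a column to `mz_tables` would silently change what the view returns,
    /// while `SELECT id, name FROM mz_tables` names its columns explicitly and so is
    /// accepted.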
1491    pub(super) fn validate_system_column_references(
1492        &self,
1493        uses_ambiguous_columns: bool,
1494        depends_on: &BTreeSet<GlobalId>,
1495    ) -> Result<(), AdapterError> {
1496        if uses_ambiguous_columns
1497            && depends_on
1498                .iter()
1499                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1500        {
1501            Err(AdapterError::AmbiguousSystemColumnReference)
1502        } else {
1503            Ok(())
1504        }
1505    }
1506
1507    #[instrument]
1508    pub(super) async fn sequence_create_type(
1509        &mut self,
1510        session: &Session,
1511        plan: plan::CreateTypePlan,
1512        resolved_ids: ResolvedIds,
1513    ) -> Result<ExecuteResponse, AdapterError> {
1514        let id_ts = self.get_catalog_write_ts().await;
1515        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
1516        let typ = Type {
1517            create_sql: Some(plan.typ.create_sql),
1518            global_id,
1519            desc: plan.typ.inner.desc(&self.catalog().for_session(session))?,
1520            details: CatalogTypeDetails {
1521                array_id: None,
1522                typ: plan.typ.inner,
1523                pg_metadata: None,
1524            },
1525            resolved_ids,
1526        };
1527        let op = catalog::Op::CreateItem {
1528            id: item_id,
1529            name: plan.name,
1530            item: CatalogItem::Type(typ),
1531            owner_id: *session.current_role_id(),
1532        };
1533        match self.catalog_transact(Some(session), vec![op]).await {
1534            Ok(()) => Ok(ExecuteResponse::CreatedType),
1535            Err(err) => Err(err),
1536        }
1537    }
1538
1539    #[instrument]
1540    pub(super) async fn sequence_comment_on(
1541        &mut self,
1542        session: &Session,
1543        plan: plan::CommentPlan,
1544    ) -> Result<ExecuteResponse, AdapterError> {
1545        let op = catalog::Op::Comment {
1546            object_id: plan.object_id,
1547            sub_component: plan.sub_component,
1548            comment: plan.comment,
1549        };
1550        self.catalog_transact(Some(session), vec![op]).await?;
1551        Ok(ExecuteResponse::Comment)
1552    }
1553
1554    #[instrument]
1555    pub(super) async fn sequence_drop_objects(
1556        &mut self,
1557        session: &Session,
1558        plan::DropObjectsPlan {
1559            drop_ids,
1560            object_type,
1561            referenced_ids,
1562        }: plan::DropObjectsPlan,
1563    ) -> Result<ExecuteResponse, AdapterError> {
1564        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1565        let mut objects = Vec::new();
1566        for obj_id in &drop_ids {
1567            if !referenced_ids_hashset.contains(obj_id) {
1568                let object_info = ErrorMessageObjectDescription::from_id(
1569                    obj_id,
1570                    &self.catalog().for_session(session),
1571                )
1572                .to_string();
1573                objects.push(object_info);
1574            }
1575        }
1576
1577        if !objects.is_empty() {
1578            session.add_notice(AdapterNotice::CascadeDroppedObject { objects });
1579        }
1580
1581        let DropOps {
1582            ops,
1583            dropped_active_db,
1584            dropped_active_cluster,
1585            dropped_in_use_indexes,
1586        } = self.sequence_drop_common(session, drop_ids)?;
1587
1588        self.catalog_transact(Some(session), ops).await?;
1589
1590        fail::fail_point!("after_sequencer_drop_replica");
1591
1592        if dropped_active_db {
1593            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1594                name: session.vars().database().to_string(),
1595            });
1596        }
1597        if dropped_active_cluster {
1598            session.add_notice(AdapterNotice::DroppedActiveCluster {
1599                name: session.vars().cluster().to_string(),
1600            });
1601        }
1602        for dropped_in_use_index in dropped_in_use_indexes {
1603            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1604            self.metrics
1605                .optimization_notices
1606                .with_label_values(&["DroppedInUseIndex"])
1607                .inc_by(1);
1608        }
1609        Ok(ExecuteResponse::DroppedObject(object_type))
1610    }
1611
1612    fn validate_dropped_role_ownership(
1613        &self,
1614        session: &Session,
1615        dropped_roles: &BTreeMap<RoleId, &str>,
1616    ) -> Result<(), AdapterError> {
1617        fn privilege_check(
1618            privileges: &PrivilegeMap,
1619            dropped_roles: &BTreeMap<RoleId, &str>,
1620            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1621            object_id: &SystemObjectId,
1622            catalog: &ConnCatalog,
1623        ) {
1624            for privilege in privileges.all_values() {
1625                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1626                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1627                    let object_description =
1628                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1629                    dependent_objects
1630                        .entry(role_name.to_string())
1631                        .or_default()
1632                        .push(format!(
1633                            "privileges on {object_description} granted by {grantor_name}",
1634                        ));
1635                }
1636                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1637                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1638                    let object_description =
1639                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1640                    dependent_objects
1641                        .entry(role_name.to_string())
1642                        .or_default()
1643                        .push(format!(
1644                            "privileges granted on {object_description} to {grantee_name}"
1645                        ));
1646                }
1647            }
1648        }
1649
1650        let catalog = self.catalog().for_session(session);
1651        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1652        for entry in self.catalog.entries() {
1653            let id = SystemObjectId::Object(entry.id().into());
1654            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1655                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1656                dependent_objects
1657                    .entry(role_name.to_string())
1658                    .or_default()
1659                    .push(format!("owner of {object_description}"));
1660            }
1661            privilege_check(
1662                entry.privileges(),
1663                dropped_roles,
1664                &mut dependent_objects,
1665                &id,
1666                &catalog,
1667            );
1668        }
1669        for database in self.catalog.databases() {
1670            let database_id = SystemObjectId::Object(database.id().into());
1671            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1672                let object_description =
1673                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1674                dependent_objects
1675                    .entry(role_name.to_string())
1676                    .or_default()
1677                    .push(format!("owner of {object_description}"));
1678            }
1679            privilege_check(
1680                &database.privileges,
1681                dropped_roles,
1682                &mut dependent_objects,
1683                &database_id,
1684                &catalog,
1685            );
1686            for schema in database.schemas_by_id.values() {
1687                let schema_id = SystemObjectId::Object(
1688                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1689                );
1690                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1691                    let object_description =
1692                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1693                    dependent_objects
1694                        .entry(role_name.to_string())
1695                        .or_default()
1696                        .push(format!("owner of {object_description}"));
1697                }
1698                privilege_check(
1699                    &schema.privileges,
1700                    dropped_roles,
1701                    &mut dependent_objects,
1702                    &schema_id,
1703                    &catalog,
1704                );
1705            }
1706        }
1707        for cluster in self.catalog.clusters() {
1708            let cluster_id = SystemObjectId::Object(cluster.id().into());
1709            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1710                let object_description =
1711                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1712                dependent_objects
1713                    .entry(role_name.to_string())
1714                    .or_default()
1715                    .push(format!("owner of {object_description}"));
1716            }
1717            privilege_check(
1718                &cluster.privileges,
1719                dropped_roles,
1720                &mut dependent_objects,
1721                &cluster_id,
1722                &catalog,
1723            );
1724            for replica in cluster.replicas() {
1725                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1726                    let replica_id =
1727                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1728                    let object_description =
1729                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1730                    dependent_objects
1731                        .entry(role_name.to_string())
1732                        .or_default()
1733                        .push(format!("owner of {object_description}"));
1734                }
1735            }
1736        }
1737        privilege_check(
1738            self.catalog().system_privileges(),
1739            dropped_roles,
1740            &mut dependent_objects,
1741            &SystemObjectId::System,
1742            &catalog,
1743        );
1744        for (default_privilege_object, default_privilege_acl_items) in
1745            self.catalog.default_privileges()
1746        {
1747            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1748                dependent_objects
1749                    .entry(role_name.to_string())
1750                    .or_default()
1751                    .push(format!(
1752                        "default privileges on {}S created by {}",
1753                        default_privilege_object.object_type, role_name
1754                    ));
1755            }
1756            for default_privilege_acl_item in default_privilege_acl_items {
1757                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1758                    dependent_objects
1759                        .entry(role_name.to_string())
1760                        .or_default()
1761                        .push(format!(
1762                            "default privileges on {}S granted to {}",
1763                            default_privilege_object.object_type, role_name
1764                        ));
1765                }
1766            }
1767        }
1768
1769        if !dependent_objects.is_empty() {
1770            Err(AdapterError::DependentObject(dependent_objects))
1771        } else {
1772            Ok(())
1773        }
1774    }
1775
1776    #[instrument]
1777    pub(super) async fn sequence_drop_owned(
1778        &mut self,
1779        session: &Session,
1780        plan: plan::DropOwnedPlan,
1781    ) -> Result<ExecuteResponse, AdapterError> {
1782        for role_id in &plan.role_ids {
1783            self.catalog().ensure_not_reserved_role(role_id)?;
1784        }
1785
1786        let mut privilege_revokes = plan.privilege_revokes;
1787
1788        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1789        let session_catalog = self.catalog().for_session(session);
1790        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1791            && !session.is_superuser()
1792        {
1793            // Obtain all roles that the current session is a member of.
1794            let role_membership =
1795                session_catalog.collect_role_membership(session.current_role_id());
1796            let invalid_revokes: BTreeSet<_> = privilege_revokes
1797                .extract_if(.., |(_, privilege)| {
1798                    !role_membership.contains(&privilege.grantor)
1799                })
1800                .map(|(object_id, _)| object_id)
1801                .collect();
1802            for invalid_revoke in invalid_revokes {
1803                let object_description =
1804                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1805                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1806            }
1807        }
1808
1809        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1810            catalog::Op::UpdatePrivilege {
1811                target_id: object_id,
1812                privilege,
1813                variant: UpdatePrivilegeVariant::Revoke,
1814            }
1815        });
1816        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1817            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1818                privilege_object,
1819                privilege_acl_item,
1820                variant: UpdatePrivilegeVariant::Revoke,
1821            },
1822        );
1823        let DropOps {
1824            ops: drop_ops,
1825            dropped_active_db,
1826            dropped_active_cluster,
1827            dropped_in_use_indexes,
1828        } = self.sequence_drop_common(session, plan.drop_ids)?;
1829
1830        let ops = privilege_revoke_ops
1831            .chain(default_privilege_revoke_ops)
1832            .chain(drop_ops.into_iter())
1833            .collect();
1834
1835        self.catalog_transact(Some(session), ops).await?;
1836
1837        if dropped_active_db {
1838            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1839                name: session.vars().database().to_string(),
1840            });
1841        }
1842        if dropped_active_cluster {
1843            session.add_notice(AdapterNotice::DroppedActiveCluster {
1844                name: session.vars().cluster().to_string(),
1845            });
1846        }
1847        for dropped_in_use_index in dropped_in_use_indexes {
1848            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1849        }
1850        Ok(ExecuteResponse::DroppedOwned)
1851    }
1852
1853    fn sequence_drop_common(
1854        &self,
1855        session: &Session,
1856        ids: Vec<ObjectId>,
1857    ) -> Result<DropOps, AdapterError> {
1858        let mut dropped_active_db = false;
1859        let mut dropped_active_cluster = false;
1860        let mut dropped_in_use_indexes = Vec::new();
1861        let mut dropped_roles = BTreeMap::new();
1862        let mut dropped_databases = BTreeSet::new();
1863        let mut dropped_schemas = BTreeSet::new();
1864        // Dropping either the group role or the member role of a role membership will trigger a
1865        // role revoke. We use a Set for the revokes to avoid attempting to revoke the same role
1866        // membership twice.
1867        let mut role_revokes = BTreeSet::new();
1868        // Dropping a database or a schema will revoke all default privileges associated with
1869        // that database or schema.
1870        let mut default_privilege_revokes = BTreeSet::new();
1871
1872        // Clusters we're dropping
1873        let mut clusters_to_drop = BTreeSet::new();
1874
1875        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1876        for id in &ids {
1877            match id {
1878                ObjectId::Database(id) => {
1879                    let name = self.catalog().get_database(id).name();
1880                    if name == session.vars().database() {
1881                        dropped_active_db = true;
1882                    }
1883                    dropped_databases.insert(id);
1884                }
1885                ObjectId::Schema((_, spec)) => {
1886                    if let SchemaSpecifier::Id(id) = spec {
1887                        dropped_schemas.insert(id);
1888                    }
1889                }
1890                ObjectId::Cluster(id) => {
1891                    clusters_to_drop.insert(*id);
1892                    if let Some(active_id) = self
1893                        .catalog()
1894                        .active_cluster(session)
1895                        .ok()
1896                        .map(|cluster| cluster.id())
1897                    {
1898                        if id == &active_id {
1899                            dropped_active_cluster = true;
1900                        }
1901                    }
1902                }
1903                ObjectId::Role(id) => {
1904                    let role = self.catalog().get_role(id);
1905                    let name = role.name();
1906                    dropped_roles.insert(*id, name);
1907                    // We must revoke all role memberships that the dropped role belongs to.
1908                    for (group_id, grantor_id) in &role.membership.map {
1909                        role_revokes.insert((*group_id, *id, *grantor_id));
1910                    }
1911                }
1912                ObjectId::Item(id) => {
1913                    if let Some(index) = self.catalog().get_entry(id).index() {
1914                        let humanizer = self.catalog().for_session(session);
1915                        let dependants = self
1916                            .controller
1917                            .compute
1918                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1919                            .ok()
1920                            .into_iter()
1921                            .flatten()
1922                            .filter(|dependant_id| {
1923                                // Transient Ids belong to Peeks. For now, we are not
1924                                // interested in peeks that depend on a dropped index.
1925                                // TODO: show a different notice in this case. Something like
1926                                // "There is an in-progress ad hoc SELECT that uses the dropped
1927                                // index. The resources used by the index will be freed when all
1928                                // such SELECTs complete."
1929                                if dependant_id.is_transient() {
1930                                    return false;
1931                                }
1932                                // The item should exist, but don't panic if it doesn't.
1933                                let Some(dependent_id) = humanizer
1934                                    .try_get_item_by_global_id(dependant_id)
1935                                    .map(|item| item.id())
1936                                else {
1937                                    return false;
1938                                };
1939                                // If the dependent object is also being dropped, then there is no
1940                                // problem, so we don't want a notice.
1941                                !ids_set.contains(&ObjectId::Item(dependent_id))
1942                            })
1943                            .flat_map(|dependant_id| {
1944                                // If we are not able to find a name for this ID it probably means
1945                                // we have already dropped the compute collection, in which case we
1946                                // can ignore it.
1947                                humanizer.humanize_id(dependant_id)
1948                            })
1949                            .collect_vec();
1950                        if !dependants.is_empty() {
1951                            dropped_in_use_indexes.push(DroppedInUseIndex {
1952                                index_name: humanizer
1953                                    .humanize_id(index.global_id())
1954                                    .unwrap_or_else(|| id.to_string()),
1955                                dependant_objects: dependants,
1956                            });
1957                        }
1958                    }
1959                }
1960                _ => {}
1961            }
1962        }
1963
1964        for id in &ids {
1965            match id {
1966                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1967                // unless they are internal replicas, which exist outside the scope
1968                // of managed clusters.
1969                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1970                    if !clusters_to_drop.contains(cluster_id) {
1971                        let cluster = self.catalog.get_cluster(*cluster_id);
1972                        if cluster.is_managed() {
1973                            let replica =
1974                                cluster.replica(*replica_id).expect("Catalog out of sync");
1975                            if !replica.config.location.internal() {
1976                                coord_bail!("cannot drop replica of managed cluster");
1977                            }
1978                        }
1979                    }
1980                }
1981                _ => {}
1982            }
1983        }
1984
1985        for role_id in dropped_roles.keys() {
1986            self.catalog().ensure_not_reserved_role(role_id)?;
1987        }
1988        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1989        // If any role is a member of a dropped role, then we must revoke that membership.
1990        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1991        for role in self.catalog().user_roles() {
1992            for dropped_role_id in
1993                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1994            {
1995                role_revokes.insert((
1996                    **dropped_role_id,
1997                    role.id(),
1998                    *role
1999                        .membership
2000                        .map
2001                        .get(*dropped_role_id)
2002                        .expect("included in keys above"),
2003                ));
2004            }
2005        }
2006
2007        for (default_privilege_object, default_privilege_acls) in
2008            self.catalog().default_privileges()
2009        {
2010            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
2011                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
2012            {
2013                for default_privilege_acl in default_privilege_acls {
2014                    default_privilege_revokes.insert((
2015                        default_privilege_object.clone(),
2016                        default_privilege_acl.clone(),
2017                    ));
2018                }
2019            }
2020        }
2021
2022        let ops = role_revokes
2023            .into_iter()
2024            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
2025                role_id,
2026                member_id,
2027                grantor_id,
2028            })
2029            .chain(default_privilege_revokes.into_iter().map(
2030                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
2031                    privilege_object,
2032                    privilege_acl_item,
2033                    variant: UpdatePrivilegeVariant::Revoke,
2034                },
2035            ))
2036            .chain(iter::once(catalog::Op::DropObjects(
2037                ids.into_iter()
2038                    .map(DropObjectInfo::manual_drop_from_object_id)
2039                    .collect(),
2040            )))
2041            .collect();
2042
2043        Ok(DropOps {
2044            ops,
2045            dropped_active_db,
2046            dropped_active_cluster,
2047            dropped_in_use_indexes,
2048        })
2049    }
2050
2051    pub(super) fn sequence_explain_schema(
2052        &self,
2053        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
2054    ) -> Result<ExecuteResponse, AdapterError> {
2055        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
2056            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
2057        })?;
2058
2059        let json_string = json_string(&json_value);
2060        let row = Row::pack_slice(&[Datum::String(&json_string)]);
2061        Ok(Self::send_immediate_rows(row))
2062    }
2063
2064    pub(super) fn sequence_show_all_variables(
2065        &self,
2066        session: &Session,
2067    ) -> Result<ExecuteResponse, AdapterError> {
2068        let mut rows = viewable_variables(self.catalog().state(), session)
2069            .map(|v| (v.name(), v.value(), v.description()))
2070            .collect::<Vec<_>>();
2071        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
2072
2073        // TODO(parkmycar): Pack all of these into a single RowCollection.
2074        let rows: Vec<_> = rows
2075            .into_iter()
2076            .map(|(name, val, desc)| {
2077                Row::pack_slice(&[
2078                    Datum::String(name),
2079                    Datum::String(&val),
2080                    Datum::String(desc),
2081                ])
2082            })
2083            .collect();
2084        Ok(Self::send_immediate_rows(rows))
2085    }
2086
2087    pub(super) fn sequence_show_variable(
2088        &self,
2089        session: &Session,
2090        plan: plan::ShowVariablePlan,
2091    ) -> Result<ExecuteResponse, AdapterError> {
2092        if &plan.name == SCHEMA_ALIAS {
2093            let schemas = self.catalog.resolve_search_path(session);
2094            let schema = schemas.first();
2095            return match schema {
2096                Some((database_spec, schema_spec)) => {
2097                    let schema_name = &self
2098                        .catalog
2099                        .get_schema(database_spec, schema_spec, session.conn_id())
2100                        .name()
2101                        .schema;
2102                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
2103                    Ok(Self::send_immediate_rows(row))
2104                }
2105                None => {
2106                    if session.vars().current_object_missing_warnings() {
2107                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
2108                            search_path: session
2109                                .vars()
2110                                .search_path()
2111                                .into_iter()
2112                                .map(|schema| schema.to_string())
2113                                .collect(),
2114                        });
2115                    }
2116                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
2117                }
2118            };
2119        }
2120
2121        let variable = session
2122            .vars()
2123            .get(self.catalog().system_config(), &plan.name)
2124            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
2125
2126        // In lieu of plumbing the user to all system config functions, just check that the var is
2127        // visible.
2128        variable.visible(session.user(), self.catalog().system_config())?;
2129
2130        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
2131        if variable.name() == vars::DATABASE.name()
2132            && matches!(
2133                self.catalog().resolve_database(&variable.value()),
2134                Err(CatalogError::UnknownDatabase(_))
2135            )
2136            && session.vars().current_object_missing_warnings()
2137        {
2138            let name = variable.value();
2139            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2140        } else if variable.name() == vars::CLUSTER.name()
2141            && matches!(
2142                self.catalog().resolve_cluster(&variable.value()),
2143                Err(CatalogError::UnknownCluster(_))
2144            )
2145            && session.vars().current_object_missing_warnings()
2146        {
2147            let name = variable.value();
2148            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2149        }
2150        Ok(Self::send_immediate_rows(row))
2151    }
2152
2153    #[instrument]
2154    pub(super) async fn sequence_inspect_shard(
2155        &self,
2156        session: &Session,
2157        plan: plan::InspectShardPlan,
2158    ) -> Result<ExecuteResponse, AdapterError> {
2159        // TODO: Not thrilled about this rbac special case here, but probably
2160        // sufficient for now.
2161        if !session.user().is_internal() {
2162            return Err(AdapterError::Unauthorized(
2163                rbac::UnauthorizedError::MzSystem {
2164                    action: "inspect".into(),
2165                },
2166            ));
2167        }
2168        let state = self
2169            .controller
2170            .storage
2171            .inspect_persist_state(plan.id)
2172            .await?;
2173        let jsonb = Jsonb::from_serde_json(state)?;
2174        Ok(Self::send_immediate_rows(jsonb.into_row()))
2175    }
2176
2177    #[instrument]
2178    pub(super) fn sequence_set_variable(
2179        &self,
2180        session: &mut Session,
2181        plan: plan::SetVariablePlan,
2182    ) -> Result<ExecuteResponse, AdapterError> {
2183        let (name, local) = (plan.name, plan.local);
2184        if &name == TRANSACTION_ISOLATION_VAR_NAME {
2185            self.validate_set_isolation_level(session)?;
2186        }
2187        if &name == vars::CLUSTER.name() {
2188            self.validate_set_cluster(session)?;
2189        }
2190
2191        let vars = session.vars_mut();
2192        let values = match plan.value {
2193            plan::VariableValue::Default => None,
2194            plan::VariableValue::Values(values) => Some(values),
2195        };
2196
2197        match values {
2198            Some(values) => {
2199                vars.set(
2200                    self.catalog().system_config(),
2201                    &name,
2202                    VarInput::SqlSet(&values),
2203                    local,
2204                )?;
2205
2206                let vars = session.vars();
2207
2208                // Emit a warning when deprecated variables are used.
2209                // TODO(database-issues#8069) remove this after sufficient time has passed
2210                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
2211                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
2212                } else if name == vars::CLUSTER.name()
2213                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
2214                {
2215                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
2216                }
2217
2218                // Notify the user if the database or cluster value does not correspond to a catalog item.
2219                if name.as_str() == vars::DATABASE.name()
2220                    && matches!(
2221                        self.catalog().resolve_database(vars.database()),
2222                        Err(CatalogError::UnknownDatabase(_))
2223                    )
2224                    && session.vars().current_object_missing_warnings()
2225                {
2226                    let name = vars.database().to_string();
2227                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2228                } else if name.as_str() == vars::CLUSTER.name()
2229                    && matches!(
2230                        self.catalog().resolve_cluster(vars.cluster()),
2231                        Err(CatalogError::UnknownCluster(_))
2232                    )
2233                    && session.vars().current_object_missing_warnings()
2234                {
2235                    let name = vars.cluster().to_string();
2236                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2237                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
2238                    let v = values.into_first().to_lowercase();
2239                    if v == IsolationLevel::ReadUncommitted.as_str()
2240                        || v == IsolationLevel::ReadCommitted.as_str()
2241                        || v == IsolationLevel::RepeatableRead.as_str()
2242                    {
2243                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
2244                            isolation_level: v,
2245                        });
2246                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
2247                        session.add_notice(AdapterNotice::StrongSessionSerializable);
2248                    }
2249                }
2250            }
2251            None => vars.reset(self.catalog().system_config(), &name, local)?,
2252        }
2253
2254        Ok(ExecuteResponse::SetVariable { name, reset: false })
2255    }
2256
2257    pub(super) fn sequence_reset_variable(
2258        &self,
2259        session: &mut Session,
2260        plan: plan::ResetVariablePlan,
2261    ) -> Result<ExecuteResponse, AdapterError> {
2262        let name = plan.name;
2263        if &name == TRANSACTION_ISOLATION_VAR_NAME {
2264            self.validate_set_isolation_level(session)?;
2265        }
2266        if &name == vars::CLUSTER.name() {
2267            self.validate_set_cluster(session)?;
2268        }
2269        session
2270            .vars_mut()
2271            .reset(self.catalog().system_config(), &name, false)?;
2272        Ok(ExecuteResponse::SetVariable { name, reset: true })
2273    }
2274
2275    pub(super) fn sequence_set_transaction(
2276        &self,
2277        session: &mut Session,
2278        plan: plan::SetTransactionPlan,
2279    ) -> Result<ExecuteResponse, AdapterError> {
2280        // TODO(jkosh44) Only supports isolation levels for now.
2281        for mode in plan.modes {
2282            match mode {
2283                TransactionMode::AccessMode(_) => {
2284                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
2285                }
2286                TransactionMode::IsolationLevel(isolation_level) => {
2287                    self.validate_set_isolation_level(session)?;
2288
2289                    session.vars_mut().set(
2290                        self.catalog().system_config(),
2291                        TRANSACTION_ISOLATION_VAR_NAME,
2292                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
2293                        plan.local,
2294                    )?
2295                }
2296            }
2297        }
2298        Ok(ExecuteResponse::SetVariable {
2299            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2300            reset: false,
2301        })
2302    }
2303
2304    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2305        if session.transaction().contains_ops() {
2306            Err(AdapterError::InvalidSetIsolationLevel)
2307        } else {
2308            Ok(())
2309        }
2310    }
2311
2312    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2313        if session.transaction().contains_ops() {
2314            Err(AdapterError::InvalidSetCluster)
2315        } else {
2316            Ok(())
2317        }
2318    }
2319
2320    #[instrument]
2321    pub(super) async fn sequence_end_transaction(
2322        &mut self,
2323        mut ctx: ExecuteContext,
2324        mut action: EndTransactionAction,
2325    ) {
2326        // If the transaction has failed, we can only rollback.
2327        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2328            (&action, ctx.session().transaction())
2329        {
2330            action = EndTransactionAction::Rollback;
2331        }
2332        let response = match action {
2333            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2334                params: BTreeMap::new(),
2335            }),
2336            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2337                params: BTreeMap::new(),
2338            }),
2339        };
2340
2341        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2342
2343        let (response, action) = match result {
2344            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2345                (response, action)
2346            }
2347            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2348                // Make sure we have the correct set of write locks for this transaction.
2349                // We aggressively drop partial sets of locks to prevent deadlocking
2350                // separate transactions.
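                // (A hypothetical illustration of the hazard: if we held the lock for one
                // written-to table while waiting to acquire the lock for another, a second
                // transaction holding the locks in the opposite order could deadlock with
                // us, so we bail with an error instead of waiting.)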
2351                let validated_locks = match write_lock_guards {
2352                    None => None,
2353                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2354                        Ok(locks) => Some(locks),
2355                        Err(missing) => {
2356                            tracing::error!(?missing, "programming error, missing write locks");
2357                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2358                        }
2359                    },
2360                };
2361
2362                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2363                for WriteOp { id, rows } in writes {
2364                    let total_rows = collected_writes.entry(id).or_default();
2365                    total_rows.push(rows);
2366                }
2367
2368                self.submit_write(PendingWriteTxn::User {
2369                    span: Span::current(),
2370                    writes: collected_writes,
2371                    write_locks: validated_locks,
2372                    pending_txn: PendingTxn {
2373                        ctx,
2374                        response,
2375                        action,
2376                    },
2377                });
2378                return;
2379            }
2380            Ok((
2381                Some(TransactionOps::Peeks {
2382                    determination,
2383                    requires_linearization: RequireLinearization::Required,
2384                    ..
2385                }),
2386                _,
2387            )) if ctx.session().vars().transaction_isolation()
2388                == &IsolationLevel::StrictSerializable =>
2389            {
2390                let conn_id = ctx.session().conn_id().clone();
2391                let pending_read_txn = PendingReadTxn {
2392                    txn: PendingRead::Read {
2393                        txn: PendingTxn {
2394                            ctx,
2395                            response,
2396                            action,
2397                        },
2398                    },
2399                    timestamp_context: determination.timestamp_context,
2400                    created: Instant::now(),
2401                    num_requeues: 0,
2402                    otel_ctx: OpenTelemetryContext::obtain(),
2403                };
2404                self.strict_serializable_reads_tx
2405                    .send((conn_id, pending_read_txn))
2406                    .expect("sending to strict_serializable_reads_tx cannot fail");
2407                return;
2408            }
2409            Ok((
2410                Some(TransactionOps::Peeks {
2411                    determination,
2412                    requires_linearization: RequireLinearization::Required,
2413                    ..
2414                }),
2415                _,
2416            )) if ctx.session().vars().transaction_isolation()
2417                == &IsolationLevel::StrongSessionSerializable =>
2418            {
2419                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2420                    ctx.session_mut()
2421                        .ensure_timestamp_oracle(timeline.clone())
2422                        .apply_write(*ts);
2423                }
2424                (response, action)
2425            }
2426            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2427                self.internal_cmd_tx
2428                    .send(Message::ExecuteSingleStatementTransaction {
2429                        ctx,
2430                        otel_ctx: OpenTelemetryContext::obtain(),
2431                        stmt,
2432                        params,
2433                    })
2434                    .expect("must send");
2435                return;
2436            }
2437            Ok((_, _)) => (response, action),
2438            Err(err) => (Err(err), EndTransactionAction::Rollback),
2439        };
2440        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2441        // Append any parameters that changed to the response.
2442        let response = response.map(|mut r| {
2443            r.extend_params(changed);
2444            ExecuteResponse::from(r)
2445        });
2446
2447        ctx.retire(response);
2448    }
2449
2450    #[instrument]
2451    async fn sequence_end_transaction_inner(
2452        &mut self,
2453        ctx: &mut ExecuteContext,
2454        action: EndTransactionAction,
2455    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2456        let txn = self.clear_transaction(ctx.session_mut()).await;
2457
2458        if let EndTransactionAction::Commit = action {
2459            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2460                match &mut ops {
2461                    TransactionOps::Writes(writes) => {
2462                        for WriteOp { id, .. } in &mut writes.iter() {
2463                            // Re-verify this id exists.
2464                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2465                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2466                                    kind: mz_catalog::memory::error::ErrorKind::Sql(
2467                                        CatalogError::UnknownItem(id.to_string()),
2468                                    ),
2469                                })
2470                            })?;
2471                        }
2472
2473                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2474                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2475                    }
2476                    TransactionOps::DDL {
2477                        ops,
2478                        state: _,
2479                        side_effects,
2480                        revision,
2481                    } => {
2482                        // Make sure our catalog hasn't changed.
2483                        if *revision != self.catalog().transient_revision() {
2484                            return Err(AdapterError::DDLTransactionRace);
2485                        }
2486                        // Commit all of our queued ops.
2487                        let ops = std::mem::take(ops);
2488                        let side_effects = std::mem::take(side_effects);
2489                        self.catalog_transact_with_side_effects(
2490                            Some(ctx),
2491                            ops,
2492                            move |a, mut ctx| {
2493                                Box::pin(async move {
2494                                    for side_effect in side_effects {
2495                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2496                                    }
2497                                })
2498                            },
2499                        )
2500                        .await?;
2501                    }
2502                    _ => (),
2503                }
2504                return Ok((Some(ops), write_lock_guards));
2505            }
2506        }
2507
2508        Ok((None, None))
2509    }
2510
2511    pub(super) async fn sequence_side_effecting_func(
2512        &mut self,
2513        ctx: ExecuteContext,
2514        plan: SideEffectingFunc,
2515    ) {
2516        match plan {
2517            SideEffectingFunc::PgCancelBackend { connection_id } => {
2518                if ctx.session().conn_id().unhandled() == connection_id {
2519                    // As a special case, if we're canceling ourselves, we send
2520                    // back a canceled response to the client issuing the query,
2521                    // and so we need to do no further processing of the cancel.
2522                    ctx.retire(Err(AdapterError::Canceled));
2523                    return;
2524                }
2525
2526                let res = if let Some((id_handle, _conn_meta)) =
2527                    self.active_conns.get_key_value(&connection_id)
2528                {
2529                    // check_plan already verified role membership.
2530                    self.handle_privileged_cancel(id_handle.clone()).await;
2531                    Datum::True
2532                } else {
2533                    Datum::False
2534                };
2535                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2536            }
2537        }
2538    }
2539
2540    /// Checks whether the session needs a real-time recency timestamp and, if so, returns
2541    /// a future that will produce that timestamp.
2542    pub(super) async fn determine_real_time_recent_timestamp(
2543        &self,
2544        session: &Session,
2545        source_ids: impl Iterator<Item = CatalogItemId>,
2546    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2547    {
2548        let vars = session.vars();
2549
2550        // Ideally this logic belongs inside of
2551        // `mz-adapter::coord::timestamp_selection::determine_timestamp`. However, including the
2552        // logic in there would make it extremely difficult and inconvenient to pull the waiting off
2553        // of the main coord thread.
2554        let r = if vars.real_time_recency()
2555            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2556            && !session.contains_read_timestamp()
2557        {
2558            // Find all dependencies transitively because we need to ensure that
2559            // RTR queries determine the timestamp from the sources' (i.e.
2560            // storage objects that ingest data from external systems) remap
2561            // data. We "cheat" a little bit and filter out any IDs that aren't
2562            // user objects because we know they are not RTR sources.
2563            let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
2564            // If none of the sources are user objects, we don't need to provide
2565            // an RTR timestamp.
2566            if to_visit.is_empty() {
2567                return Ok(None);
2568            }
2569
2570            let mut timestamp_objects = BTreeSet::new();
2571
2572            while let Some(id) = to_visit.pop_front() {
2573                timestamp_objects.insert(id);
2574                to_visit.extend(
2575                    self.catalog()
2576                        .get_entry(&id)
2577                        .uses()
2578                        .into_iter()
2579                        .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2580                );
2581            }
2582            let timestamp_objects = timestamp_objects
2583                .into_iter()
2584                .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2585                .collect();
2586
2587            let r = self
2588                .controller
2589                .determine_real_time_recent_timestamp(
2590                    timestamp_objects,
2591                    *vars.real_time_recency_timeout(),
2592                )
2593                .await?;
2594
2595            Some(r)
2596        } else {
2597            None
2598        };
2599
2600        Ok(r)
2601    }
2602
2603    #[instrument]
2604    pub(super) async fn sequence_explain_plan(
2605        &mut self,
2606        ctx: ExecuteContext,
2607        plan: plan::ExplainPlanPlan,
2608        target_cluster: TargetCluster,
2609    ) {
2610        match &plan.explainee {
2611            plan::Explainee::Statement(stmt) => match stmt {
2612                plan::ExplaineeStatement::CreateView { .. } => {
2613                    self.explain_create_view(ctx, plan).await;
2614                }
2615                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2616                    self.explain_create_materialized_view(ctx, plan).await;
2617                }
2618                plan::ExplaineeStatement::CreateIndex { .. } => {
2619                    self.explain_create_index(ctx, plan).await;
2620                }
2621                plan::ExplaineeStatement::Select { .. } => {
2622                    self.explain_peek(ctx, plan, target_cluster).await;
2623                }
2624            },
2625            plan::Explainee::View(_) => {
2626                let result = self.explain_view(&ctx, plan);
2627                ctx.retire(result);
2628            }
2629            plan::Explainee::MaterializedView(_) => {
2630                let result = self.explain_materialized_view(&ctx, plan);
2631                ctx.retire(result);
2632            }
2633            plan::Explainee::Index(_) => {
2634                let result = self.explain_index(&ctx, plan);
2635                ctx.retire(result);
2636            }
2637            plan::Explainee::ReplanView(_) => {
2638                self.explain_replan_view(ctx, plan).await;
2639            }
2640            plan::Explainee::ReplanMaterializedView(_) => {
2641                self.explain_replan_materialized_view(ctx, plan).await;
2642            }
2643            plan::Explainee::ReplanIndex(_) => {
2644                self.explain_replan_index(ctx, plan).await;
2645            }
2646        };
2647    }
2648
2649    pub(super) async fn sequence_explain_pushdown(
2650        &mut self,
2651        ctx: ExecuteContext,
2652        plan: plan::ExplainPushdownPlan,
2653        target_cluster: TargetCluster,
2654    ) {
2655        match plan.explainee {
2656            Explainee::Statement(ExplaineeStatement::Select {
2657                broken: false,
2658                plan,
2659                desc: _,
2660            }) => {
2661                let stage = return_if_err!(
2662                    self.peek_validate(
2663                        ctx.session(),
2664                        plan,
2665                        target_cluster,
2666                        None,
2667                        ExplainContext::Pushdown,
2668                        Some(ctx.session().vars().max_query_result_size()),
2669                    ),
2670                    ctx
2671                );
2672                self.sequence_staged(ctx, Span::current(), stage).await;
2673            }
2674            Explainee::MaterializedView(item_id) => {
2675                self.explain_pushdown_materialized_view(ctx, item_id).await;
2676            }
2677            _ => {
2678                ctx.retire(Err(AdapterError::Unsupported(
2679                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2680                )));
2681            }
2682        };
2683    }
2684
2685    async fn render_explain_pushdown(
2686        &self,
2687        ctx: ExecuteContext,
2688        as_of: Antichain<Timestamp>,
2689        mz_now: ResultSpec<'static>,
2690        read_holds: Option<ReadHolds<Timestamp>>,
2691        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2692    ) {
2693        let fut = self
2694            .render_explain_pushdown_prepare(ctx.session(), as_of, mz_now, imports)
2695            .await;
2696        task::spawn(|| "render explain pushdown", async move {
2697            // Transfer the necessary read holds over to the background task
2698            let _read_holds = read_holds;
2699            let res = fut.await;
2700            ctx.retire(res);
2701        });
2702    }
2703
2704    async fn render_explain_pushdown_prepare<
2705        I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
2706    >(
2707        &self,
2708        session: &Session,
2709        as_of: Antichain<Timestamp>,
2710        mz_now: ResultSpec<'static>,
2711        imports: I,
2712    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2713        let explain_timeout = *session.vars().statement_timeout();
2714        let mut futures = FuturesOrdered::new();
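        // For each imported collection, fetch its persist snapshot part statistics at the
        // given as-of so we can estimate how many parts (and bytes) the MFP could filter out.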
2715        for (id, mfp) in imports {
2716            let catalog_entry = self.catalog.get_entry_by_global_id(&id);
2717            let full_name = self
2718                .catalog
2719                .for_session(session)
2720                .resolve_full_name(&catalog_entry.name);
2721            let name = format!("{}", full_name);
2722            let relation_desc = catalog_entry
2723                .desc_opt()
2724                .expect("source should have a proper desc")
2725                .into_owned();
2726            let stats_future = self
2727                .controller
2728                .storage_collections
2729                .snapshot_parts_stats(id, as_of.clone())
2730                .await;
2731
2732            let mz_now = mz_now.clone();
2733            // These futures may block if the source is not yet readable at the as-of;
2734            // stash them in `futures` and only block on them in a separate task.
2735            futures.push_back(async move {
2736                let snapshot_stats = match stats_future.await {
2737                    Ok(stats) => stats,
2738                    Err(e) => return Err(e),
2739                };
2740                let mut total_bytes = 0;
2741                let mut total_parts = 0;
2742                let mut selected_bytes = 0;
2743                let mut selected_parts = 0;
2744                for SnapshotPartStats {
2745                    encoded_size_bytes: bytes,
2746                    stats,
2747                } in &snapshot_stats.parts
2748                {
2749                    let bytes = u64::cast_from(*bytes);
2750                    total_bytes += bytes;
2751                    total_parts += 1u64;
2752                    let selected = match stats {
2753                        None => true,
2754                        Some(stats) => {
2755                            let stats = stats.decode();
2756                            let stats = RelationPartStats::new(
2757                                name.as_str(),
2758                                &snapshot_stats.metrics.pushdown.part_stats,
2759                                &relation_desc,
2760                                &stats,
2761                            );
2762                            stats.may_match_mfp(mz_now.clone(), &mfp)
2763                        }
2764                    };
2765
2766                    if selected {
2767                        selected_bytes += bytes;
2768                        selected_parts += 1u64;
2769                    }
2770                }
2771                Ok(Row::pack_slice(&[
2772                    name.as_str().into(),
2773                    total_bytes.into(),
2774                    selected_bytes.into(),
2775                    total_parts.into(),
2776                    selected_parts.into(),
2777                ]))
2778            });
2779        }
2780
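        // Collect the per-collection rows, bounding the whole operation by the session's
        // statement_timeout.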
2781        let fut = async move {
2782            match tokio::time::timeout(
2783                explain_timeout,
2784                futures::TryStreamExt::try_collect::<Vec<_>>(futures),
2785            )
2786            .await
2787            {
2788                Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
2789                    rows: Box::new(rows.into_row_iter()),
2790                }),
2791                Ok(Err(err)) => Err(err.into()),
2792                Err(_) => Err(AdapterError::StatementTimeout),
2793            }
2794        };
2795        fut
2796    }
2797
2798    #[instrument]
2799    pub(super) async fn sequence_insert(
2800        &mut self,
2801        mut ctx: ExecuteContext,
2802        plan: plan::InsertPlan,
2803    ) {
2804        // Normally, this would get checked when trying to add "write ops" to
2805        // the transaction, but we go down diverging paths below based on
2806        // whether the INSERT is only constant values or not.
2807        //
2808        // For the non-constant case we sequence an implicit read-then-write,
2809        // which messes with the transaction ops and would allow an implicit
2810        // read-then-write to sneak into a read-only transaction.
2811        if !ctx.session_mut().transaction().allows_writes() {
2812            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2813            return;
2814        }
2815
2816        // The structure of this code originates from a time when
2817        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2818        // optimized `MirRelationExpr`.
2819        //
2820        // Ideally, we would like to make the `selection.as_const().is_some()`
2821        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2822        // are planned as a Wrap($n, $vals) call, so until we can reduce
2823        // HirRelationExpr this will always return false.
2824        //
2825        // Unfortunately, hitting the default path of the match below also
2826        // causes a lot of tests to fail, so we opted to go with the extra
2827        // `plan.values.clone()` statements when producing the `optimized_mir`
2828        // and re-optimize the values in the `sequence_read_then_write` call.
2829        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2830            // For writes, we don't perform any optimizations on an expression that is
2831            // already a constant, as we want to maximize bulk-insert throughput.
2832            let expr = return_if_err!(
2833                plan.values
2834                    .clone()
2835                    .lower(self.catalog().system_config(), None),
2836                ctx
2837            );
2838            OptimizedMirRelationExpr(expr)
2839        } else {
2840            // Collect optimizer parameters.
2841            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2842
2843            // (`optimize::view::Optimizer` has a special case for constant queries.)
2844            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2845
2846            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2847            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2848        };
2849
2850        match optimized_mir.into_inner() {
2851            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2852                let catalog = self.owned_catalog();
2853                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2854                    let result =
2855                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2856                    ctx.retire(result);
2857                });
2858            }
2859            // All non-constant values must be planned as read-then-writes.
2860            _ => {
2861                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2862                    Some(table) => {
2863                        let fullname = self
2864                            .catalog()
2865                            .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2866                        // Inserts always occur at the latest version of the table.
2867                        table
2868                            .desc_latest(&fullname)
2869                            .expect("desc called on table")
2870                            .arity()
2871                    }
2872                    None => {
2873                        ctx.retire(Err(AdapterError::Catalog(
2874                            mz_catalog::memory::error::Error {
2875                                kind: mz_catalog::memory::error::ErrorKind::Sql(
2876                                    CatalogError::UnknownItem(plan.id.to_string()),
2877                                ),
2878                            },
2879                        )));
2880                        return;
2881                    }
2882                };
2883
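                // Use an identity finishing that projects every column of the table; the
                // subsequent write needs complete rows.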
2884                let finishing = RowSetFinishing {
2885                    order_by: vec![],
2886                    limit: None,
2887                    offset: 0,
2888                    project: (0..desc_arity).collect(),
2889                };
2890
2891                let read_then_write_plan = plan::ReadThenWritePlan {
2892                    id: plan.id,
2893                    selection: plan.values,
2894                    finishing,
2895                    assignments: BTreeMap::new(),
2896                    kind: MutationKind::Insert,
2897                    returning: plan.returning,
2898                };
2899
2900                self.sequence_read_then_write(ctx, read_then_write_plan)
2901                    .await;
2902            }
2903        }
2904    }
2905
2906    /// ReadThenWrite is a plan whose writes depend on the results of a
2907    /// read. This works by doing a Peek and then queuing a SendDiffs. No writes
2908    /// or read-then-writes can occur between the Peek and SendDiffs, otherwise a
2909    /// serializability violation could occur.
2910    #[instrument]
2911    pub(super) async fn sequence_read_then_write(
2912        &mut self,
2913        mut ctx: ExecuteContext,
2914        plan: plan::ReadThenWritePlan,
2915    ) {
2916        let mut source_ids: BTreeSet<_> = plan
2917            .selection
2918            .depends_on()
2919            .into_iter()
2920            .map(|gid| self.catalog().resolve_item_id(&gid))
2921            .collect();
2922        source_ids.insert(plan.id);
2923
2924        // If the transaction doesn't already have write locks, acquire them.
2925        if ctx.session().transaction().write_locks().is_none() {
2926            // Pre-define all of the locks we need.
2927            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2928
2929            // Try acquiring all of our locks.
2930            for id in &source_ids {
2931                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2932                    write_locks.insert_lock(*id, lock);
2933                }
2934            }
2935
2936            // See if we acquired all of the necessary locks.
2937            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2938                Ok(locks) => locks,
2939                Err(missing) => {
2940                    // Defer our write if we couldn't acquire all of the locks.
2941                    let role_metadata = ctx.session().role_metadata().clone();
2942                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2943                    let plan = DeferredPlan {
2944                        ctx,
2945                        plan: Plan::ReadThenWrite(plan),
2946                        validity: PlanValidity::new(
2947                            self.catalog.transient_revision(),
2948                            source_ids.clone(),
2949                            None,
2950                            None,
2951                            role_metadata,
2952                        ),
2953                        requires_locks: source_ids,
2954                    };
2955                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2956                }
2957            };
2958
2959            ctx.session_mut()
2960                .try_grant_write_locks(write_locks)
2961                .expect("session has already been granted write locks");
2962        }
2963
2964        let plan::ReadThenWritePlan {
2965            id,
2966            kind,
2967            selection,
2968            mut assignments,
2969            finishing,
2970            mut returning,
2971        } = plan;
2972
2973        // Read-then-writes can be queued, so re-verify the id exists.
2974        let desc = match self.catalog().try_get_entry(&id) {
2975            Some(table) => {
2976                let full_name = self
2977                    .catalog()
2978                    .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2979                // Inserts always occur at the latest version of the table.
2980                table
2981                    .desc_latest(&full_name)
2982                    .expect("desc called on table")
2983                    .into_owned()
2984            }
2985            None => {
2986                ctx.retire(Err(AdapterError::Catalog(
2987                    mz_catalog::memory::error::Error {
2988                        kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2989                            id.to_string(),
2990                        )),
2991                    },
2992                )));
2993                return;
2994            }
2995        };
2996
2997        // Disallow mz_now in any position because read time and write time differ.
2998        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2999            || assignments.values().any(|e| e.contains_temporal())
3000            || returning.iter().any(|e| e.contains_temporal());
3001        if contains_temporal {
3002            ctx.retire(Err(AdapterError::Unsupported(
3003                "calls to mz_now in write statements",
3004            )));
3005            return;
3006        }
3007
3008        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
3009        //
3010        // - They do not refer to any objects whose notion of time moves differently than that of
3011        //   user tables. This limitation is meant to ensure no writes occur between this read and
3012        //   the subsequent write.
3013        // - They do not use mz_now(), whose time produced during read will differ from the write
3014        //   timestamp.
3015        fn validate_read_dependencies(
3016            catalog: &Catalog,
3017            id: &CatalogItemId,
3018        ) -> Result<(), AdapterError> {
3019            use CatalogItemType::*;
3020            use mz_catalog::memory::objects;
3021            let mut ids_to_check = Vec::new();
3022            let valid = match catalog.try_get_entry(id) {
3023                Some(entry) => {
3024                    if let CatalogItem::View(objects::View { optimized_expr, .. })
3025                    | CatalogItem::MaterializedView(objects::MaterializedView {
3026                        optimized_expr,
3027                        ..
3028                    }) = entry.item()
3029                    {
3030                        if optimized_expr.contains_temporal() {
3031                            return Err(AdapterError::Unsupported(
3032                                "calls to mz_now in write statements",
3033                            ));
3034                        }
3035                    }
3036                    match entry.item().typ() {
3037                        typ @ (Func | View | MaterializedView | ContinualTask) => {
3038                            ids_to_check.extend(entry.uses());
3039                            let valid_id = id.is_user() || matches!(typ, Func);
3040                            valid_id
3041                        }
3042                        Source | Secret | Connection => false,
3043                        // Cannot select from sinks or indexes.
3044                        Sink | Index => unreachable!(),
3045                        Table => {
3046                            if !id.is_user() {
3047                                // We can't read from non-user tables
3048                                false
3049                            } else {
3050                                // We can't read from tables that are source-exports
3051                                entry.source_export_details().is_none()
3052                            }
3053                        }
3054                        Type => true,
3055                    }
3056                }
3057                None => false,
3058            };
3059            if !valid {
3060                return Err(AdapterError::InvalidTableMutationSelection);
3061            }
3062            for id in ids_to_check {
3063                validate_read_dependencies(catalog, &id)?;
3064            }
3065            Ok(())
3066        }
3067
3068        for gid in selection.depends_on() {
3069            let item_id = self.catalog().resolve_item_id(&gid);
3070            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
3071                ctx.retire(Err(err));
3072                return;
3073            }
3074        }
3075
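        // Issue the read half of the read-then-write as an internal peek; the task spawned
        // below turns its results into diffs for the write.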
3076        let (peek_tx, peek_rx) = oneshot::channel();
3077        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
3078        let (tx, _, session, extra) = ctx.into_parts();
3079        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
3080        // execution context, because this peek does not directly correspond to an execute,
3081        // and so we don't need to take any action on its retirement.
3082        // TODO[btv]: we might consider extending statement logging to log the inner
3083        // statement separately here. That would require us to plumb through the SQL of the inner
3084        // statement, and mint a new "real" execution context here. We'd also have to add some logic
3085        // to make sure such "sub-statements" are always sampled when the top-level statement is.
3086        //
3087        // It's debatable whether this makes sense conceptually,
3088        // because the inner fragment here is not actually a
3089        // "statement" in its own right.
3090        let peek_ctx = ExecuteContext::from_parts(
3091            peek_client_tx,
3092            self.internal_cmd_tx.clone(),
3093            session,
3094            Default::default(),
3095        );
3096
3097        self.sequence_peek(
3098            peek_ctx,
3099            plan::SelectPlan {
3100                select: None,
3101                source: selection,
3102                when: QueryWhen::FreshestTableWrite,
3103                finishing,
3104                copy_to: None,
3105            },
3106            TargetCluster::Active,
3107            None,
3108        )
3109        .await;
3110
3111        let internal_cmd_tx = self.internal_cmd_tx.clone();
3112        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
3113        let catalog = self.owned_catalog();
3114        let max_result_size = self.catalog().system_config().max_result_size();
3115
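        // The remainder runs off the coordinator thread: wait for the peek, convert its rows
        // into diffs, linearize the read if necessary, and finally send the diffs for the write.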
3116        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
3117            let (peek_response, session) = match peek_rx.await {
3118                Ok(Response {
3119                    result: Ok(resp),
3120                    session,
3121                    otel_ctx,
3122                }) => {
3123                    otel_ctx.attach_as_parent();
3124                    (resp, session)
3125                }
3126                Ok(Response {
3127                    result: Err(e),
3128                    session,
3129                    otel_ctx,
3130                }) => {
3131                    let ctx =
3132                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3133                    otel_ctx.attach_as_parent();
3134                    ctx.retire(Err(e));
3135                    return;
3136                }
3137                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
3138                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
3139            };
3140            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3141            let mut timeout_dur = *ctx.session().vars().statement_timeout();
3142
3143            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
3144            if timeout_dur == Duration::ZERO {
3145                timeout_dur = Duration::MAX;
3146            }
3147
3148            let style = ExprPrepStyle::OneShot {
3149                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
3150                session: ctx.session(),
3151                catalog_state: catalog.state(),
3152            };
3153            for expr in assignments.values_mut().chain(returning.iter_mut()) {
3154                return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
3155            }
3156
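            // Convert peeked rows into (Row, Diff) updates: deletes retract the row, inserts add
            // it, and updates emit both the retraction and the updated row. The total byte size is
            // returned so the caller can enforce max_result_size.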
3157            let make_diffs =
3158                move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
3159                    let arena = RowArena::new();
3160                    let mut diffs = Vec::new();
3161                    let mut datum_vec = mz_repr::DatumVec::new();
3162
3163                    while let Some(row) = rows.next() {
3164                        if !assignments.is_empty() {
3165                            assert!(
3166                                matches!(kind, MutationKind::Update),
3167                                "only updates support assignments"
3168                            );
3169                            let mut datums = datum_vec.borrow_with(row);
3170                            let mut updates = vec![];
3171                            for (idx, expr) in &assignments {
3172                                let updated = match expr.eval(&datums, &arena) {
3173                                    Ok(updated) => updated,
3174                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
3175                                };
3176                                updates.push((*idx, updated));
3177                            }
3178                            for (idx, new_value) in updates {
3179                                datums[idx] = new_value;
3180                            }
3181                            let updated = Row::pack_slice(&datums);
3182                            diffs.push((updated, Diff::ONE));
3183                        }
3184                        match kind {
3185                            // Updates and deletes always remove the
3186                            // current row. Updates will also add an
3187                            // updated value.
3188                            MutationKind::Update | MutationKind::Delete => {
3189                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
3190                            }
3191                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
3192                        }
3193                    }
3194
3195                    // Sum of all the rows' byte size, for checking if we go
3196                    // above the max_result_size threshold.
3197                    let mut byte_size: u64 = 0;
3198                    for (row, diff) in &diffs {
3199                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
3200                        if diff.is_positive() {
3201                            for (idx, datum) in row.iter().enumerate() {
3202                                desc.constraints_met(idx, &datum)?;
3203                            }
3204                        }
3205                    }
3206                    Ok((diffs, byte_size))
3207                };
3208
3209            let diffs = match peek_response {
3210                ExecuteResponse::SendingRowsStreaming {
3211                    rows: mut rows_stream,
3212                    ..
3213                } => {
3214                    let mut byte_size: u64 = 0;
3215                    let mut diffs = Vec::new();
3216                    let result = loop {
3217                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
3218                            Ok(Some(res)) => match res {
3219                                PeekResponseUnary::Rows(new_rows) => {
3220                                    match make_diffs(new_rows) {
3221                                        Ok((mut new_diffs, new_byte_size)) => {
3222                                            byte_size = byte_size.saturating_add(new_byte_size);
3223                                            if byte_size > max_result_size {
3224                                                break Err(AdapterError::ResultSize(format!(
3225                                                    "result exceeds max size of {max_result_size}"
3226                                                )));
3227                                            }
3228                                            diffs.append(&mut new_diffs)
3229                                        }
3230                                        Err(e) => break Err(e),
3231                                    };
3232                                }
3233                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
3234                                PeekResponseUnary::Error(e) => {
3235                                    break Err(AdapterError::Unstructured(anyhow!(e)));
3236                                }
3237                            },
3238                            Ok(None) => break Ok(diffs),
3239                            Err(_) => {
3240                                // We timed out, so remove the pending peek. This is
3241                                // best-effort and doesn't guarantee we won't
3242                                // receive a response.
3243                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
3244                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
3245                                    conn_id: ctx.session().conn_id().clone(),
3246                                });
3247                                if let Err(e) = result {
3248                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3249                                }
3250                                break Err(AdapterError::StatementTimeout);
3251                            }
3252                        }
3253                    };
3254
3255                    result
3256                }
3257                ExecuteResponse::SendingRowsImmediate { rows } => {
3258                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
3259                }
3260                resp => Err(AdapterError::Unstructured(anyhow!(
3261                    "unexpected peek response: {resp:?}"
3262                ))),
3263            };
3264
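            // Evaluate any RETURNING expressions against the rows being written (positive diffs
            // only).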
3265            let mut returning_rows = Vec::new();
3266            let mut diff_err: Option<AdapterError> = None;
3267            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
3268                let arena = RowArena::new();
3269                for (row, diff) in diffs {
3270                    if !diff.is_positive() {
3271                        continue;
3272                    }
3273                    let mut returning_row = Row::with_capacity(returning.len());
3274                    let mut packer = returning_row.packer();
3275                    for expr in &returning {
3276                        let datums: Vec<_> = row.iter().collect();
3277                        match expr.eval(&datums, &arena) {
3278                            Ok(datum) => {
3279                                packer.push(datum);
3280                            }
3281                            Err(err) => {
3282                                diff_err = Some(err.into());
3283                                break;
3284                            }
3285                        }
3286                    }
3287                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3288                    let diff = match NonZeroUsize::try_from(diff) {
3289                        Ok(diff) => diff,
3290                        Err(err) => {
3291                            diff_err = Some(err.into());
3292                            break;
3293                        }
3294                    };
3295                    returning_rows.push((returning_row, diff));
3296                    if diff_err.is_some() {
3297                        break;
3298                    }
3299                }
3300            }
3301            let diffs = if let Some(err) = diff_err {
3302                Err(err)
3303            } else {
3304                diffs
3305            };
3306
3307            // We need to clear out the timestamp context so the write doesn't fail due to a
3308            // read only transaction.
3309            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3310            // No matter what isolation level the client is using, we must linearize this
3311            // read. The write will be performed right after this, as part of a single
3312            // transaction, so the write must have a timestamp greater than or equal to the
3313            // read.
3314            //
3315            // Note: It's only OK for the write to have a greater timestamp than the read
3316            // because the write lock prevents any other writes from happening in between
3317            // the read and write.
3318            if let Some(timestamp_context) = timestamp_context {
3319                let (tx, rx) = tokio::sync::oneshot::channel();
3320                let conn_id = ctx.session().conn_id().clone();
3321                let pending_read_txn = PendingReadTxn {
3322                    txn: PendingRead::ReadThenWrite { ctx, tx },
3323                    timestamp_context,
3324                    created: Instant::now(),
3325                    num_requeues: 0,
3326                    otel_ctx: OpenTelemetryContext::obtain(),
3327                };
3328                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3329                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3330                if let Err(e) = result {
3331                    warn!(
3332                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3333                        e
3334                    );
3335                    return;
3336                }
3337                let result = rx.await;
3338                // It is not an error for these results to be ready after `tx` has been dropped.
3339                ctx = match result {
3340                    Ok(Some(ctx)) => ctx,
3341                    Ok(None) => {
3342                        // Coordinator took our context and will handle responding to the client.
3343                        // This usually indicates that our transaction was aborted.
3344                        return;
3345                    }
3346                    Err(e) => {
3347                        warn!(
3348                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3349                            e
3350                        );
3351                        return;
3352                    }
3353                };
3354            }
3355
3356            match diffs {
3357                Ok(diffs) => {
3358                    let result = Self::send_diffs(
3359                        ctx.session_mut(),
3360                        plan::SendDiffsPlan {
3361                            id,
3362                            updates: diffs,
3363                            kind,
3364                            returning: returning_rows,
3365                            max_result_size,
3366                        },
3367                    );
3368                    ctx.retire(result);
3369                }
3370                Err(e) => {
3371                    ctx.retire(Err(e));
3372                }
3373            }
3374        });
3375    }
3376
3377    #[instrument]
3378    pub(super) async fn sequence_alter_item_rename(
3379        &mut self,
3380        ctx: &mut ExecuteContext,
3381        plan: plan::AlterItemRenamePlan,
3382    ) -> Result<ExecuteResponse, AdapterError> {
3383        let op = catalog::Op::RenameItem {
3384            id: plan.id,
3385            current_full_name: plan.current_full_name,
3386            to_name: plan.to_name,
3387        };
3388        match self
3389            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3390            .await
3391        {
3392            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3393            Err(err) => Err(err),
3394        }
3395    }
3396
3397    #[instrument]
3398    pub(super) async fn sequence_alter_retain_history(
3399        &mut self,
3400        ctx: &mut ExecuteContext,
3401        plan: plan::AlterRetainHistoryPlan,
3402    ) -> Result<ExecuteResponse, AdapterError> {
3403        let ops = vec![catalog::Op::AlterRetainHistory {
3404            id: plan.id,
3405            value: plan.value,
3406            window: plan.window,
3407        }];
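        // Once the catalog change commits, update the read policy of the affected collection:
        // the compute read policy for indexes, the storage read policy for everything else.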
3408        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3409            Box::pin(async move {
3410                let catalog_item = coord.catalog().get_entry(&plan.id).item();
3411                let cluster = match catalog_item {
3412                    CatalogItem::Table(_)
3413                    | CatalogItem::MaterializedView(_)
3414                    | CatalogItem::Source(_)
3415                    | CatalogItem::ContinualTask(_) => None,
3416                    CatalogItem::Index(index) => Some(index.cluster_id),
3417                    CatalogItem::Log(_)
3418                    | CatalogItem::View(_)
3419                    | CatalogItem::Sink(_)
3420                    | CatalogItem::Type(_)
3421                    | CatalogItem::Func(_)
3422                    | CatalogItem::Secret(_)
3423                    | CatalogItem::Connection(_) => unreachable!(),
3424                };
3425                match cluster {
3426                    Some(cluster) => {
3427                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3428                    }
3429                    None => {
3430                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3431                    }
3432                }
3433            })
3434        })
3435        .await?;
3436        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3437    }
3438
3439    #[instrument]
3440    pub(super) async fn sequence_alter_schema_rename(
3441        &mut self,
3442        ctx: &mut ExecuteContext,
3443        plan: plan::AlterSchemaRenamePlan,
3444    ) -> Result<ExecuteResponse, AdapterError> {
3445        let (database_spec, schema_spec) = plan.cur_schema_spec;
3446        let op = catalog::Op::RenameSchema {
3447            database_spec,
3448            schema_spec,
3449            new_name: plan.new_schema_name,
3450            check_reserved_names: true,
3451        };
3452        match self
3453            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3454            .await
3455        {
3456            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3457            Err(err) => Err(err),
3458        }
3459    }
3460
3461    #[instrument]
3462    pub(super) async fn sequence_alter_schema_swap(
3463        &mut self,
3464        ctx: &mut ExecuteContext,
3465        plan: plan::AlterSchemaSwapPlan,
3466    ) -> Result<ExecuteResponse, AdapterError> {
3467        let plan::AlterSchemaSwapPlan {
3468            schema_a_spec: (schema_a_db, schema_a),
3469            schema_a_name,
3470            schema_b_spec: (schema_b_db, schema_b),
3471            schema_b_name,
3472            name_temp,
3473        } = plan;
3474
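        // Swap the two schema names by first renaming schema A to a temporary name, so that no
        // individual rename op collides with an existing schema name.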
3475        let op_a = catalog::Op::RenameSchema {
3476            database_spec: schema_a_db,
3477            schema_spec: schema_a,
3478            new_name: name_temp,
3479            check_reserved_names: false,
3480        };
3481        let op_b = catalog::Op::RenameSchema {
3482            database_spec: schema_b_db,
3483            schema_spec: schema_b,
3484            new_name: schema_a_name,
3485            check_reserved_names: false,
3486        };
3487        let op_c = catalog::Op::RenameSchema {
3488            database_spec: schema_a_db,
3489            schema_spec: schema_a,
3490            new_name: schema_b_name,
3491            check_reserved_names: false,
3492        };
3493
3494        match self
3495            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3496                Box::pin(async {})
3497            })
3498            .await
3499        {
3500            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3501            Err(err) => Err(err),
3502        }
3503    }
3504
3505    #[instrument]
3506    pub(super) async fn sequence_alter_role(
3507        &mut self,
3508        session: &Session,
3509        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3510    ) -> Result<ExecuteResponse, AdapterError> {
3511        let catalog = self.catalog().for_session(session);
3512        let role = catalog.get_role(&id);
3513
3514        // We'll send these notices to the user, if the operation is successful.
3515        let mut notices = vec![];
3516
3517        // Get the attributes and variables from the role, as they currently are.
3518        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3519        let mut vars = role.vars().clone();
3520
3521        // Whether to set the password to NULL. This is a special case since the existing
3522        // password is not stored in the role attributes.
3523        let mut nopassword = false;
3524
3525        // Apply our updates.
3526        match option {
3527            PlannedAlterRoleOption::Attributes(attrs) => {
3528                self.validate_role_attributes(&attrs.clone().into())?;
3529
3530                if let Some(inherit) = attrs.inherit {
3531                    attributes.inherit = inherit;
3532                }
3533
3534                if let Some(password) = attrs.password {
3535                    attributes.password = Some(password);
3536                }
3537
3538                if let Some(superuser) = attrs.superuser {
3539                    attributes.superuser = Some(superuser);
3540                }
3541
3542                if let Some(login) = attrs.login {
3543                    attributes.login = Some(login);
3544                }
3545
3546                if attrs.nopassword.unwrap_or(false) {
3547                    nopassword = true;
3548                }
3549
3550                if let Some(notice) = self.should_emit_rbac_notice(session) {
3551                    notices.push(notice);
3552                }
3553            }
3554            PlannedAlterRoleOption::Variable(variable) => {
3555                // Get the variable to make sure it's valid and visible.
3556                let session_var = session.vars().inspect(variable.name())?;
3557                // Return early if it's not visible.
3558                session_var.visible(session.user(), catalog.system_vars())?;
3559
3560                // Emit a warning when deprecated variables are used.
3561                // TODO(database-issues#8069) remove this after sufficient time has passed
3562                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3563                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3564                } else if let PlannedRoleVariable::Set {
3565                    name,
3566                    value: VariableValue::Values(vals),
3567                } = &variable
3568                {
3569                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3570                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3571                    }
3572                }
3573
3574                let var_name = match variable {
3575                    PlannedRoleVariable::Set { name, value } => {
3576                        // Update our persisted set.
3577                        match &value {
3578                            VariableValue::Default => {
3579                                vars.remove(&name);
3580                            }
3581                            VariableValue::Values(vals) => {
3582                                let var = match &vals[..] {
3583                                    [val] => OwnedVarInput::Flat(val.clone()),
3584                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3585                                };
3586                                // Make sure the input is valid.
3587                                session_var.check(var.borrow())?;
3588
3589                                vars.insert(name.clone(), var);
3590                            }
3591                        };
3592                        name
3593                    }
3594                    PlannedRoleVariable::Reset { name } => {
3595                        // Remove it from our persisted values.
3596                        vars.remove(&name);
3597                        name
3598                    }
3599                };
3600
3601                // Emit a notice that they need to reconnect to see the change take effect.
3602                notices.push(AdapterNotice::VarDefaultUpdated {
3603                    role: Some(name.clone()),
3604                    var_name: Some(var_name),
3605                });
3606            }
3607        }
3608
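        // Persist the updated attributes and role-level variable defaults in the catalog.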
3609        let op = catalog::Op::AlterRole {
3610            id,
3611            name,
3612            attributes,
3613            nopassword,
3614            vars: RoleVars { map: vars },
3615        };
3616        let response = self
3617            .catalog_transact(Some(session), vec![op])
3618            .await
3619            .map(|_| ExecuteResponse::AlteredRole)?;
3620
3621        // Send all of our queued notices.
3622        session.add_notices(notices);
3623
3624        Ok(response)
3625    }
3626
3627    #[instrument]
3628    pub(super) async fn sequence_alter_sink_prepare(
3629        &mut self,
3630        ctx: ExecuteContext,
3631        plan: plan::AlterSinkPlan,
3632    ) {
3633        // Put a read hold on the new relation
3634        let id_bundle = crate::CollectionIdBundle {
3635            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3636            compute_ids: BTreeMap::new(),
3637        };
3638        let read_hold = self.acquire_read_holds(&id_bundle);
3639
3640        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3641            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3642            return;
3643        };
3644
3645        let otel_ctx = OpenTelemetryContext::obtain();
3646        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3647
3648        let plan_validity = PlanValidity::new(
3649            self.catalog().transient_revision(),
3650            BTreeSet::from_iter([plan.item_id, from_item_id]),
3651            Some(plan.in_cluster),
3652            None,
3653            ctx.session().role_metadata().clone(),
3654        );
3655
3656        info!(
3657            "preparing alter sink for {}: frontiers={:?} export={:?}",
3658            plan.global_id,
3659            self.controller
3660                .storage_collections
3661                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3662            self.controller.storage.export(plan.global_id)
3663        );
3664
3665        // Now we must wait for the sink to make enough progress such that there is overlap between
3666        // the new `from` collection's read hold and the sink's write frontier.
3667        //
3668        // TODO(database-issues#9820): If the sink is dropped while we are waiting for progress,
3669        // the watch set never completes and neither does the `ALTER SINK` command.
3670        self.install_storage_watch_set(
3671            ctx.session().conn_id().clone(),
3672            BTreeSet::from_iter([plan.global_id]),
3673            read_ts,
3674            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3675                ctx: Some(ctx),
3676                otel_ctx,
3677                plan,
3678                plan_validity,
3679                read_hold,
3680            }),
3681        );
3682    }
3683
3684    #[instrument]
3685    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3686        ctx.otel_ctx.attach_as_parent();
3687
3688        let plan::AlterSinkPlan {
3689            item_id,
3690            global_id,
3691            sink: sink_plan,
3692            with_snapshot,
3693            in_cluster,
3694        } = ctx.plan.clone();
3695
3696        // We avoid taking the DDL lock for `ALTER SINK SET FROM` commands, see
3697        // `Coordinator::must_serialize_ddl`. We therefore must assume that the world has
3698        // arbitrarily changed since we performed planning, and we must re-assert that it still
3699        // matches our requirements.
3700        //
3701        // The `PlanValidity` check ensures that both the sink and the new source relation still
3702        // exist. Apart from that, we have to ensure that nobody else altered the sink in the
3703        // meantime, which we do by comparing the catalog sink version to the one in the plan.
3704        match ctx.plan_validity.check(self.catalog()) {
3705            Ok(()) => {}
3706            Err(err) => {
3707                ctx.retire(Err(err));
3708                return;
3709            }
3710        }
3711
3712        let entry = self.catalog().get_entry(&item_id);
3713        let CatalogItem::Sink(old_sink) = entry.item() else {
3714            panic!("invalid item kind for `AlterSinkPlan`");
3715        };
3716
3717        if sink_plan.version != old_sink.version + 1 {
3718            ctx.retire(Err(AdapterError::ChangedPlan(
3719                "sink was altered concurrently".into(),
3720            )));
3721            return;
3722        }
3723
3724        info!(
3725            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3726            self.controller
3727                .storage_collections
3728                .collections_frontiers(vec![global_id, sink_plan.from]),
3729            self.controller.storage.export(global_id),
3730        );
3731
3732        // Assert that we can still recover the updates at the timestamps of the write frontier,
3733        // i.e. the new `from` collection's read hold is strictly earlier. This must hold here.
3734        let write_frontier = &self
3735            .controller
3736            .storage
3737            .export(global_id)
3738            .expect("sink known to exist")
3739            .write_frontier;
3740        let as_of = ctx.read_hold.least_valid_read();
3741        assert!(
3742            write_frontier.iter().all(|t| as_of.less_than(t)),
3743            "{:?} should be strictly less than {:?}",
3744            &*as_of,
3745            &**write_frontier
3746        );
3747
3748        // Parse the `create_sql` so we can update it to the new sink definition.
3749        //
3750        // Note that we need to use the `create_sql` from the catalog here, not the one from the
3751        // sink plan. Even though we ensure that the sink version didn't change since planning, the
3752        // names in the `create_sql` may have changed, for example due to a schema swap.
3753        let create_sql = &old_sink.create_sql;
3754        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3755        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3756            unreachable!("invalid statement kind for sink");
3757        };
3758
3759        // Update the sink version.
3760        stmt.with_options
3761            .retain(|o| o.name != CreateSinkOptionName::Version);
3762        stmt.with_options.push(CreateSinkOption {
3763            name: CreateSinkOptionName::Version,
3764            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3765                sink_plan.version.to_string(),
3766            ))),
3767        });
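        // Illustrative example: a sink previously created at version 1 now carries
        // `WITH (VERSION = 2)` in its statement after the retain/push above.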
3768
3769        let conn_catalog = self.catalog().for_system_session();
3770        let (mut stmt, resolved_ids) =
3771            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3772
3773        // Update the `from` relation.
3774        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3775        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3776        stmt.from = ResolvedItemName::Item {
3777            id: from_entry.id(),
3778            qualifiers: from_entry.name.qualifiers.clone(),
3779            full_name,
3780            print_id: true,
3781            version: from_entry.version,
3782        };
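        // The stored `create_sql` now round-trips to a `CREATE SINK ... FROM <new relation> ...`
        // statement naming the relation the sink was re-pointed at.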
3783
3784        let new_sink = Sink {
3785            create_sql: stmt.to_ast_string_stable(),
3786            global_id,
3787            from: sink_plan.from,
3788            connection: sink_plan.connection.clone(),
3789            envelope: sink_plan.envelope,
3790            version: sink_plan.version,
3791            with_snapshot,
3792            resolved_ids: resolved_ids.clone(),
3793            cluster_id: in_cluster,
3794        };
3795
3796        let ops = vec![catalog::Op::UpdateItem {
3797            id: item_id,
3798            name: entry.name().clone(),
3799            to_item: CatalogItem::Sink(new_sink),
3800        }];
3801
3802        match self
3803            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3804            .await
3805        {
3806            Ok(()) => {}
3807            Err(err) => {
3808                ctx.retire(Err(err));
3809                return;
3810            }
3811        }
3812
3813        let storage_sink_desc = StorageSinkDesc {
3814            from: sink_plan.from,
3815            from_desc: from_entry
3816                .desc_opt()
3817                .expect("sinks can only be built on items with descs")
3818                .into_owned(),
3819            connection: sink_plan
3820                .connection
3821                .clone()
3822                .into_inline_connection(self.catalog().state()),
3823            envelope: sink_plan.envelope,
3824            as_of,
3825            with_snapshot,
3826            version: sink_plan.version,
3827            from_storage_metadata: (),
3828            to_storage_metadata: (),
3829        };
3830
3831        self.controller
3832            .storage
3833            .alter_export(
3834                global_id,
3835                ExportDescription {
3836                    sink: storage_sink_desc,
3837                    instance_id: in_cluster,
3838                },
3839            )
3840            .await
3841            .unwrap_or_terminate("cannot fail to alter sink export");
3842
3843        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3844    }
3845
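    /// Sequences an `ALTER CONNECTION` statement by dispatching on the requested action: key
    /// rotation goes through `sequence_rotate_keys`, while option changes go through
    /// `sequence_alter_connection_options`.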
3846    #[instrument]
3847    pub(super) async fn sequence_alter_connection(
3848        &mut self,
3849        ctx: ExecuteContext,
3850        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3851    ) {
3852        match action {
3853            AlterConnectionAction::RotateKeys => {
3854                self.sequence_rotate_keys(ctx, id).await;
3855            }
3856            AlterConnectionAction::AlterOptions {
3857                set_options,
3858                drop_options,
3859                validate,
3860            } => {
3861                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3862                    .await
3863            }
3864        }
3865    }
3866
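    /// Applies `SET`/`DROP` option changes to a connection by splicing the new options into the
    /// connection's existing `CREATE CONNECTION` statement and re-planning it. If validation was
    /// requested, the new connection is validated on a background task before the change is
    /// committed by `sequence_alter_connection_stage_finish`; otherwise it is committed directly.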
3867    #[instrument]
3868    async fn sequence_alter_connection_options(
3869        &mut self,
3870        mut ctx: ExecuteContext,
3871        id: CatalogItemId,
3872        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3873        drop_options: BTreeSet<ConnectionOptionName>,
3874        validate: bool,
3875    ) {
3876        let cur_entry = self.catalog().get_entry(&id);
3877        let cur_conn = cur_entry.connection().expect("known to be connection");
3878        let connection_gid = cur_conn.global_id();
3879
3880        let inner = || -> Result<Connection, AdapterError> {
3881            // Parse statement.
3882            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3883                .expect("invalid create sql persisted to catalog")
3884                .into_element()
3885                .ast
3886            {
3887                Statement::CreateConnection(stmt) => stmt,
3888                _ => unreachable!("proved type is connection"),
3889            };
3890
3891            let catalog = self.catalog().for_system_session();
3892
3893            // Resolve items in statement
3894            let (mut create_conn_stmt, resolved_ids) =
3895                mz_sql::names::resolve(&catalog, create_conn_stmt)
3896                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3897
3898            // Retain options that are neither set nor dropped.
3899            create_conn_stmt
3900                .values
3901                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3902
3903            // Set new values
3904            create_conn_stmt.values.extend(
3905                set_options
3906                    .into_iter()
3907                    .map(|(name, value)| ConnectionOption { name, value }),
3908            );
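            // At this point `values` holds the untouched old options plus the new `SET` options;
            // dropped options are simply absent, so they fall back to their defaults when the
            // statement is re-planned below.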
3909
3910            // Open a new catalog, which we will use to re-plan our
3911            // statement with the desired config.
3912            let mut catalog = self.catalog().for_system_session();
3913            catalog.mark_id_unresolvable_for_replanning(id);
3914
3915            // Re-define our connection in terms of the amended statement
3916            let plan = match mz_sql::plan::plan(
3917                None,
3918                &catalog,
3919                Statement::CreateConnection(create_conn_stmt),
3920                &Params::empty(),
3921                &resolved_ids,
3922            )
3923            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3924            {
3925                Plan::CreateConnection(plan) => plan,
3926                _ => unreachable!("create connection plan is only valid response"),
3927            };
3928
3929            // Parse statement.
3930            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3931                .expect("invalid create sql persisted to catalog")
3932                .into_element()
3933                .ast
3934            {
3935                Statement::CreateConnection(stmt) => stmt,
3936                _ => unreachable!("proved type is connection"),
3937            };
3938
3939            let catalog = self.catalog().for_system_session();
3940
3941            // Resolve items in statement
3942            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3943                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3944
3945            Ok(Connection {
3946                create_sql: plan.connection.create_sql,
3947                global_id: cur_conn.global_id,
3948                details: plan.connection.details,
3949                resolved_ids: new_deps,
3950            })
3951        };
3952
3953        let conn = match inner() {
3954            Ok(conn) => conn,
3955            Err(e) => {
3956                return ctx.retire(Err(e));
3957            }
3958        };
3959
3960        if validate {
3961            let connection = conn
3962                .details
3963                .to_connection()
3964                .into_inline_connection(self.catalog().state());
3965
3966            let internal_cmd_tx = self.internal_cmd_tx.clone();
3967            let transient_revision = self.catalog().transient_revision();
3968            let conn_id = ctx.session().conn_id().clone();
3969            let otel_ctx = OpenTelemetryContext::obtain();
3970            let role_metadata = ctx.session().role_metadata().clone();
3971            let current_storage_parameters = self.controller.storage.config().clone();
3972
3973            task::spawn(
3974                || format!("validate_alter_connection:{conn_id}"),
3975                async move {
3976                    let resolved_ids = conn.resolved_ids.clone();
3977                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3978                    let result = match connection.validate(id, &current_storage_parameters).await {
3979                        Ok(()) => Ok(conn),
3980                        Err(err) => Err(err.into()),
3981                    };
3982
3983                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3984                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3985                        AlterConnectionValidationReady {
3986                            ctx,
3987                            result,
3988                            connection_id: id,
3989                            connection_gid,
3990                            plan_validity: PlanValidity::new(
3991                                transient_revision,
3992                                dependency_ids.clone(),
3993                                None,
3994                                None,
3995                                role_metadata,
3996                            ),
3997                            otel_ctx,
3998                            resolved_ids,
3999                        },
4000                    ));
4001                    if let Err(e) = result {
4002                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
4003                    }
4004                },
4005            );
4006        } else {
4007            let result = self
4008                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
4009                .await;
4010            ctx.retire(result);
4011        }
4012    }
4013
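    /// Commits an altered connection: checks that the new details are `alter_compatible` with the
    /// current ones, updates the catalog item, ensures any AWS PrivateLink VPC endpoint, and then
    /// pushes the re-inlined connection details to every dependent ingestion, export, and
    /// source-fed table via the storage controller.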
4014    #[instrument]
4015    pub(crate) async fn sequence_alter_connection_stage_finish(
4016        &mut self,
4017        session: &mut Session,
4018        id: CatalogItemId,
4019        connection: Connection,
4020    ) -> Result<ExecuteResponse, AdapterError> {
4021        match self.catalog.get_entry(&id).item() {
4022            CatalogItem::Connection(curr_conn) => {
4023                curr_conn
4024                    .details
4025                    .to_connection()
4026                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
4027                    .map_err(StorageError::from)?;
4028            }
4029            _ => unreachable!("known to be a connection"),
4030        };
4031
4032        let ops = vec![catalog::Op::UpdateItem {
4033            id,
4034            name: self.catalog.get_entry(&id).name().clone(),
4035            to_item: CatalogItem::Connection(connection.clone()),
4036        }];
4037
4038        self.catalog_transact(Some(session), ops).await?;
4039
4040        match connection.details {
4041            ConnectionDetails::AwsPrivatelink(ref privatelink) => {
4042                let spec = VpcEndpointConfig {
4043                    aws_service_name: privatelink.service_name.to_owned(),
4044                    availability_zone_ids: privatelink.availability_zones.to_owned(),
4045                };
4046                self.cloud_resource_controller
4047                    .as_ref()
4048                    .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
4049                    .ensure_vpc_endpoint(id, spec)
4050                    .await?;
4051            }
4052            _ => {}
4053        };
4054
4055        let entry = self.catalog().get_entry(&id);
4056
4057        let mut connections = VecDeque::new();
4058        connections.push_front(entry.id());
4059
4060        let mut source_connections = BTreeMap::new();
4061        let mut sink_connections = BTreeMap::new();
4062        let mut source_export_data_configs = BTreeMap::new();
4063
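        // Walk everything that (transitively, through intermediate connections) depends on the
        // altered connection, collecting the storage objects whose rendered connection details
        // must be re-inlined and pushed to the storage controller below.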
4064        while let Some(id) = connections.pop_front() {
4065            for id in self.catalog.get_entry(&id).used_by() {
4066                let entry = self.catalog.get_entry(id);
4067                match entry.item() {
4068                    CatalogItem::Connection(_) => connections.push_back(*id),
4069                    CatalogItem::Source(source) => {
4070                        let desc = match &entry.source().expect("known to be source").data_source {
4071                            DataSourceDesc::Ingestion { desc, .. }
4072                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4073                                desc.clone().into_inline_connection(self.catalog().state())
4074                            }
4075                            _ => unreachable!("only ingestions reference connections"),
4076                        };
4077
4078                        source_connections.insert(source.global_id, desc.connection);
4079                    }
4080                    CatalogItem::Sink(sink) => {
4081                        let export = entry.sink().expect("known to be sink");
4082                        sink_connections.insert(
4083                            sink.global_id,
4084                            export
4085                                .connection
4086                                .clone()
4087                                .into_inline_connection(self.catalog().state()),
4088                        );
4089                    }
4090                    CatalogItem::Table(table) => {
4091                        // This is a source-fed table that references a schema registry
4092                        // connection as part of its encoding / data config.
4093                        if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
4094                            let data_config = export_data_config.clone();
4095                            source_export_data_configs.insert(
4096                                table.global_id_writes(),
4097                                data_config.into_inline_connection(self.catalog().state()),
4098                            );
4099                        }
4100                    }
4101                    t => unreachable!("connection dependency not expected on {:?}", t),
4102                }
4103            }
4104        }
4105
4106        if !source_connections.is_empty() {
4107            self.controller
4108                .storage
4109                .alter_ingestion_connections(source_connections)
4110                .await
4111                .unwrap_or_terminate("cannot fail to alter ingestion connection");
4112        }
4113
4114        if !sink_connections.is_empty() {
4115            self.controller
4116                .storage
4117                .alter_export_connections(sink_connections)
4118                .await
4119                .unwrap_or_terminate("altering exports after txn must succeed");
4120        }
4121
4122        if !source_export_data_configs.is_empty() {
4123            self.controller
4124                .storage
4125                .alter_ingestion_export_data_configs(source_export_data_configs)
4126                .await
4127                .unwrap_or_terminate("altering source export data configs after txn must succeed");
4128        }
4129
4130        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
4131    }
4132
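    /// Sequences an `ALTER SOURCE` statement: either adds new subsource exports (merging any
    /// `TEXT COLUMNS` / `EXCLUDE COLUMNS` options into the source's `create_sql`, re-planning the
    /// source, and creating storage collections for the new subsources) or refreshes the source's
    /// set of available upstream references.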
4133    #[instrument]
4134    pub(super) async fn sequence_alter_source(
4135        &mut self,
4136        session: &Session,
4137        plan::AlterSourcePlan {
4138            item_id,
4139            ingestion_id,
4140            action,
4141        }: plan::AlterSourcePlan,
4142    ) -> Result<ExecuteResponse, AdapterError> {
4143        let cur_entry = self.catalog().get_entry(&item_id);
4144        let cur_source = cur_entry.source().expect("known to be source");
4145
4146        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
4147            // Parse statement.
4148            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
4149                .expect("invalid create sql persisted to catalog")
4150                .into_element()
4151                .ast
4152            {
4153                Statement::CreateSource(stmt) => stmt,
4154                _ => unreachable!("proved type is source"),
4155            };
4156
4157            let catalog = coord.catalog().for_system_session();
4158
4159            // Resolve items in statement
4160            mz_sql::names::resolve(&catalog, create_source_stmt)
4161                .map_err(|e| AdapterError::internal(err_cx, e))
4162        };
4163
4164        match action {
4165            plan::AlterSourceAction::AddSubsourceExports {
4166                subsources,
4167                options,
4168            } => {
4169                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
4170
4171                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
4172                    text_columns: mut new_text_columns,
4173                    exclude_columns: mut new_exclude_columns,
4174                    ..
4175                } = options.try_into()?;
4176
4177                // Resolve items in statement
4178                let (mut create_source_stmt, resolved_ids) =
4179                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
4180
4181                // Get all currently referred-to items
4182                let catalog = self.catalog();
4183                let curr_references: BTreeSet<_> = catalog
4184                    .get_entry(&item_id)
4185                    .used_by()
4186                    .into_iter()
4187                    .filter_map(|subsource| {
4188                        catalog
4189                            .get_entry(subsource)
4190                            .subsource_details()
4191                            .map(|(_id, reference, _details)| reference)
4192                    })
4193                    .collect();
4194
4195                // We are doing a lot of unwrapping, so just make an error to reference; all of
4196                // these invariants are guaranteed to be true because of how we plan subsources.
4197                let purification_err =
4198                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
4199
4200                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
4201                // we remove support for implicitly created subsources from a `CREATE SOURCE`
4202                // statement.
4203                match &mut create_source_stmt.connection {
4204                    CreateSourceConnection::Postgres {
4205                        options: curr_options,
4206                        ..
4207                    } => {
4208                        let mz_sql::plan::PgConfigOptionExtracted {
4209                            mut text_columns, ..
4210                        } = curr_options.clone().try_into()?;
4211
4212                        // Drop text columns; we will add them back in
4213                        // as appropriate below.
4214                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
4215
4216                        // Drop all text columns that are not currently referred to.
4217                        text_columns.retain(|column_qualified_reference| {
4218                            mz_ore::soft_assert_eq_or_log!(
4219                                column_qualified_reference.0.len(),
4220                                4,
4221                                "all TEXT COLUMNS values must be column-qualified references"
4222                            );
4223                            let mut table = column_qualified_reference.clone();
4224                            table.0.truncate(3);
4225                            curr_references.contains(&table)
4226                        });
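                        // Illustrative example (hypothetical names): the column-qualified
                        // reference `pg.public.tbl.col` is kept only if the table reference
                        // `pg.public.tbl` is still referenced by a subsource.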
4227
4228                        // Merge the current text columns into the new text columns.
4229                        new_text_columns.extend(text_columns);
4230
4231                        // If we have text columns, add them to the options.
4232                        if !new_text_columns.is_empty() {
4233                            new_text_columns.sort();
4234                            let new_text_columns = new_text_columns
4235                                .into_iter()
4236                                .map(WithOptionValue::UnresolvedItemName)
4237                                .collect();
4238
4239                            curr_options.push(PgConfigOption {
4240                                name: PgConfigOptionName::TextColumns,
4241                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4242                            });
4243                        }
4244                    }
4245                    CreateSourceConnection::MySql {
4246                        options: curr_options,
4247                        ..
4248                    } => {
4249                        let mz_sql::plan::MySqlConfigOptionExtracted {
4250                            mut text_columns,
4251                            mut exclude_columns,
4252                            ..
4253                        } = curr_options.clone().try_into()?;
4254
4255                        // Drop both text and exclude columns; we will add them back in
4256                        // as appropriate below.
4257                        curr_options.retain(|o| {
4258                            !matches!(
4259                                o.name,
4260                                MySqlConfigOptionName::TextColumns
4261                                    | MySqlConfigOptionName::ExcludeColumns
4262                            )
4263                        });
4264
4265                        // Drop all text / exclude columns that are not currently referred to.
4266                        let column_referenced =
4267                            |column_qualified_reference: &UnresolvedItemName| {
4268                                mz_ore::soft_assert_eq_or_log!(
4269                                    column_qualified_reference.0.len(),
4270                                    3,
4271                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4272                                );
4273                                let mut table = column_qualified_reference.clone();
4274                                table.0.truncate(2);
4275                                curr_references.contains(&table)
4276                            };
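                        // Illustrative example (hypothetical names): `db.tbl.col` is kept only if
                        // the table reference `db.tbl` is still referenced by a subsource. The SQL
                        // Server branch below applies the same rule.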
4277                        text_columns.retain(column_referenced);
4278                        exclude_columns.retain(column_referenced);
4279
4280                        // Merge the current text / exclude columns into the new text / exclude columns.
4281                        new_text_columns.extend(text_columns);
4282                        new_exclude_columns.extend(exclude_columns);
4283
4284                        // If we have text columns, add them to the options.
4285                        if !new_text_columns.is_empty() {
4286                            new_text_columns.sort();
4287                            let new_text_columns = new_text_columns
4288                                .into_iter()
4289                                .map(WithOptionValue::UnresolvedItemName)
4290                                .collect();
4291
4292                            curr_options.push(MySqlConfigOption {
4293                                name: MySqlConfigOptionName::TextColumns,
4294                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4295                            });
4296                        }
4297                        // If we have exclude columns, add them to the options.
4298                        if !new_exclude_columns.is_empty() {
4299                            new_exclude_columns.sort();
4300                            let new_exclude_columns = new_exclude_columns
4301                                .into_iter()
4302                                .map(WithOptionValue::UnresolvedItemName)
4303                                .collect();
4304
4305                            curr_options.push(MySqlConfigOption {
4306                                name: MySqlConfigOptionName::ExcludeColumns,
4307                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4308                            });
4309                        }
4310                    }
4311                    CreateSourceConnection::SqlServer {
4312                        options: curr_options,
4313                        ..
4314                    } => {
4315                        let mz_sql::plan::SqlServerConfigOptionExtracted {
4316                            mut text_columns,
4317                            mut exclude_columns,
4318                            ..
4319                        } = curr_options.clone().try_into()?;
4320
4321                        // Drop both text and exclude columns; we will add them back in
4322                        // as appropriate below.
4323                        curr_options.retain(|o| {
4324                            !matches!(
4325                                o.name,
4326                                SqlServerConfigOptionName::TextColumns
4327                                    | SqlServerConfigOptionName::ExcludeColumns
4328                            )
4329                        });
4330
4331                        // Drop all text / exclude columns that are not currently referred to.
4332                        let column_referenced =
4333                            |column_qualified_reference: &UnresolvedItemName| {
4334                                mz_ore::soft_assert_eq_or_log!(
4335                                    column_qualified_reference.0.len(),
4336                                    3,
4337                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4338                                );
4339                                let mut table = column_qualified_reference.clone();
4340                                table.0.truncate(2);
4341                                curr_references.contains(&table)
4342                            };
4343                        text_columns.retain(column_referenced);
4344                        exclude_columns.retain(column_referenced);
4345
4346                        // Merge the current text / exclude columns into the new text / exclude columns.
4347                        new_text_columns.extend(text_columns);
4348                        new_exclude_columns.extend(exclude_columns);
4349
4350                        // If we have text columns, add them to the options.
4351                        if !new_text_columns.is_empty() {
4352                            new_text_columns.sort();
4353                            let new_text_columns = new_text_columns
4354                                .into_iter()
4355                                .map(WithOptionValue::UnresolvedItemName)
4356                                .collect();
4357
4358                            curr_options.push(SqlServerConfigOption {
4359                                name: SqlServerConfigOptionName::TextColumns,
4360                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4361                            });
4362                        }
4363                        // If we have exclude columns, add them to the options.
4364                        if !new_exclude_columns.is_empty() {
4365                            new_exclude_columns.sort();
4366                            let new_exclude_columns = new_exclude_columns
4367                                .into_iter()
4368                                .map(WithOptionValue::UnresolvedItemName)
4369                                .collect();
4370
4371                            curr_options.push(SqlServerConfigOption {
4372                                name: SqlServerConfigOptionName::ExcludeColumns,
4373                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4374                            });
4375                        }
4376                    }
4377                    _ => return Err(purification_err()),
4378                };
4379
4380                let mut catalog = self.catalog().for_system_session();
4381                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4382
4383                // Re-define our source in terms of the amended statement
4384                let plan = match mz_sql::plan::plan(
4385                    None,
4386                    &catalog,
4387                    Statement::CreateSource(create_source_stmt),
4388                    &Params::empty(),
4389                    &resolved_ids,
4390                )
4391                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4392                {
4393                    Plan::CreateSource(plan) => plan,
4394                    _ => unreachable!("create source plan is only valid response"),
4395                };
4396
4397                // Asserting that we've done the right thing with dependencies
4398                // here requires mocking out objects in the catalog, which is a
4399                // large task for an operation we have to cover in tests anyway.
4400                let source = Source::new(
4401                    plan,
4402                    cur_source.global_id,
4403                    resolved_ids,
4404                    cur_source.custom_logical_compaction_window,
4405                    cur_source.is_retained_metrics_object,
4406                );
4407
4408                let source_compaction_window = source.custom_logical_compaction_window;
4409
4410                // Get new ingestion description for storage.
4411                let desc = match &source.data_source {
4412                    DataSourceDesc::Ingestion { desc, .. }
4413                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4414                        desc.clone().into_inline_connection(self.catalog().state())
4415                    }
4416                    _ => unreachable!("already verified to be an ingestion"),
4417                };
4418
4419                self.controller
4420                    .storage
4421                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
4422                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4423
4424                // Redefine source. This must be done before we create any new
4425                // subsources so that it has the right ingestion.
4426                let mut ops = vec![catalog::Op::UpdateItem {
4427                    id: item_id,
4428                    // Look this up again so we don't have to hold an immutable reference to the
4429                    // entry for so long.
4430                    name: self.catalog.get_entry(&item_id).name().clone(),
4431                    to_item: CatalogItem::Source(source),
4432                }];
4433
4434                let CreateSourceInner {
4435                    ops: new_ops,
4436                    sources,
4437                    if_not_exists_ids,
4438                } = self.create_source_inner(session, subsources).await?;
4439
4440                ops.extend(new_ops.into_iter());
4441
4442                assert!(
4443                    if_not_exists_ids.is_empty(),
4444                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4445                );
4446
4447                self.catalog_transact(Some(session), ops).await?;
4448
4449                let mut item_ids = BTreeSet::new();
4450                let mut collections = Vec::with_capacity(sources.len());
4451                for (item_id, source) in sources {
4452                    let status_id = self.catalog().resolve_builtin_storage_collection(
4453                        &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
4454                    );
4455                    let source_status_collection_id =
4456                        Some(self.catalog().get_entry(&status_id).latest_global_id());
4457
4458                    let (data_source, status_collection_id) = match source.data_source {
4459                        // Subsources use source statuses.
4460                        DataSourceDesc::IngestionExport {
4461                            ingestion_id,
4462                            external_reference: _,
4463                            details,
4464                            data_config,
4465                        } => {
4466                            // TODO(parkmycar): We should probably check the type here, but I'm not sure if
4467                            // this will always be a Source or a Table.
4468                            let ingestion_id =
4469                                self.catalog().get_entry(&ingestion_id).latest_global_id();
4470                            (
4471                                DataSource::IngestionExport {
4472                                    ingestion_id,
4473                                    details,
4474                                    data_config: data_config
4475                                        .into_inline_connection(self.catalog().state()),
4476                                },
4477                                source_status_collection_id,
4478                            )
4479                        }
4480                        o => {
4481                            unreachable!(
4482                                "ALTER SOURCE...ADD SUBSOURCE only creates SourceExport but got {:?}",
4483                                o
4484                            )
4485                        }
4486                    };
4487
4488                    collections.push((
4489                        source.global_id,
4490                        CollectionDescription {
4491                            desc: source.desc.clone(),
4492                            data_source,
4493                            since: None,
4494                            status_collection_id,
4495                            timeline: Some(source.timeline.clone()),
4496                        },
4497                    ));
4498
4499                    item_ids.insert(item_id);
4500                }
4501
4502                let storage_metadata = self.catalog.state().storage_metadata();
4503
4504                self.controller
4505                    .storage
4506                    .create_collections(storage_metadata, None, collections)
4507                    .await
4508                    .unwrap_or_terminate("cannot fail to create collections");
4509
4510                self.initialize_storage_read_policies(
4511                    item_ids,
4512                    source_compaction_window.unwrap_or(CompactionWindow::Default),
4513                )
4514                .await;
4515            }
4516            plan::AlterSourceAction::RefreshReferences { references } => {
4517                self.catalog_transact(
4518                    Some(session),
4519                    vec![catalog::Op::UpdateSourceReferences {
4520                        source_id: item_id,
4521                        references: references.into(),
4522                    }],
4523                )
4524                .await?;
4525            }
4526        }
4527
4528        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4529    }
4530
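    /// Sequences `ALTER SYSTEM SET`: verifies that the session may modify the named variable,
    /// validates network-policy changes against the current session to avoid locking it out,
    /// writes the new value (or a reset) to the catalog, and notifies the session that the
    /// variable's default changed.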
4531    #[instrument]
4532    pub(super) async fn sequence_alter_system_set(
4533        &mut self,
4534        session: &Session,
4535        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4536    ) -> Result<ExecuteResponse, AdapterError> {
4537        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4538        // We want to ensure that the network policy we're switching to actually exists.
4539        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4540            self.validate_alter_system_network_policy(session, &value)?;
4541        }
4542
4543        let op = match value {
4544            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4545                name: name.clone(),
4546                value: OwnedVarInput::SqlSet(values),
4547            },
4548            plan::VariableValue::Default => {
4549                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4550            }
4551        };
4552        self.catalog_transact(Some(session), vec![op]).await?;
4553
4554        session.add_notice(AdapterNotice::VarDefaultUpdated {
4555            role: None,
4556            var_name: Some(name),
4557        });
4558        Ok(ExecuteResponse::AlteredSystemConfiguration)
4559    }
4560
4561    #[instrument]
4562    pub(super) async fn sequence_alter_system_reset(
4563        &mut self,
4564        session: &Session,
4565        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4566    ) -> Result<ExecuteResponse, AdapterError> {
4567        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4568        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4569        self.catalog_transact(Some(session), vec![op]).await?;
4570        session.add_notice(AdapterNotice::VarDefaultUpdated {
4571            role: None,
4572            var_name: Some(name),
4573        });
4574        Ok(ExecuteResponse::AlteredSystemConfiguration)
4575    }
4576
4577    #[instrument]
4578    pub(super) async fn sequence_alter_system_reset_all(
4579        &mut self,
4580        session: &Session,
4581        _: plan::AlterSystemResetAllPlan,
4582    ) -> Result<ExecuteResponse, AdapterError> {
4583        self.is_user_allowed_to_alter_system(session, None)?;
4584        let op = catalog::Op::ResetAllSystemConfiguration;
4585        self.catalog_transact(Some(session), vec![op]).await?;
4586        session.add_notice(AdapterNotice::VarDefaultUpdated {
4587            role: None,
4588            var_name: None,
4589        });
4590        Ok(ExecuteResponse::AlteredSystemConfiguration)
4591    }
4592
4593    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
4594    fn is_user_allowed_to_alter_system(
4595        &self,
4596        session: &Session,
4597        var_name: Option<&str>,
4598    ) -> Result<(), AdapterError> {
4599        match (session.user().kind(), var_name) {
4600            // Only internal superusers can reset all system variables.
4601            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4602            // Whether or not a variable can be modified depends on whether we're an internal superuser.
4603            (UserKind::Superuser, Some(name))
4604                if session.user().is_internal()
4605                    || self.catalog().system_config().user_modifiable(name) =>
4606            {
4607                // In lieu of plumbing the user to all system config functions, just check that
4608                // the var is visible.
4609                let var = self.catalog().system_config().get(name)?;
4610                var.visible(session.user(), self.catalog().system_config())?;
4611                Ok(())
4612            }
4613            // If we're not a superuser, but the variable is user modifiable, indicate they can use
4614            // session variables.
4615            (UserKind::Regular, Some(name))
4616                if self.catalog().system_config().user_modifiable(name) =>
4617            {
4618                Err(AdapterError::Unauthorized(
4619                    rbac::UnauthorizedError::Superuser {
4620                        action: format!("toggle the '{name}' system configuration parameter"),
4621                    },
4622                ))
4623            }
4624            _ => Err(AdapterError::Unauthorized(
4625                rbac::UnauthorizedError::MzSystem {
4626                    action: "alter system".into(),
4627                },
4628            )),
4629        }
4630    }
4631
4632    fn validate_alter_system_network_policy(
4633        &self,
4634        session: &Session,
4635        policy_value: &plan::VariableValue,
4636    ) -> Result<(), AdapterError> {
4637        let policy_name = match &policy_value {
4638            // Make sure the compiled-in default still exists.
4639            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4640            plan::VariableValue::Values(values) if values.len() == 1 => {
4641                values.iter().next().cloned()
4642            }
4643            plan::VariableValue::Values(values) => {
4644                tracing::warn!(?values, "can't set multiple network policies at once");
4645                None
4646            }
4647        };
4648        let maybe_network_policy = policy_name
4649            .as_ref()
4650            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4651        let Some(network_policy) = maybe_network_policy else {
4652            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4653                VarError::InvalidParameterValue {
4654                    name: NETWORK_POLICY.name(),
4655                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4656                    reason: "no network policy with such name exists".to_string(),
4657                },
4658            )));
4659        };
4660        self.validate_alter_network_policy(session, &network_policy.rules)
4661    }
4662
4663    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4664    ///
4665    /// This helps prevent users from modifying network policies in a way that would lock out their
4666    /// current connection.
4667    fn validate_alter_network_policy(
4668        &self,
4669        session: &Session,
4670        policy_rules: &Vec<NetworkPolicyRule>,
4671    ) -> Result<(), AdapterError> {
4672        // If the user is not an internal user, attempt to protect them from
4673        // blocking themselves.
4674        if session.user().is_internal() {
4675            return Ok(());
4676        }
4677        if let Some(ip) = session.meta().client_ip() {
4678            validate_ip_with_policy_rules(ip, policy_rules)
4679                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4680        } else {
4681            // Sessions without IPs are only temporarily constructed for default values;
4682            // they should not be permitted here.
4683            return Err(AdapterError::NetworkPolicyDenied(
4684                NetworkPolicyError::MissingIp,
4685            ));
4686        }
4687        Ok(())
4688    }
4689
4690    // Returns the name of the portal to execute.
4691    #[instrument]
4692    pub(super) fn sequence_execute(
4693        &mut self,
4694        session: &mut Session,
4695        plan: plan::ExecutePlan,
4696    ) -> Result<String, AdapterError> {
4697        // Verify the stmt is still valid.
4698        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4699        let ps = session
4700            .get_prepared_statement_unverified(&plan.name)
4701            .expect("known to exist");
4702        let stmt = ps.stmt().cloned();
4703        let desc = ps.desc().clone();
4704        let state_revision = ps.state_revision;
4705        let logging = Arc::clone(ps.logging());
4706        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4707    }
4708
4709    #[instrument]
4710    pub(super) async fn sequence_grant_privileges(
4711        &mut self,
4712        session: &Session,
4713        plan::GrantPrivilegesPlan {
4714            update_privileges,
4715            grantees,
4716        }: plan::GrantPrivilegesPlan,
4717    ) -> Result<ExecuteResponse, AdapterError> {
4718        self.sequence_update_privileges(
4719            session,
4720            update_privileges,
4721            grantees,
4722            UpdatePrivilegeVariant::Grant,
4723        )
4724        .await
4725    }
4726
4727    #[instrument]
4728    pub(super) async fn sequence_revoke_privileges(
4729        &mut self,
4730        session: &Session,
4731        plan::RevokePrivilegesPlan {
4732            update_privileges,
4733            revokees,
4734        }: plan::RevokePrivilegesPlan,
4735    ) -> Result<ExecuteResponse, AdapterError> {
4736        self.sequence_update_privileges(
4737            session,
4738            update_privileges,
4739            revokees,
4740            UpdatePrivilegeVariant::Revoke,
4741        )
4742        .await
4743    }
4744
4745    #[instrument]
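    /// Shared implementation of `GRANT` and `REVOKE`: builds one catalog op per (object, grantee)
    /// pair whose privileges would actually change, warns about privileges that don't apply to the
    /// target's object type, and commits all ops in a single catalog transaction.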
4746    async fn sequence_update_privileges(
4747        &mut self,
4748        session: &Session,
4749        update_privileges: Vec<UpdatePrivilege>,
4750        grantees: Vec<RoleId>,
4751        variant: UpdatePrivilegeVariant,
4752    ) -> Result<ExecuteResponse, AdapterError> {
4753        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4754        let mut warnings = Vec::new();
4755        let catalog = self.catalog().for_session(session);
4756
4757        for UpdatePrivilege {
4758            acl_mode,
4759            target_id,
4760            grantor,
4761        } in update_privileges
4762        {
4763            let actual_object_type = catalog.get_system_object_type(&target_id);
4764            // For all relations we allow all applicable table privileges, but send a warning if the
4765            // privilege isn't actually applicable to the object type.
4766            if actual_object_type.is_relation() {
4767                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4768                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4769                if !non_applicable_privileges.is_empty() {
4770                    let object_description =
4771                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4772                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4773                        non_applicable_privileges,
4774                        object_description,
4775                    })
4776                }
4777            }
4778
4779            if let SystemObjectId::Object(object_id) = &target_id {
4780                self.catalog()
4781                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4782            }
4783
4784            let privileges = self
4785                .catalog()
4786                .get_privileges(&target_id, session.conn_id())
4787                // Should be unreachable since the parser will refuse to parse grant/revoke
4788                // statements on objects without privileges.
4789                .ok_or(AdapterError::Unsupported(
4790                    "GRANTs/REVOKEs on an object type with no privileges",
4791                ))?;
4792
4793            for grantee in &grantees {
4794                self.catalog().ensure_not_system_role(grantee)?;
4795                self.catalog().ensure_not_predefined_role(grantee)?;
4796                let existing_privilege = privileges
4797                    .get_acl_item(grantee, &grantor)
4798                    .map(Cow::Borrowed)
4799                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4800
4801                match variant {
4802                    UpdatePrivilegeVariant::Grant
4803                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4804                    {
4805                        ops.push(catalog::Op::UpdatePrivilege {
4806                            target_id: target_id.clone(),
4807                            privilege: MzAclItem {
4808                                grantee: *grantee,
4809                                grantor,
4810                                acl_mode,
4811                            },
4812                            variant,
4813                        });
4814                    }
4815                    UpdatePrivilegeVariant::Revoke
4816                        if !existing_privilege
4817                            .acl_mode
4818                            .intersection(acl_mode)
4819                            .is_empty() =>
4820                    {
4821                        ops.push(catalog::Op::UpdatePrivilege {
4822                            target_id: target_id.clone(),
4823                            privilege: MzAclItem {
4824                                grantee: *grantee,
4825                                grantor,
4826                                acl_mode,
4827                            },
4828                            variant,
4829                        });
4830                    }
4831                    // no-op
4832                    _ => {}
4833                }
4834            }
4835        }
4836
4837        if ops.is_empty() {
4838            session.add_notices(warnings);
4839            return Ok(variant.into());
4840        }
4841
4842        let res = self
4843            .catalog_transact(Some(session), ops)
4844            .await
4845            .map(|_| match variant {
4846                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4847                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4848            });
4849        if res.is_ok() {
4850            session.add_notices(warnings);
4851        }
4852        res
4853    }
4854
4855    #[instrument]
4856    pub(super) async fn sequence_alter_default_privileges(
4857        &mut self,
4858        session: &Session,
4859        plan::AlterDefaultPrivilegesPlan {
4860            privilege_objects,
4861            privilege_acl_items,
4862            is_grant,
4863        }: plan::AlterDefaultPrivilegesPlan,
4864    ) -> Result<ExecuteResponse, AdapterError> {
4865        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4866        let variant = if is_grant {
4867            UpdatePrivilegeVariant::Grant
4868        } else {
4869            UpdatePrivilegeVariant::Revoke
4870        };
4871        for privilege_object in &privilege_objects {
4872            self.catalog()
4873                .ensure_not_system_role(&privilege_object.role_id)?;
4874            self.catalog()
4875                .ensure_not_predefined_role(&privilege_object.role_id)?;
4876            if let Some(database_id) = privilege_object.database_id {
4877                self.catalog()
4878                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4879            }
4880            if let Some(schema_id) = privilege_object.schema_id {
4881                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4882                let schema_spec: SchemaSpecifier = schema_id.into();
4883
4884                self.catalog().ensure_not_reserved_object(
4885                    &(database_spec, schema_spec).into(),
4886                    session.conn_id(),
4887                )?;
4888            }
4889            for privilege_acl_item in &privilege_acl_items {
4890                self.catalog()
4891                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4892                self.catalog()
4893                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4894                ops.push(catalog::Op::UpdateDefaultPrivilege {
4895                    privilege_object: privilege_object.clone(),
4896                    privilege_acl_item: privilege_acl_item.clone(),
4897                    variant,
4898                })
4899            }
4900        }
4901
4902        self.catalog_transact(Some(session), ops).await?;
4903        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4904    }
4905
4906    #[instrument]
4907    pub(super) async fn sequence_grant_role(
4908        &mut self,
4909        session: &Session,
4910        plan::GrantRolePlan {
4911            role_ids,
4912            member_ids,
4913            grantor_id,
4914        }: plan::GrantRolePlan,
4915    ) -> Result<ExecuteResponse, AdapterError> {
4916        let catalog = self.catalog();
4917        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4918        for role_id in role_ids {
4919            for member_id in &member_ids {
4920                let member_membership: BTreeSet<_> =
4921                    catalog.get_role(member_id).membership().keys().collect();
4922                if member_membership.contains(&role_id) {
4923                    let role_name = catalog.get_role(&role_id).name().to_string();
4924                    let member_name = catalog.get_role(member_id).name().to_string();
4925                    // We need this check so we don't accidentally return a success on a reserved role.
4926                    catalog.ensure_not_reserved_role(member_id)?;
4927                    catalog.ensure_grantable_role(&role_id)?;
4928                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4929                        role_name,
4930                        member_name,
4931                    });
4932                } else {
4933                    ops.push(catalog::Op::GrantRole {
4934                        role_id,
4935                        member_id: *member_id,
4936                        grantor_id,
4937                    });
4938                }
4939            }
4940        }
4941
4942        if ops.is_empty() {
4943            return Ok(ExecuteResponse::GrantedRole);
4944        }
4945
4946        self.catalog_transact(Some(session), ops)
4947            .await
4948            .map(|_| ExecuteResponse::GrantedRole)
4949    }
4950
4951    #[instrument]
4952    pub(super) async fn sequence_revoke_role(
4953        &mut self,
4954        session: &Session,
4955        plan::RevokeRolePlan {
4956            role_ids,
4957            member_ids,
4958            grantor_id,
4959        }: plan::RevokeRolePlan,
4960    ) -> Result<ExecuteResponse, AdapterError> {
4961        let catalog = self.catalog();
4962        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4963        for role_id in role_ids {
4964            for member_id in &member_ids {
4965                let member_membership: BTreeSet<_> =
4966                    catalog.get_role(member_id).membership().keys().collect();
4967                if !member_membership.contains(&role_id) {
4968                    let role_name = catalog.get_role(&role_id).name().to_string();
4969                    let member_name = catalog.get_role(member_id).name().to_string();
4970                    // We need these checks so we don't accidentally return success for a reserved member or an ungrantable role.
4971                    catalog.ensure_not_reserved_role(member_id)?;
4972                    catalog.ensure_grantable_role(&role_id)?;
4973                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4974                        role_name,
4975                        member_name,
4976                    });
4977                } else {
4978                    ops.push(catalog::Op::RevokeRole {
4979                        role_id,
4980                        member_id: *member_id,
4981                        grantor_id,
4982                    });
4983                }
4984            }
4985        }
4986
4987        if ops.is_empty() {
4988            return Ok(ExecuteResponse::RevokedRole);
4989        }
4990
4991        self.catalog_transact(Some(session), ops)
4992            .await
4993            .map(|_| ExecuteResponse::RevokedRole)
4994    }
4995
4996    #[instrument]
4997    pub(super) async fn sequence_alter_owner(
4998        &mut self,
4999        session: &Session,
5000        plan::AlterOwnerPlan {
5001            id,
5002            object_type,
5003            new_owner,
5004        }: plan::AlterOwnerPlan,
5005    ) -> Result<ExecuteResponse, AdapterError> {
5006        let mut ops = vec![catalog::Op::UpdateOwner {
5007            id: id.clone(),
5008            new_owner,
5009        }];
5010
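        // Ownership changes cascade to objects whose owner must track their parent: an
        // item's dependent indexes and its progress collection, and a cluster's replicas.
        // E.g. `ALTER TABLE t OWNER TO new_role` (hypothetical names) also reassigns any
        // indexes on `t`.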
5011        match &id {
5012            ObjectId::Item(global_id) => {
5013                let entry = self.catalog().get_entry(global_id);
5014
5015                // Cannot directly change the owner of an index.
5016                if entry.is_index() {
5017                    let name = self
5018                        .catalog()
5019                        .resolve_full_name(entry.name(), Some(session.conn_id()))
5020                        .to_string();
5021                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
5022                    return Ok(ExecuteResponse::AlteredObject(object_type));
5023                }
5024
5025                // Alter owner cascades down to dependent indexes.
5026                let dependent_index_ops = entry
5027                    .used_by()
5028                    .into_iter()
5029                    .filter(|id| self.catalog().get_entry(id).is_index())
5030                    .map(|id| catalog::Op::UpdateOwner {
5031                        id: ObjectId::Item(*id),
5032                        new_owner,
5033                    });
5034                ops.extend(dependent_index_ops);
5035
5036                // Alter owner cascades down to progress collections.
5037                let dependent_subsources =
5038                    entry
5039                        .progress_id()
5040                        .into_iter()
5041                        .map(|item_id| catalog::Op::UpdateOwner {
5042                            id: ObjectId::Item(item_id),
5043                            new_owner,
5044                        });
5045                ops.extend(dependent_subsources);
5046            }
5047            ObjectId::Cluster(cluster_id) => {
5048                let cluster = self.catalog().get_cluster(*cluster_id);
5049                // Alter owner cascades down to cluster replicas.
5050                let managed_cluster_replica_ops =
5051                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
5052                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
5053                        new_owner,
5054                    });
5055                ops.extend(managed_cluster_replica_ops);
5056            }
5057            _ => {}
5058        }
5059
5060        self.catalog_transact(Some(session), ops)
5061            .await
5062            .map(|_| ExecuteResponse::AlteredObject(object_type))
5063    }
5064
5065    #[instrument]
5066    pub(super) async fn sequence_reassign_owned(
5067        &mut self,
5068        session: &Session,
5069        plan::ReassignOwnedPlan {
5070            old_roles,
5071            new_role,
5072            reassign_ids,
5073        }: plan::ReassignOwnedPlan,
5074    ) -> Result<ExecuteResponse, AdapterError> {
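        // E.g. `REASSIGN OWNED BY old_role TO new_role` (hypothetical role names); every
        // role involved, old and new, must be non-reserved.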
5075        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
5076            self.catalog().ensure_not_reserved_role(role_id)?;
5077        }
5078
5079        let ops = reassign_ids
5080            .into_iter()
5081            .map(|id| catalog::Op::UpdateOwner {
5082                id,
5083                new_owner: new_role,
5084            })
5085            .collect();
5086
5087        self.catalog_transact(Some(session), ops)
5088            .await
5089            .map(|_| ExecuteResponse::ReassignOwned)
5090    }
5091
5092    #[instrument]
5093    pub(crate) async fn handle_deferred_statement(&mut self) {
5094        // It is possible that a Message::DeferredStatementReady was sent but a session
5095        // cancellation was processed afterwards, removing the single queued statement, so it
5096        // is expected that the queue is sometimes empty.
5097        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
5098            return;
5099        };
5100        match ps {
5101            crate::coord::PlanStatement::Statement { stmt, params } => {
5102                self.handle_execute_inner(stmt, params, ctx).await;
5103            }
5104            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
5105                self.sequence_plan(ctx, plan, resolved_ids).await;
5106            }
5107        }
5108    }
5109
5110    #[instrument]
5111    // TODO(parkmycar): Remove this once we have an actual implementation.
5112    #[allow(clippy::unused_async)]
5113    pub(super) async fn sequence_alter_table(
5114        &mut self,
5115        ctx: &mut ExecuteContext,
5116        plan: plan::AlterTablePlan,
5117    ) -> Result<ExecuteResponse, AdapterError> {
5118        let plan::AlterTablePlan {
5119            relation_id,
5120            column_name,
5121            column_type,
5122            raw_sql_type,
5123        } = plan;
5124
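        // At a high level: allocate a fresh GlobalId for the new table version, record the
        // added column in the catalog, then (as a side effect of the transaction) register
        // the new relation description with the storage controller while holding a read
        // hold on the old collection so its since cannot advance mid-ALTER.
        // E.g. `ALTER TABLE t ADD COLUMN c int` (hypothetical names).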
5125        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
5126        let id_ts = self.get_catalog_write_ts().await;
5127        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
5128        let ops = vec![catalog::Op::AlterAddColumn {
5129            id: relation_id,
5130            new_global_id,
5131            name: column_name,
5132            typ: column_type,
5133            sql: raw_sql_type,
5134        }];
5135
5136        let entry = self.catalog().get_entry(&relation_id);
5137        let CatalogItem::Table(table) = &entry.item else {
5138            let err = format!("expected table, found {:?}", entry.item);
5139            return Err(AdapterError::Internal(err));
5140        };
5141        // Expected schema version, before altering the table.
5142        let expected_version = table.desc.latest_version();
5143        let existing_global_id = table.global_id_writes();
5144
5145        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
5146            Box::pin(async move {
5147                let entry = coord.catalog().get_entry(&relation_id);
5148                let CatalogItem::Table(table) = &entry.item else {
5149                    panic!("programming error, expected table, found {:?}", entry.item);
5150                };
5151                let table = table.clone();
5152
5153                // Acquire a read hold on the original table for the duration of
5154                // the ALTER, to prevent the since of the original table from
5155                // advancing while the ALTER is running.
5156                let existing_table = crate::CollectionIdBundle {
5157                    storage_ids: btreeset![existing_global_id],
5158                    compute_ids: BTreeMap::new(),
5159                };
5160                let existing_table_read_hold = coord.acquire_read_holds(&existing_table);
5161
5162                let new_version = table.desc.latest_version();
5163                let new_desc = table
5164                    .desc
5165                    .at_version(RelationVersionSelector::Specific(new_version));
5166                let register_ts = coord.get_local_write_ts().await.timestamp;
5167
5168                // Alter the table description, creating a "new" collection.
5169                coord
5170                    .controller
5171                    .storage
5172                    .alter_table_desc(
5173                        existing_global_id,
5174                        new_global_id,
5175                        new_desc,
5176                        expected_version,
5177                        register_ts,
5178                    )
5179                    .await
5180                    .expect("failed to alter desc of table");
5181
5182                // Initialize the ReadPolicy which ensures we have the correct read holds.
5183                let compaction_window = table
5184                    .custom_logical_compaction_window
5185                    .unwrap_or(CompactionWindow::Default);
5186                coord
5187                    .initialize_read_policies(
5188                        &crate::CollectionIdBundle {
5189                            storage_ids: btreeset![new_global_id],
5190                            compute_ids: BTreeMap::new(),
5191                        },
5192                        compaction_window,
5193                    )
5194                    .await;
5195                coord.apply_local_write(register_ts).await;
5196
5197                // Alter is complete! We can drop our read hold.
5198                drop(existing_table_read_hold);
5199            })
5200        })
5201        .await?;
5202
5203        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
5204    }
5205}
5206
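/// A statistics oracle backed by a one-time snapshot: at construction it asks the storage
/// layer for `snapshot_stats` of each collection at the given `as_of` and caches the
/// reported update counts, which later serve as cardinality estimates.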
5207#[derive(Debug)]
5208struct CachedStatisticsOracle {
5209    cache: BTreeMap<GlobalId, usize>,
5210}
5211
5212impl CachedStatisticsOracle {
5213    pub async fn new<T: TimelyTimestamp>(
5214        ids: &BTreeSet<GlobalId>,
5215        as_of: &Antichain<T>,
5216        storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
5217    ) -> Result<Self, StorageError<T>> {
5218        let mut cache = BTreeMap::new();
5219
5220        for id in ids {
5221            let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
5222
5223            match stats {
5224                Ok(stats) => {
5225                    cache.insert(*id, stats.num_updates);
5226                }
5227                Err(StorageError::IdentifierMissing(id)) => {
5228                    ::tracing::debug!("no statistics for {id}")
5229                }
5230                Err(e) => return Err(e),
5231            }
5232        }
5233
5234        Ok(Self { cache })
5235    }
5236}
5237
5238impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
5239    fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
5240        self.cache.get(&id).copied()
5241    }
5242
5243    fn as_map(&self) -> BTreeMap<GlobalId, usize> {
5244        self.cache.clone()
5245    }
5246}
5247
5248impl Coordinator {
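    /// Builds a statistics oracle for `source_ids` at `query_as_of`.
    ///
    /// Falls back to `EmptyStatisticsOracle` if the session has cardinality estimates
    /// disabled or if gathering snapshot statistics exceeds the configured optimizer
    /// stats timeout.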
5249    pub(super) async fn statistics_oracle(
5250        &self,
5251        session: &Session,
5252        source_ids: &BTreeSet<GlobalId>,
5253        query_as_of: &Antichain<Timestamp>,
5254        is_oneshot: bool,
5255    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
5256        if !session.vars().enable_session_cardinality_estimates() {
5257            return Ok(Box::new(EmptyStatisticsOracle));
5258        }
5259
5260        let timeout = if is_oneshot {
5261            // TODO(mgree): ideally, we would shorten the timeout even more if we think the query could take the fast path
5262            self.catalog()
5263                .system_config()
5264                .optimizer_oneshot_stats_timeout()
5265        } else {
5266            self.catalog().system_config().optimizer_stats_timeout()
5267        };
5268
5269        let cached_stats = mz_ore::future::timeout(
5270            timeout,
5271            CachedStatisticsOracle::new(
5272                source_ids,
5273                query_as_of,
5274                self.controller.storage_collections.as_ref(),
5275            ),
5276        )
5277        .await;
5278
5279        match cached_stats {
5280            Ok(stats) => Ok(Box::new(stats)),
5281            Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
5282                warn!(
5283                    is_oneshot = is_oneshot,
5284                    "optimizer statistics collection timed out after {}ms",
5285                    timeout.as_millis()
5286                );
5287
5288                Ok(Box::new(EmptyStatisticsOracle))
5289            }
5290            Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
5291        }
5292    }
5293}
5294
5295/// Checks whether we should emit diagnostic
5296/// information associated with reading per-replica sources.
5297///
5298/// If an unrecoverable error is found (today: an untargeted read on a
5299/// cluster that does not have exactly one replica), return that. Otherwise,
5300/// return a list of associated notices (today: we emit exactly one notice
5301/// if there are any per-replica log dependencies and
5302/// `emit_introspection_query_notice` is set, and none otherwise).
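///
/// On a single-replica cluster an untargeted read is automatically targeted at the sole
/// replica; on clusters with zero or multiple replicas the caller must have already
/// chosen a target replica (typically via the `cluster_replica` session variable).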
5303pub(super) fn check_log_reads(
5304    catalog: &Catalog,
5305    cluster: &Cluster,
5306    source_ids: &BTreeSet<GlobalId>,
5307    target_replica: &mut Option<ReplicaId>,
5308    vars: &SessionVars,
5309) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError> {
5312    let log_names = source_ids
5313        .iter()
5314        .map(|gid| catalog.resolve_item_id(gid))
5315        .flat_map(|item_id| catalog.introspection_dependencies(item_id))
5316        .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
5317        .collect::<Vec<_>>();
5318
5319    if log_names.is_empty() {
5320        return Ok(None);
5321    }
5322
5323    // Reading from log sources on replicated clusters is only allowed if a
5324    // target replica is selected. Otherwise, we have no way of knowing which
5325    // replica we read the introspection data from.
5326    let num_replicas = cluster.replicas().count();
5327    if target_replica.is_none() {
5328        if num_replicas == 1 {
5329            *target_replica = cluster.replicas().map(|r| r.replica_id).next();
5330        } else {
5331            return Err(AdapterError::UntargetedLogRead { log_names });
5332        }
5333    }
5334
5335    // Ensure that logging is initialized for the target replica, lest
5336    // we try to read from a nonexistent arrangement.
5337    let replica_id = target_replica.expect("set to `Some` above");
5338    let replica = &cluster.replica(replica_id).expect("Replica must exist");
5339    if !replica.config.compute.logging.enabled() {
5340        return Err(AdapterError::IntrospectionDisabled { log_names });
5341    }
5342
5343    Ok(vars
5344        .emit_introspection_query_notice()
5345        .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
5346}
5347
5348impl Coordinator {
5349    /// Forward notices that we got from the optimizer.
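    ///
    /// Each notice kind can be switched off independently via system configuration;
    /// disabled kinds are not surfaced to the session but are still counted in the
    /// `optimization_notices` metric.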
5350    fn emit_optimizer_notices(&self, session: &Session, notices: &[RawOptimizerNotice]) {
5351        // `for_session` below is expensive, so return early if there's nothing to do.
5352        if notices.is_empty() {
5353            return;
5354        }
5355        let humanizer = self.catalog.for_session(session);
5356        let system_vars = self.catalog.system_config();
5357        for notice in notices {
5358            let kind = OptimizerNoticeKind::from(notice);
5359            let notice_enabled = match kind {
5360                OptimizerNoticeKind::IndexAlreadyExists => {
5361                    system_vars.enable_notices_for_index_already_exists()
5362                }
5363                OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
5364                    system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
5365                }
5366                OptimizerNoticeKind::IndexKeyEmpty => {
5367                    system_vars.enable_notices_for_index_empty_key()
5368                }
5369            };
5370            if notice_enabled {
5371                // We don't need to redact the notice parts because
5372                // `emit_optimizer_notices` is only called by the `sequence_~`
5373                // method for the statement that produces that notice.
5374                session.add_notice(AdapterNotice::OptimizerNotice {
5375                    notice: notice.message(&humanizer, false).to_string(),
5376                    hint: notice.hint(&humanizer, false).to_string(),
5377                });
5378            }
5379            self.metrics
5380                .optimization_notices
5381                .with_label_values(&[kind.metric_label()])
5382                .inc_by(1);
5383        }
5384    }
5385
5386    /// Process the metainfo from a newly created non-transient dataflow.
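    ///
    /// Renders and stores the metainfo in the catalog, and returns a notify handle for
    /// the builtin-table append when optimizer notices were recorded (and the
    /// `enable_mz_notices` config is on), or `None` otherwise.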
5387    async fn process_dataflow_metainfo(
5388        &mut self,
5389        df_meta: DataflowMetainfo,
5390        export_id: GlobalId,
5391        ctx: Option<&mut ExecuteContext>,
5392        notice_ids: Vec<GlobalId>,
5393    ) -> Option<BuiltinTableAppendNotify> {
5394        // Emit raw notices to the user.
5395        if let Some(ctx) = ctx {
5396            self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
5397        }
5398
5399        // Create a metainfo with rendered notices.
5400        let df_meta = self
5401            .catalog()
5402            .render_notices(df_meta, notice_ids, Some(export_id));
5403
5404        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
5405        // in-memory state.
5406        if self.catalog().state().system_config().enable_mz_notices()
5407            && !df_meta.optimizer_notices.is_empty()
5408        {
5409            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
5410            self.catalog().state().pack_optimizer_notices(
5411                &mut builtin_table_updates,
5412                df_meta.optimizer_notices.iter(),
5413                Diff::ONE,
5414            );
5415
5416            // Save the metainfo.
5417            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
5418
5419            Some(
5420                self.builtin_table_update()
5421                    .execute(builtin_table_updates)
5422                    .await
5423                    .0,
5424            )
5425        } else {
5426            // Save the metainfo.
5427            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
5428
5429            None
5430        }
5431    }
5432}