mz_adapter/coord/sequencer/inner.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::FuturesOrdered;
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::objects::{
    CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_controller_types::ReplicaId;
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_persist_client::stats::SnapshotPartStats;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
    RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, CreateSourceOptionName, CreateSubsourceOption,
    CreateSubsourceOptionName, SqlServerConfigOption, SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceExport};
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS, SessionVars,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_storage_types::stats::RelationPartStats;
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
use smallvec::SmallVec;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::{
    AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
    CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
    Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
    PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
    validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;

/// Attempts to evaluate an expression. If evaluation returns an error, the
/// error is sent to the client and the enclosing function returns.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;
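
// A minimal usage sketch for `return_if_err!` (hypothetical call site):
// evaluate a fallible planning step and retire the context on error instead of
// propagating it:
//
//     let plan = return_if_err!(self.plan_statement(session, stmt, &params, &ids), ctx);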

struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}

// A bundle of values returned from `create_source_inner`.
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If cancellation is not allowed, remove any previous channel so that
                    // handle_spawn doesn't observe a stale value from an earlier stage on
                    // which cancel_enabled was true.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false
            };
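            // Run the current stage on the coordinator thread, attributing its
            // work to the caller's span so the whole plan traces as one unit.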
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }

    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // Wait for true or dropped sender.
                let _ = rx.wait_for(|v| *v).await;
            })
        } else {
            Box::pin(future::pending())
        };
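        // Race the stage's off-thread work against cancellation: completion
        // hands the result to `f`, while a cancel retires the context.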
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = match res {
                        Ok(next) => return_if_err!(next, ctx),
                        Err(err) => {
                            tracing::error!("sequence_staged join error {err}");
                            ctx.retire(Err(AdapterError::Internal(
                                "sequence_staged join error".into(),
                            )));
                            return;
                        }
                    };
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }

    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

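        // Record which plans came from `CREATE SOURCE ... IF NOT EXISTS` so the
        // caller can turn an `ItemAlreadyExists` error into a notice instead of
        // a hard failure.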
        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref desc)
                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                            .get(self.catalog().system_config().dyncfgs());

                        if !enable_multi_replica_sources && cluster.replica_ids().len() > 1 {
                            return Err(AdapterError::Unsupported(
                                "webhook sources in clusters with >1 replicas",
                            ));
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

            // Attempt to reduce the `CHECK` expression; we time out if this takes too long.
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression: {reason}"
                    )));
                }
            }

            // If this source contained a set of available source references, update the
            // source references catalog table.
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            // These operations must be executed after the source is added to the catalog.
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }

    /// Subsources are planned differently from other statements because they
    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
    /// Because of this, we have usually "missed" the opportunity to plan them
    /// through the normal statement execution life cycle (the exception being
    /// during bootstrapping).
    ///
    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the subsource.
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let plan = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }

    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

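        // Allocate catalog IDs for all synthesized subsources in one batch at a
        // single catalog write timestamp, then plan each statement against its
        // preallocated IDs.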
        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog_mut()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip_eq(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: CreateSubsourceStatement<Aug>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource.
        //
        // The primary source depends on this subsource because the primary
        // source needs to know its shard ID, and the easiest way of
        // guaranteeing that the shard ID is discoverable is to create this
        // collection first.
        assert_none!(progress_stmt.of_source);
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
        let progress_plan =
            self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
        let progress_full_name = self
            .catalog()
            .resolve_full_name(&progress_plan.plan.name, None);
        let progress_subsource = ResolvedItemName::Item {
            id: progress_plan.item_id,
            qualifiers: progress_plan.plan.name.qualifiers.clone(),
            full_name: progress_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        create_source_plans.push(progress_plan);

        source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

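        // Propagate inheritable `WITH` options from the source statement to its
        // subsources: `RETAIN HISTORY` carries over, while `TIMESTAMP INTERVAL`
        // applies only to the primary source.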
        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("plan must be CreateSourcePlan but got {:?}", p),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources.
        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog_mut()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let item_ids: Vec<_> = plans.iter().map(|plan| plan.item_id).collect();
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |coord, ctx| {
                Box::pin(async move {
                    // TODO(roshan): This will be unnecessary when we drop support for
                    // automatically created subsources.
                    //
                    // To improve the performance of creating a source and many
                    // subsources, we create all of the collections at once. If we
                    // created each collection independently, we would reschedule
                    // the primary source's execution for each subsource, causing
                    // unnecessary thrashing.
                    //
                    // Note that creating all of the collections at once should
                    // never be semantically significant; we should always be able
                    // to break this apart into creating the collections sequentially.
                    let mut collections = Vec::with_capacity(sources.len());

                    for (item_id, source) in sources {
                        let source_status_item_id =
                            coord.catalog().resolve_builtin_storage_collection(
                                &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
                            );
                        let source_status_collection_id = Some(
                            coord
                                .catalog()
                                .get_entry(&source_status_item_id)
                                .latest_global_id(),
                        );

                        let (data_source, status_collection_id) = match source.data_source {
                            DataSourceDesc::Ingestion { desc, cluster_id } => {
                                let desc = desc.into_inline_connection(coord.catalog().state());

                                let ingestion =
                                    IngestionDescription::new(desc, cluster_id, source.global_id);

                                (
                                    DataSource::Ingestion(ingestion),
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::OldSyntaxIngestion {
                                desc,
                                progress_subsource,
                                data_config,
                                details,
                                cluster_id,
                            } => {
                                let desc = desc.into_inline_connection(coord.catalog().state());
                                let data_config =
                                    data_config.into_inline_connection(coord.catalog().state());
                                // TODO(parkmycar): We should probably check the type here, but I'm not
                                // sure if this will always be a Source or a Table.
                                let progress_subsource = coord
                                    .catalog()
                                    .get_entry(&progress_subsource)
                                    .latest_global_id();

                                let mut ingestion =
                                    IngestionDescription::new(desc, cluster_id, progress_subsource);
                                let legacy_export = SourceExport {
                                    storage_metadata: (),
                                    data_config,
                                    details,
                                };
                                ingestion
                                    .source_exports
                                    .insert(source.global_id, legacy_export);

                                (
                                    DataSource::Ingestion(ingestion),
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference: _,
                                details,
                                data_config,
                            } => {
                                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                                // this will always be a Source or a Table.
                                let ingestion_id =
                                    coord.catalog().get_entry(&ingestion_id).latest_global_id();
                                (
                                    DataSource::IngestionExport {
                                        ingestion_id,
                                        details,
                                        data_config: data_config
                                            .into_inline_connection(coord.catalog().state()),
                                    },
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::Progress => (DataSource::Progress, None),
                            DataSourceDesc::Webhook { .. } => {
                                if let Some(url) =
                                    coord.catalog().state().try_get_webhook_url(&item_id)
                                {
                                    if let Some(ctx) = ctx.as_ref() {
                                        ctx.session()
                                            .add_notice(AdapterNotice::WebhookSourceCreated { url })
                                    }
                                }

                                (DataSource::Webhook, None)
                            }
                            DataSourceDesc::Introspection(_) => {
                                unreachable!(
                                    "cannot create sources with introspection data sources"
                                )
                            }
                        };

                        collections.push((
                            source.global_id,
                            CollectionDescription::<Timestamp> {
                                desc: source.desc.clone(),
                                data_source,
                                timeline: Some(source.timeline),
                                since: None,
                                status_collection_id,
                            },
                        ));
                    }

                    let storage_metadata = coord.catalog.state().storage_metadata();

                    coord
                        .controller
                        .storage
                        .create_collections(storage_metadata, None, collections)
                        .await
                        .unwrap_or_terminate("cannot fail to create collections");

                    // It is _very_ important that we only initialize read policies
                    // after we have created all the sources/collections. Some of
                    // the sources created in this collection might have
                    // dependencies on other sources, so the controller must get a
                    // chance to install read holds before we set a policy that
                    // might make the since advance.
                    //
                    // One instance of this is the remap shard: it presents as a
                    // SUBSOURCE, and all other SUBSOURCES of a SOURCE will depend
                    // on it. Both subsources and sources will show up as a `Source`
                    // in the above.
                    //
                    // Although there should only be one parent source that
                    // sequence_create_source is ever called with, hedge our bets a
                    // bit and collect the compaction windows for each id in the
                    // bundle (these should all be identical). This is some extra
                    // work but seems safer.
                    let read_policies = coord.catalog().state().source_compaction_windows(item_ids);
                    for (compaction_window, storage_policies) in read_policies {
                        coord
                            .initialize_storage_read_policies(storage_policies, compaction_window)
                            .await;
                    }
                })
            })
            .await;

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog_mut().allocate_user_id(id_ts).await
        {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

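        // If requested, validate the connection off the coordinator thread and
        // finish sequencing via an internal command once the result arrives;
        // otherwise commit the connection to the catalog immediately.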
        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

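        // Committing an AWS PrivateLink connection carries a cloud side effect:
        // the corresponding VPC endpoint is ensured after the catalog
        // transaction lands.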
        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    match plan.connection.details {
                        ConnectionDetails::AwsPrivatelink(ref privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            let cloud_resource_controller =
                                match coord.cloud_resource_controller.as_ref().cloned() {
                                    Some(controller) => controller,
                                    None => {
                                        tracing::warn!("AWS PrivateLink connections unsupported");
                                        return;
                                    }
                                };
                            if let Err(err) = cloud_resource_controller
                                .ensure_vpc_endpoint(connection_id, spec)
                                .await
                            {
                                tracing::warn!(?err, "failed to ensure vpc endpoint!");
                            }
                        }
                        _ => {}
                    }
                })
            })
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &mut Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &mut Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

    /// Validates the role attributes for a `CREATE ROLE` statement.
    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some()
                || attributes.password.is_some()
                || attributes.login.is_some()
            {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes)?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_conn(conn_id, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO(network_policy): Consider role-based network policies here.
        let current_network_policy_name = self
            .owned_catalog()
            .system_config()
            .default_network_policy_name();
        // Check that the way we're altering the policy is still valid for the current connection.
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

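        // Temporary tables are scoped to the creating connection and are
        // cleaned up when that session ends.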
        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let id_ts = self.get_catalog_write_ts().await;
        let (table_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
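        // A new table starts with a single relation version
        // (`RelationVersion::root`) mapped to its initial global ID.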

        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                o => {
                    unreachable!("CREATE TABLE data source got {:?}", o)
                }
            },
        };
        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let catalog_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, move |coord, ctx| {
                Box::pin(async move {
                    // The table data_source determines whether this table will be written to
                    // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
                    // (e.g. a source-fed table).
                    let (collections, register_ts, read_policies) = match table.data_source {
                        TableDataSource::TableWrites { defaults: _ } => {
                            // Determine the initial validity for the table.
                            let register_ts = coord.get_local_write_ts().await.timestamp;

                            // After acquiring `register_ts` but before using it, we need to
                            // be sure we're still the leader. Otherwise a new generation
                            // may also be trying to use `register_ts` for a different
                            // purpose.
                            //
                            // See database-issues#8273.
                            coord
                                .catalog
                                .confirm_leadership()
                                .await
                                .unwrap_or_terminate("unable to confirm leadership");

                            if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
                                coord.set_statement_execution_timestamp(id, register_ts);
                            }

                            // When initially creating a table it should only have a single version.
                            let relation_version = RelationVersion::root();
                            assert_eq!(table.desc.latest_version(), relation_version);
                            let relation_desc = table
                                .desc
                                .at_version(RelationVersionSelector::Specific(relation_version));
                            // We assert above we have a single version, and thus we are the primary.
                            let collection_desc =
                                CollectionDescription::for_table(relation_desc, None);
                            let collections = vec![(global_id, collection_desc)];

                            let compaction_window = table
                                .custom_logical_compaction_window
                                .unwrap_or(CompactionWindow::Default);
                            let read_policies =
                                BTreeMap::from([(compaction_window, btreeset! { table_id })]);

                            (collections, Some(register_ts), read_policies)
                        }
                        TableDataSource::DataSource {
                            desc: data_source,
                            timeline,
                        } => {
                            match data_source {
                                DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference: _,
                                    details,
                                    data_config,
                                } => {
1254                                    // TODO: It's a little weird that a table will be present in this
1255                                    // source status collection; we might want to split tables out into
1256                                    // a separate status collection.
1257                                    let source_status_item_id =
1258                                        coord.catalog().resolve_builtin_storage_collection(
1259                                            &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
1260                                        );
1261                                    let status_collection_id = Some(
1262                                        coord
1263                                            .catalog()
1264                                            .get_entry(&source_status_item_id)
1265                                            .latest_global_id(),
1266                                    );
1267                                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
1268                                    // this will always be a Source or a Table.
1269                                    let ingestion_id =
1270                                        coord.catalog().get_entry(&ingestion_id).latest_global_id();
1271                                    // Create the underlying collection with the latest schema from the Table.
1272                                    let collection_desc = CollectionDescription::<Timestamp> {
1273                                        desc: table
1274                                            .desc
1275                                            .at_version(RelationVersionSelector::Latest),
1276                                        data_source: DataSource::IngestionExport {
1277                                            ingestion_id,
1278                                            details,
1279                                            data_config: data_config
1280                                                .into_inline_connection(coord.catalog.state()),
1281                                        },
1282                                        since: None,
1283                                        status_collection_id,
1284                                        timeline: Some(timeline.clone()),
1285                                    };
1286
1287                                    let collections = vec![(global_id, collection_desc)];
1288                                    let read_policies = coord
1289                                        .catalog()
1290                                        .state()
1291                                        .source_compaction_windows(vec![table_id]);
1292
1293                                    (collections, None, read_policies)
1294                                }
1295                                DataSourceDesc::Webhook { .. } => {
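                                        // Surface the webhook's ingestion URL to the
                                        // user, since that is the endpoint they will
                                        // POST data to.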
1296                                    if let Some(url) =
1297                                        coord.catalog().state().try_get_webhook_url(&table_id)
1298                                    {
1299                                        if let Some(ctx) = ctx.as_ref() {
1300                                            ctx.session().add_notice(
1301                                                AdapterNotice::WebhookSourceCreated { url },
1302                                            )
1303                                        }
1304                                    }
1305
1306                                    // Create the underlying collection with the latest schema from the Table.
1307                                    assert_eq!(
1308                                        table.desc.latest_version(),
1309                                        RelationVersion::root(),
1310                                        "found webhook with more than 1 relation version, {:?}",
1311                                        table.desc
1312                                    );
1313                                    let desc = table.desc.latest();
1314
1315                                    let collection_desc = CollectionDescription {
1316                                        desc,
1317                                        data_source: DataSource::Webhook,
1318                                        since: None,
1319                                        status_collection_id: None,
1320                                        timeline: Some(timeline.clone()),
1321                                    };
1322                                    let collections = vec![(global_id, collection_desc)];
1323                                    let read_policies = coord
1324                                        .catalog()
1325                                        .state()
1326                                        .source_compaction_windows(vec![table_id]);
1327
1328                                    (collections, None, read_policies)
1329                                }
1330                                _ => unreachable!("CREATE TABLE data source got {:?}", data_source),
1331                            }
1332                        }
1333                    };
1334
1335                    // Create the collections.
1336                    let storage_metadata = coord.catalog.state().storage_metadata();
1337                    coord
1338                        .controller
1339                        .storage
1340                        .create_collections(storage_metadata, register_ts, collections)
1341                        .await
1342                        .unwrap_or_terminate("cannot fail to create collections");
1343
1344                    // Mark the register timestamp as completed.
1345                    if let Some(register_ts) = register_ts {
1346                        coord.apply_local_write(register_ts).await;
1347                    }
1348
1349                    // Initialize the Read Policies.
1350                    for (compaction_window, storage_policies) in read_policies {
1351                        coord
1352                            .initialize_storage_read_policies(storage_policies, compaction_window)
1353                            .await;
1354                    }
1355                })
1356            })
1357            .await;
1358
1359        match catalog_result {
1360            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1361            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1362                kind:
1363                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1364            })) if if_not_exists => {
1365                ctx.session_mut()
1366                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1367                        name: name.item,
1368                        ty: "table",
1369                    });
1370                Ok(ExecuteResponse::CreatedTable)
1371            }
1372            Err(err) => Err(err),
1373        }
1374    }
1375
1376    #[instrument]
1377    pub(super) async fn sequence_create_sink(
1378        &mut self,
1379        ctx: ExecuteContext,
1380        plan: plan::CreateSinkPlan,
1381        resolved_ids: ResolvedIds,
1382    ) {
1383        let plan::CreateSinkPlan {
1384            name,
1385            sink,
1386            with_snapshot,
1387            if_not_exists,
1388            in_cluster,
1389        } = plan;
1390
1391        // First try to allocate an item ID and a global ID. If either fails, we're done.
1392        let id_ts = self.get_catalog_write_ts().await;
1393        let (item_id, global_id) =
1394            return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
1395
1396        let catalog_sink = Sink {
1397            create_sql: sink.create_sql,
1398            global_id,
1399            from: sink.from,
1400            connection: sink.connection,
1401            envelope: sink.envelope,
1402            version: sink.version,
1403            with_snapshot,
1404            resolved_ids,
1405            cluster_id: in_cluster,
1406        };
1407
1408        let ops = vec![catalog::Op::CreateItem {
1409            id: item_id,
1410            name: name.clone(),
1411            item: CatalogItem::Sink(catalog_sink.clone()),
1412            owner_id: *ctx.session().current_role_id(),
1413        }];
1414
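            // Validate that the object the sink reads from actually exists as a storage
            // collection; objects without one (e.g. views) cannot be exported as a sink,
            // so we surface a friendlier error than the raw storage error.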
1415        let from = self.catalog().get_entry_by_global_id(&catalog_sink.from);
1416        if let Err(e) = self
1417            .controller
1418            .storage
1419            .check_exists(sink.from)
1420            .map_err(|e| match e {
1421                StorageError::IdentifierMissing(_) => AdapterError::Unstructured(anyhow!(
1422                    "{} is a {}, which cannot be exported as a sink",
1423                    from.name().item.clone(),
1424                    from.item().typ().to_string(),
1425                )),
1426                e => AdapterError::Storage(e),
1427            })
1428        {
1429            ctx.retire(Err(e));
1430            return;
1431        }
1432
1433        let result = self.catalog_transact(Some(ctx.session()), ops).await;
1434
1435        match result {
1436            Ok(()) => {}
1437            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1438                kind:
1439                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1440            })) if if_not_exists => {
1441                ctx.session()
1442                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1443                        name: name.item,
1444                        ty: "sink",
1445                    });
1446                ctx.retire(Ok(ExecuteResponse::CreatedSink));
1447                return;
1448            }
1449            Err(e) => {
1450                ctx.retire(Err(e));
1451                return;
1452            }
1453        };
1454
1455        self.create_storage_export(global_id, &catalog_sink)
1456            .await
1457            .unwrap_or_terminate("cannot fail to create exports");
1458
1459        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1460            .await;
1461
1462        ctx.retire(Ok(ExecuteResponse::CreatedSink))
1463    }
1464
1465    /// Validates that a view definition does not contain any expressions that may lead to
1466    /// ambiguous column references to system tables, for example `NATURAL JOIN` or `SELECT *`.
1467    ///
1468    /// We prevent these expressions so that we can add columns to system tables without
1469    /// changing the definition of the view.
1470    ///
1471    /// Here is a somewhat hand-wavy proof of why we only need to check the
1472    /// immediate view definition for system objects and ambiguous column
1473    /// references, and not the entire dependency tree:
1474    ///
1475    ///   - A view with no object references cannot have any ambiguous column
1476    ///   references to a system object, because it references no objects at all.
1477    ///   - A view with a direct reference to a system object and a * or
1478    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1479    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1480    ///   any ambiguous column references to a system object, because all column
1481    ///   references are explicitly named.
1482    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1483    ///   system object cannot have any ambiguous column references to a system
1484    ///   object, because there are no system objects in the top level view and
1485    ///   all sub-views are guaranteed to have no ambiguous column references to
1486    ///   system objects.
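        ///
        /// For example, assuming `mz_tables` is a system relation, a definition like
        /// `CREATE VIEW v AS SELECT * FROM mz_tables` would be rejected by this check,
        /// while `CREATE VIEW v AS SELECT name FROM mz_tables` would be allowed.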
1487    pub(super) fn validate_system_column_references(
1488        &self,
1489        uses_ambiguous_columns: bool,
1490        depends_on: &BTreeSet<GlobalId>,
1491    ) -> Result<(), AdapterError> {
1492        if uses_ambiguous_columns
1493            && depends_on
1494                .iter()
1495                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1496        {
1497            Err(AdapterError::AmbiguousSystemColumnReference)
1498        } else {
1499            Ok(())
1500        }
1501    }
1502
1503    #[instrument]
1504    pub(super) async fn sequence_create_type(
1505        &mut self,
1506        session: &Session,
1507        plan: plan::CreateTypePlan,
1508        resolved_ids: ResolvedIds,
1509    ) -> Result<ExecuteResponse, AdapterError> {
1510        let id_ts = self.get_catalog_write_ts().await;
1511        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
1512        let typ = Type {
1513            create_sql: Some(plan.typ.create_sql),
1514            global_id,
1515            desc: plan.typ.inner.desc(&self.catalog().for_session(session))?,
1516            details: CatalogTypeDetails {
1517                array_id: None,
1518                typ: plan.typ.inner,
1519                pg_metadata: None,
1520            },
1521            resolved_ids,
1522        };
1523        let op = catalog::Op::CreateItem {
1524            id: item_id,
1525            name: plan.name,
1526            item: CatalogItem::Type(typ),
1527            owner_id: *session.current_role_id(),
1528        };
1529        match self.catalog_transact(Some(session), vec![op]).await {
1530            Ok(()) => Ok(ExecuteResponse::CreatedType),
1531            Err(err) => Err(err),
1532        }
1533    }
1534
1535    #[instrument]
1536    pub(super) async fn sequence_comment_on(
1537        &mut self,
1538        session: &Session,
1539        plan: plan::CommentPlan,
1540    ) -> Result<ExecuteResponse, AdapterError> {
1541        let op = catalog::Op::Comment {
1542            object_id: plan.object_id,
1543            sub_component: plan.sub_component,
1544            comment: plan.comment,
1545        };
1546        self.catalog_transact(Some(session), vec![op]).await?;
1547        Ok(ExecuteResponse::Comment)
1548    }
1549
1550    #[instrument]
1551    pub(super) async fn sequence_drop_objects(
1552        &mut self,
1553        session: &Session,
1554        plan::DropObjectsPlan {
1555            drop_ids,
1556            object_type,
1557            referenced_ids,
1558        }: plan::DropObjectsPlan,
1559    ) -> Result<ExecuteResponse, AdapterError> {
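            // `drop_ids` contains every object that will be dropped, while `referenced_ids`
            // contains only the objects named directly in the statement; anything in the
            // difference was picked up by CASCADE, so we notify the user about it.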
1560        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1561        let mut objects = Vec::new();
1562        for obj_id in &drop_ids {
1563            if !referenced_ids_hashset.contains(obj_id) {
1564                let object_info = ErrorMessageObjectDescription::from_id(
1565                    obj_id,
1566                    &self.catalog().for_session(session),
1567                )
1568                .to_string();
1569                objects.push(object_info);
1570            }
1571        }
1572
1573        if !objects.is_empty() {
1574            session.add_notice(AdapterNotice::CascadeDroppedObject { objects });
1575        }
1576
1577        let DropOps {
1578            ops,
1579            dropped_active_db,
1580            dropped_active_cluster,
1581            dropped_in_use_indexes,
1582        } = self.sequence_drop_common(session, drop_ids)?;
1583
1584        self.catalog_transact(Some(session), ops).await?;
1585
1586        fail::fail_point!("after_sequencer_drop_replica");
1587
1588        if dropped_active_db {
1589            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1590                name: session.vars().database().to_string(),
1591            });
1592        }
1593        if dropped_active_cluster {
1594            session.add_notice(AdapterNotice::DroppedActiveCluster {
1595                name: session.vars().cluster().to_string(),
1596            });
1597        }
1598        for dropped_in_use_index in dropped_in_use_indexes {
1599            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1600            self.metrics
1601                .optimization_notices
1602                .with_label_values(&["DroppedInUseIndex"])
1603                .inc_by(1);
1604        }
1605        Ok(ExecuteResponse::DroppedObject(object_type))
1606    }
1607
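    /// Returns an error if any of the `dropped_roles` still owns an object or appears
    /// as the grantor or grantee of a privilege, collecting every such dependency so
    /// that the resulting error can list them all.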
1608    fn validate_dropped_role_ownership(
1609        &self,
1610        session: &Session,
1611        dropped_roles: &BTreeMap<RoleId, &str>,
1612    ) -> Result<(), AdapterError> {
1613        fn privilege_check(
1614            privileges: &PrivilegeMap,
1615            dropped_roles: &BTreeMap<RoleId, &str>,
1616            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1617            object_id: &SystemObjectId,
1618            catalog: &ConnCatalog,
1619        ) {
1620            for privilege in privileges.all_values() {
1621                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1622                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1623                    let object_description =
1624                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1625                    dependent_objects
1626                        .entry(role_name.to_string())
1627                        .or_default()
1628                        .push(format!(
1629                            "privileges on {object_description} granted by {grantor_name}",
1630                        ));
1631                }
1632                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1633                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1634                    let object_description =
1635                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1636                    dependent_objects
1637                        .entry(role_name.to_string())
1638                        .or_default()
1639                        .push(format!(
1640                            "privileges granted on {object_description} to {grantee_name}"
1641                        ));
1642                }
1643            }
1644        }
1645
1646        let catalog = self.catalog().for_session(session);
1647        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1648        for entry in self.catalog.entries() {
1649            let id = SystemObjectId::Object(entry.id().into());
1650            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1651                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1652                dependent_objects
1653                    .entry(role_name.to_string())
1654                    .or_default()
1655                    .push(format!("owner of {object_description}"));
1656            }
1657            privilege_check(
1658                entry.privileges(),
1659                dropped_roles,
1660                &mut dependent_objects,
1661                &id,
1662                &catalog,
1663            );
1664        }
1665        for database in self.catalog.databases() {
1666            let database_id = SystemObjectId::Object(database.id().into());
1667            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1668                let object_description =
1669                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1670                dependent_objects
1671                    .entry(role_name.to_string())
1672                    .or_default()
1673                    .push(format!("owner of {object_description}"));
1674            }
1675            privilege_check(
1676                &database.privileges,
1677                dropped_roles,
1678                &mut dependent_objects,
1679                &database_id,
1680                &catalog,
1681            );
1682            for schema in database.schemas_by_id.values() {
1683                let schema_id = SystemObjectId::Object(
1684                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1685                );
1686                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1687                    let object_description =
1688                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1689                    dependent_objects
1690                        .entry(role_name.to_string())
1691                        .or_default()
1692                        .push(format!("owner of {object_description}"));
1693                }
1694                privilege_check(
1695                    &schema.privileges,
1696                    dropped_roles,
1697                    &mut dependent_objects,
1698                    &schema_id,
1699                    &catalog,
1700                );
1701            }
1702        }
1703        for cluster in self.catalog.clusters() {
1704            let cluster_id = SystemObjectId::Object(cluster.id().into());
1705            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1706                let object_description =
1707                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1708                dependent_objects
1709                    .entry(role_name.to_string())
1710                    .or_default()
1711                    .push(format!("owner of {object_description}"));
1712            }
1713            privilege_check(
1714                &cluster.privileges,
1715                dropped_roles,
1716                &mut dependent_objects,
1717                &cluster_id,
1718                &catalog,
1719            );
1720            for replica in cluster.replicas() {
1721                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1722                    let replica_id =
1723                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1724                    let object_description =
1725                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1726                    dependent_objects
1727                        .entry(role_name.to_string())
1728                        .or_default()
1729                        .push(format!("owner of {object_description}"));
1730                }
1731            }
1732        }
1733        privilege_check(
1734            self.catalog().system_privileges(),
1735            dropped_roles,
1736            &mut dependent_objects,
1737            &SystemObjectId::System,
1738            &catalog,
1739        );
1740        for (default_privilege_object, default_privilege_acl_items) in
1741            self.catalog.default_privileges()
1742        {
1743            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1744                dependent_objects
1745                    .entry(role_name.to_string())
1746                    .or_default()
1747                    .push(format!(
1748                        "default privileges on {}S created by {}",
1749                        default_privilege_object.object_type, role_name
1750                    ));
1751            }
1752            for default_privilege_acl_item in default_privilege_acl_items {
1753                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1754                    dependent_objects
1755                        .entry(role_name.to_string())
1756                        .or_default()
1757                        .push(format!(
1758                            "default privileges on {}S granted to {}",
1759                            default_privilege_object.object_type, role_name
1760                        ));
1761                }
1762            }
1763        }
1764
1765        if !dependent_objects.is_empty() {
1766            Err(AdapterError::DependentObject(dependent_objects))
1767        } else {
1768            Ok(())
1769        }
1770    }
1771
1772    #[instrument]
1773    pub(super) async fn sequence_drop_owned(
1774        &mut self,
1775        session: &Session,
1776        plan: plan::DropOwnedPlan,
1777    ) -> Result<ExecuteResponse, AdapterError> {
1778        for role_id in &plan.role_ids {
1779            self.catalog().ensure_not_reserved_role(role_id)?;
1780        }
1781
1782        let mut privilege_revokes = plan.privilege_revokes;
1783
1784        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1785        let session_catalog = self.catalog().for_session(session);
1786        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1787            && !session.is_superuser()
1788        {
1789            // Obtain all roles that the current session is a member of.
1790            let role_membership =
1791                session_catalog.collect_role_membership(session.current_role_id());
1792            let invalid_revokes: BTreeSet<_> = privilege_revokes
1793                .extract_if(.., |(_, privilege)| {
1794                    !role_membership.contains(&privilege.grantor)
1795                })
1796                .map(|(object_id, _)| object_id)
1797                .collect();
1798            for invalid_revoke in invalid_revokes {
1799                let object_description =
1800                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1801                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1802            }
1803        }
1804
1805        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1806            catalog::Op::UpdatePrivilege {
1807                target_id: object_id,
1808                privilege,
1809                variant: UpdatePrivilegeVariant::Revoke,
1810            }
1811        });
1812        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1813            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1814                privilege_object,
1815                privilege_acl_item,
1816                variant: UpdatePrivilegeVariant::Revoke,
1817            },
1818        );
1819        let DropOps {
1820            ops: drop_ops,
1821            dropped_active_db,
1822            dropped_active_cluster,
1823            dropped_in_use_indexes,
1824        } = self.sequence_drop_common(session, plan.drop_ids)?;
1825
1826        let ops = privilege_revoke_ops
1827            .chain(default_privilege_revoke_ops)
1828            .chain(drop_ops.into_iter())
1829            .collect();
1830
1831        self.catalog_transact(Some(session), ops).await?;
1832
1833        if dropped_active_db {
1834            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1835                name: session.vars().database().to_string(),
1836            });
1837        }
1838        if dropped_active_cluster {
1839            session.add_notice(AdapterNotice::DroppedActiveCluster {
1840                name: session.vars().cluster().to_string(),
1841            });
1842        }
1843        for dropped_in_use_index in dropped_in_use_indexes {
1844            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1845        }
1846        Ok(ExecuteResponse::DroppedOwned)
1847    }
1848
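    /// Builds the catalog ops needed to drop `ids`, along with the metadata the
    /// caller needs for user-facing notices: whether the active database or cluster
    /// was dropped, and which in-use indexes were dropped.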
1849    fn sequence_drop_common(
1850        &self,
1851        session: &Session,
1852        ids: Vec<ObjectId>,
1853    ) -> Result<DropOps, AdapterError> {
1854        let mut dropped_active_db = false;
1855        let mut dropped_active_cluster = false;
1856        let mut dropped_in_use_indexes = Vec::new();
1857        let mut dropped_roles = BTreeMap::new();
1858        let mut dropped_databases = BTreeSet::new();
1859        let mut dropped_schemas = BTreeSet::new();
1860        // Dropping either the group role or the member role of a role membership will trigger a
1861        // revoke role. We use a set for the revokes to avoid attempting to revoke the same
1862        // role membership twice.
1863        let mut role_revokes = BTreeSet::new();
1864        // Dropping a database or a schema will revoke all default privileges associated with
1865        // that database or schema.
1866        let mut default_privilege_revokes = BTreeSet::new();
1867
1868        // Clusters we're dropping
1869        let mut clusters_to_drop = BTreeSet::new();
1870
1871        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1872        for id in &ids {
1873            match id {
1874                ObjectId::Database(id) => {
1875                    let name = self.catalog().get_database(id).name();
1876                    if name == session.vars().database() {
1877                        dropped_active_db = true;
1878                    }
1879                    dropped_databases.insert(id);
1880                }
1881                ObjectId::Schema((_, spec)) => {
1882                    if let SchemaSpecifier::Id(id) = spec {
1883                        dropped_schemas.insert(id);
1884                    }
1885                }
1886                ObjectId::Cluster(id) => {
1887                    clusters_to_drop.insert(*id);
1888                    if let Some(active_id) = self
1889                        .catalog()
1890                        .active_cluster(session)
1891                        .ok()
1892                        .map(|cluster| cluster.id())
1893                    {
1894                        if id == &active_id {
1895                            dropped_active_cluster = true;
1896                        }
1897                    }
1898                }
1899                ObjectId::Role(id) => {
1900                    let role = self.catalog().get_role(id);
1901                    let name = role.name();
1902                    dropped_roles.insert(*id, name);
1903                    // We must revoke all role memberships that the dropped role belongs to.
1904                    for (group_id, grantor_id) in &role.membership.map {
1905                        role_revokes.insert((*group_id, *id, *grantor_id));
1906                    }
1907                }
1908                ObjectId::Item(id) => {
1909                    if let Some(index) = self.catalog().get_entry(id).index() {
1910                        let humanizer = self.catalog().for_session(session);
1911                        let dependants = self
1912                            .controller
1913                            .compute
1914                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1915                            .ok()
1916                            .into_iter()
1917                            .flatten()
1918                            .filter(|dependant_id| {
1919                                // Transient Ids belong to Peeks. For now, we are not interested in
1920                                // peeks that depend on a dropped index.
1921                                // TODO: show a different notice in this case. Something like
1922                                // "There is an in-progress ad hoc SELECT that uses the dropped
1923                                // index. The resources used by the index will be freed when all
1924                                // such SELECTs complete."
1925                                if dependant_id.is_transient() {
1926                                    return false;
1927                                }
1928                                // The item should exist, but don't panic if it doesn't.
1929                                let Some(dependent_id) = humanizer
1930                                    .try_get_item_by_global_id(dependant_id)
1931                                    .map(|item| item.id())
1932                                else {
1933                                    return false;
1934                                };
1935                                // If the dependent object is also being dropped, then there is no
1936                                // problem, so we don't want a notice.
1937                                !ids_set.contains(&ObjectId::Item(dependent_id))
1938                            })
1939                            .flat_map(|dependant_id| {
1940                                // If we are not able to find a name for this ID it probably means
1941                                // we have already dropped the compute collection, in which case we
1942                                // can ignore it.
1943                                humanizer.humanize_id(dependant_id)
1944                            })
1945                            .collect_vec();
1946                        if !dependants.is_empty() {
1947                            dropped_in_use_indexes.push(DroppedInUseIndex {
1948                                index_name: humanizer
1949                                    .humanize_id(index.global_id())
1950                                    .unwrap_or_else(|| id.to_string()),
1951                                dependant_objects: dependants,
1952                            });
1953                        }
1954                    }
1955                }
1956                _ => {}
1957            }
1958        }
1959
1960        for id in &ids {
1961            match id {
1962                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1963                // unless they are internal replicas, which exist outside the scope
1964                // of managed clusters.
1965                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1966                    if !clusters_to_drop.contains(cluster_id) {
1967                        let cluster = self.catalog.get_cluster(*cluster_id);
1968                        if cluster.is_managed() {
1969                            let replica =
1970                                cluster.replica(*replica_id).expect("Catalog out of sync");
1971                            if !replica.config.location.internal() {
1972                                coord_bail!("cannot drop replica of managed cluster");
1973                            }
1974                        }
1975                    }
1976                }
1977                _ => {}
1978            }
1979        }
1980
1981        for role_id in dropped_roles.keys() {
1982            self.catalog().ensure_not_reserved_role(role_id)?;
1983        }
1984        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1985        // If any role is a member of a dropped role, then we must revoke that membership.
1986        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1987        for role in self.catalog().user_roles() {
1988            for dropped_role_id in
1989                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1990            {
1991                role_revokes.insert((
1992                    **dropped_role_id,
1993                    role.id(),
1994                    *role
1995                        .membership
1996                        .map
1997                        .get(*dropped_role_id)
1998                        .expect("included in keys above"),
1999                ));
2000            }
2001        }
2002
2003        for (default_privilege_object, default_privilege_acls) in
2004            self.catalog().default_privileges()
2005        {
2006            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
2007                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
2008            {
2009                for default_privilege_acl in default_privilege_acls {
2010                    default_privilege_revokes.insert((
2011                        default_privilege_object.clone(),
2012                        default_privilege_acl.clone(),
2013                    ));
2014                }
2015            }
2016        }
2017
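            // Assemble the final op list: role membership revokes and default privilege
            // revokes first, followed by a single `DropObjects` op covering all the ids.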
2018        let ops = role_revokes
2019            .into_iter()
2020            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
2021                role_id,
2022                member_id,
2023                grantor_id,
2024            })
2025            .chain(default_privilege_revokes.into_iter().map(
2026                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
2027                    privilege_object,
2028                    privilege_acl_item,
2029                    variant: UpdatePrivilegeVariant::Revoke,
2030                },
2031            ))
2032            .chain(iter::once(catalog::Op::DropObjects(
2033                ids.into_iter()
2034                    .map(DropObjectInfo::manual_drop_from_object_id)
2035                    .collect(),
2036            )))
2037            .collect();
2038
2039        Ok(DropOps {
2040            ops,
2041            dropped_active_db,
2042            dropped_active_cluster,
2043            dropped_in_use_indexes,
2044        })
2045    }
2046
2047    pub(super) fn sequence_explain_schema(
2048        &self,
2049        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
2050    ) -> Result<ExecuteResponse, AdapterError> {
2051        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
2052            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
2053        })?;
2054
2055        let json_string = json_string(&json_value);
2056        let row = Row::pack_slice(&[Datum::String(&json_string)]);
2057        Ok(Self::send_immediate_rows(row))
2058    }
2059
2060    pub(super) fn sequence_show_all_variables(
2061        &self,
2062        session: &Session,
2063    ) -> Result<ExecuteResponse, AdapterError> {
2064        let mut rows = viewable_variables(self.catalog().state(), session)
2065            .map(|v| (v.name(), v.value(), v.description()))
2066            .collect::<Vec<_>>();
2067        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
2068
2069        // TODO(parkmycar): Pack all of these into a single RowCollection.
2070        let rows: Vec<_> = rows
2071            .into_iter()
2072            .map(|(name, val, desc)| {
2073                Row::pack_slice(&[
2074                    Datum::String(name),
2075                    Datum::String(&val),
2076                    Datum::String(desc),
2077                ])
2078            })
2079            .collect();
2080        Ok(Self::send_immediate_rows(rows))
2081    }
2082
2083    pub(super) fn sequence_show_variable(
2084        &self,
2085        session: &Session,
2086        plan: plan::ShowVariablePlan,
2087    ) -> Result<ExecuteResponse, AdapterError> {
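            // `SHOW schema` is special-cased: report the first resolvable schema on the
            // search path, or NULL (with a notice, if enabled) when nothing on the path
            // resolves.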
2088        if &plan.name == SCHEMA_ALIAS {
2089            let schemas = self.catalog.resolve_search_path(session);
2090            let schema = schemas.first();
2091            return match schema {
2092                Some((database_spec, schema_spec)) => {
2093                    let schema_name = &self
2094                        .catalog
2095                        .get_schema(database_spec, schema_spec, session.conn_id())
2096                        .name()
2097                        .schema;
2098                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
2099                    Ok(Self::send_immediate_rows(row))
2100                }
2101                None => {
2102                    if session.vars().current_object_missing_warnings() {
2103                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
2104                            search_path: session
2105                                .vars()
2106                                .search_path()
2107                                .into_iter()
2108                                .map(|schema| schema.to_string())
2109                                .collect(),
2110                        });
2111                    }
2112                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
2113                }
2114            };
2115        }
2116
2117        let variable = session
2118            .vars()
2119            .get(self.catalog().system_config(), &plan.name)
2120            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
2121
2122        // In lieu of plumbing the user to all system config functions, just check that the var is
2123        // visible.
2124        variable.visible(session.user(), self.catalog().system_config())?;
2125
2126        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
2127        if variable.name() == vars::DATABASE.name()
2128            && matches!(
2129                self.catalog().resolve_database(&variable.value()),
2130                Err(CatalogError::UnknownDatabase(_))
2131            )
2132            && session.vars().current_object_missing_warnings()
2133        {
2134            let name = variable.value();
2135            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2136        } else if variable.name() == vars::CLUSTER.name()
2137            && matches!(
2138                self.catalog().resolve_cluster(&variable.value()),
2139                Err(CatalogError::UnknownCluster(_))
2140            )
2141            && session.vars().current_object_missing_warnings()
2142        {
2143            let name = variable.value();
2144            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2145        }
2146        Ok(Self::send_immediate_rows(row))
2147    }
2148
2149    #[instrument]
2150    pub(super) async fn sequence_inspect_shard(
2151        &self,
2152        session: &Session,
2153        plan: plan::InspectShardPlan,
2154    ) -> Result<ExecuteResponse, AdapterError> {
2155        // TODO: Not thrilled about this rbac special case here, but probably
2156        // sufficient for now.
2157        if !session.user().is_internal() {
2158            return Err(AdapterError::Unauthorized(
2159                rbac::UnauthorizedError::MzSystem {
2160                    action: "inspect".into(),
2161                },
2162            ));
2163        }
2164        let state = self
2165            .controller
2166            .storage
2167            .inspect_persist_state(plan.id)
2168            .await?;
2169        let jsonb = Jsonb::from_serde_json(state)?;
2170        Ok(Self::send_immediate_rows(jsonb.into_row()))
2171    }
2172
2173    #[instrument]
2174    pub(super) fn sequence_set_variable(
2175        &self,
2176        session: &mut Session,
2177        plan: plan::SetVariablePlan,
2178    ) -> Result<ExecuteResponse, AdapterError> {
2179        let (name, local) = (plan.name, plan.local);
2180        if &name == TRANSACTION_ISOLATION_VAR_NAME {
2181            self.validate_set_isolation_level(session)?;
2182        }
2183        if &name == vars::CLUSTER.name() {
2184            self.validate_set_cluster(session)?;
2185        }
2186
2187        let vars = session.vars_mut();
2188        let values = match plan.value {
2189            plan::VariableValue::Default => None,
2190            plan::VariableValue::Values(values) => Some(values),
2191        };
2192
2193        match values {
2194            Some(values) => {
2195                vars.set(
2196                    self.catalog().system_config(),
2197                    &name,
2198                    VarInput::SqlSet(&values),
2199                    local,
2200                )?;
2201
2202                let vars = session.vars();
2203
2204                // Emit a warning when deprecated variables are used.
2205                // TODO(database-issues#8069) remove this after sufficient time has passed
2206                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
2207                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
2208                } else if name == vars::CLUSTER.name()
2209                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
2210                {
2211                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
2212                }
2213
2214                // Warn when the database or cluster value does not correspond to a catalog item.
2215                if name.as_str() == vars::DATABASE.name()
2216                    && matches!(
2217                        self.catalog().resolve_database(vars.database()),
2218                        Err(CatalogError::UnknownDatabase(_))
2219                    )
2220                    && session.vars().current_object_missing_warnings()
2221                {
2222                    let name = vars.database().to_string();
2223                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2224                } else if name.as_str() == vars::CLUSTER.name()
2225                    && matches!(
2226                        self.catalog().resolve_cluster(vars.cluster()),
2227                        Err(CatalogError::UnknownCluster(_))
2228                    )
2229                    && session.vars().current_object_missing_warnings()
2230                {
2231                    let name = vars.cluster().to_string();
2232                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2233                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
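                    // Emit a notice for isolation levels that parse but are either
                    // unimplemented or carry caveats, so the user is not surprised.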
2234                    let v = values.into_first().to_lowercase();
2235                    if v == IsolationLevel::ReadUncommitted.as_str()
2236                        || v == IsolationLevel::ReadCommitted.as_str()
2237                        || v == IsolationLevel::RepeatableRead.as_str()
2238                    {
2239                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
2240                            isolation_level: v,
2241                        });
2242                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
2243                        session.add_notice(AdapterNotice::StrongSessionSerializable);
2244                    }
2245                }
2246            }
2247            None => vars.reset(self.catalog().system_config(), &name, local)?,
2248        }
2249
2250        Ok(ExecuteResponse::SetVariable { name, reset: false })
2251    }
2252
2253    pub(super) fn sequence_reset_variable(
2254        &self,
2255        session: &mut Session,
2256        plan: plan::ResetVariablePlan,
2257    ) -> Result<ExecuteResponse, AdapterError> {
2258        let name = plan.name;
2259        if &name == TRANSACTION_ISOLATION_VAR_NAME {
2260            self.validate_set_isolation_level(session)?;
2261        }
2262        if &name == vars::CLUSTER.name() {
2263            self.validate_set_cluster(session)?;
2264        }
2265        session
2266            .vars_mut()
2267            .reset(self.catalog().system_config(), &name, false)?;
2268        Ok(ExecuteResponse::SetVariable { name, reset: true })
2269    }
2270
2271    pub(super) fn sequence_set_transaction(
2272        &self,
2273        session: &mut Session,
2274        plan: plan::SetTransactionPlan,
2275    ) -> Result<ExecuteResponse, AdapterError> {
2276        // TODO(jkosh44) Only supports isolation levels for now.
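            // For example, `SET TRANSACTION ISOLATION LEVEL SERIALIZABLE` is handled
            // below, while access modes like `SET TRANSACTION READ ONLY` are rejected
            // as unsupported.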
2277        for mode in plan.modes {
2278            match mode {
2279                TransactionMode::AccessMode(_) => {
2280                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
2281                }
2282                TransactionMode::IsolationLevel(isolation_level) => {
2283                    self.validate_set_isolation_level(session)?;
2284
2285                    session.vars_mut().set(
2286                        self.catalog().system_config(),
2287                        TRANSACTION_ISOLATION_VAR_NAME,
2288                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
2289                        plan.local,
2290                    )?
2291                }
2292            }
2293        }
2294        Ok(ExecuteResponse::SetVariable {
2295            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2296            reset: false,
2297        })
2298    }
2299
2300    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2301        if session.transaction().contains_ops() {
2302            Err(AdapterError::InvalidSetIsolationLevel)
2303        } else {
2304            Ok(())
2305        }
2306    }
2307
2308    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2309        if session.transaction().contains_ops() {
2310            Err(AdapterError::InvalidSetCluster)
2311        } else {
2312            Ok(())
2313        }
2314    }
2315
2316    #[instrument]
2317    pub(super) async fn sequence_end_transaction(
2318        &mut self,
2319        mut ctx: ExecuteContext,
2320        mut action: EndTransactionAction,
2321    ) {
2322        // If the transaction has failed, we can only rollback.
2323        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2324            (&action, ctx.session().transaction())
2325        {
2326            action = EndTransactionAction::Rollback;
2327        }
2328        let response = match action {
2329            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2330                params: BTreeMap::new(),
2331            }),
2332            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2333                params: BTreeMap::new(),
2334            }),
2335        };
2336
2337        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2338
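            // Dispatch on the transaction's ops: writes go to group commit via
            // `submit_write`, strict serializable reads that require linearization are
            // queued on the strict-serializable-reads channel, single-statement
            // transactions are re-submitted for execution, and everything else retires
            // immediately below.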
2339        let (response, action) = match result {
2340            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2341                (response, action)
2342            }
2343            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2344                // Make sure we have the correct set of write locks for this transaction.
2345                // We aggressively drop partial sets of locks to prevent separate transactions
2346                // from deadlocking.
2347                let validated_locks = match write_lock_guards {
2348                    None => None,
2349                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2350                        Ok(locks) => Some(locks),
2351                        Err(missing) => {
2352                            tracing::error!(?missing, "programming error, missing write locks");
2353                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2354                        }
2355                    },
2356                };
2357
2358                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2359                for WriteOp { id, rows } in writes {
2360                    let total_rows = collected_writes.entry(id).or_default();
2361                    total_rows.push(rows);
2362                }
2363
2364                self.submit_write(PendingWriteTxn::User {
2365                    span: Span::current(),
2366                    writes: collected_writes,
2367                    write_locks: validated_locks,
2368                    pending_txn: PendingTxn {
2369                        ctx,
2370                        response,
2371                        action,
2372                    },
2373                });
2374                return;
2375            }
2376            Ok((
2377                Some(TransactionOps::Peeks {
2378                    determination,
2379                    requires_linearization: RequireLinearization::Required,
2380                    ..
2381                }),
2382                _,
2383            )) if ctx.session().vars().transaction_isolation()
2384                == &IsolationLevel::StrictSerializable =>
2385            {
2386                let conn_id = ctx.session().conn_id().clone();
2387                let pending_read_txn = PendingReadTxn {
2388                    txn: PendingRead::Read {
2389                        txn: PendingTxn {
2390                            ctx,
2391                            response,
2392                            action,
2393                        },
2394                    },
2395                    timestamp_context: determination.timestamp_context,
2396                    created: Instant::now(),
2397                    num_requeues: 0,
2398                    otel_ctx: OpenTelemetryContext::obtain(),
2399                };
2400                self.strict_serializable_reads_tx
2401                    .send((conn_id, pending_read_txn))
2402                    .expect("sending to strict_serializable_reads_tx cannot fail");
2403                return;
2404            }
2405            Ok((
2406                Some(TransactionOps::Peeks {
2407                    determination,
2408                    requires_linearization: RequireLinearization::Required,
2409                    ..
2410                }),
2411                _,
2412            )) if ctx.session().vars().transaction_isolation()
2413                == &IsolationLevel::StrongSessionSerializable =>
2414            {
2415                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2416                    ctx.session_mut()
2417                        .ensure_timestamp_oracle(timeline.clone())
2418                        .apply_write(*ts);
2419                }
2420                (response, action)
2421            }
2422            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2423                self.internal_cmd_tx
2424                    .send(Message::ExecuteSingleStatementTransaction {
2425                        ctx,
2426                        otel_ctx: OpenTelemetryContext::obtain(),
2427                        stmt,
2428                        params,
2429                    })
2430                    .expect("must send");
2431                return;
2432            }
2433            Ok((_, _)) => (response, action),
2434            Err(err) => (Err(err), EndTransactionAction::Rollback),
2435        };
2436        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2437        // Append any parameters that changed to the response.
2438        let response = response.map(|mut r| {
2439            r.extend_params(changed);
2440            ExecuteResponse::from(r)
2441        });
2442
2443        ctx.retire(response);
2444    }
2445
2446    #[instrument]
2447    async fn sequence_end_transaction_inner(
2448        &mut self,
2449        ctx: &mut ExecuteContext,
2450        action: EndTransactionAction,
2451    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
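            // Clear the session's transaction state unconditionally; only on commit do
            // we validate the queued ops (and, for DDL, apply them) before returning
            // them to the caller.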
2452        let txn = self.clear_transaction(ctx.session_mut()).await;
2453
2454        if let EndTransactionAction::Commit = action {
2455            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2456                match &mut ops {
2457                    TransactionOps::Writes(writes) => {
2458                        for WriteOp { id, .. } in &mut writes.iter() {
2459                            // Re-verify this id exists.
2460                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2461                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2462                                    kind: mz_catalog::memory::error::ErrorKind::Sql(
2463                                        CatalogError::UnknownItem(id.to_string()),
2464                                    ),
2465                                })
2466                            })?;
2467                        }
2468
2469                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2470                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2471                    }
2472                    TransactionOps::DDL {
2473                        ops,
2474                        state: _,
2475                        side_effects,
2476                        revision,
2477                    } => {
2478                        // Make sure our catalog hasn't changed.
2479                        if *revision != self.catalog().transient_revision() {
2480                            return Err(AdapterError::DDLTransactionRace);
2481                        }
2482                        // Commit all of our queued ops.
2483                        let ops = std::mem::take(ops);
2484                        let side_effects = std::mem::take(side_effects);
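                        // Apply the ops in a single catalog transaction, then
                        // run the queued side-effect closures with the same
                        // execute context.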
2485                        self.catalog_transact_with_side_effects(
2486                            Some(ctx),
2487                            ops,
2488                            move |a, mut ctx| {
2489                                Box::pin(async move {
2490                                    for side_effect in side_effects {
2491                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2492                                    }
2493                                })
2494                            },
2495                        )
2496                        .await?;
2497                    }
2498                    _ => (),
2499                }
2500                return Ok((Some(ops), write_lock_guards));
2501            }
2502        }
2503
2504        Ok((None, None))
2505    }
2506
2507    pub(super) async fn sequence_side_effecting_func(
2508        &mut self,
2509        ctx: ExecuteContext,
2510        plan: SideEffectingFunc,
2511    ) {
2512        match plan {
2513            SideEffectingFunc::PgCancelBackend { connection_id } => {
2514                if ctx.session().conn_id().unhandled() == connection_id {
2515                    // As a special case, if we're canceling ourselves, we send
2516                    // back a canceled response to the client issuing the query,
2517                    // so no further processing of the cancel is needed.
2518                    ctx.retire(Err(AdapterError::Canceled));
2519                    return;
2520                }
2521
2522                let res = if let Some((id_handle, _conn_meta)) =
2523                    self.active_conns.get_key_value(&connection_id)
2524                {
2525                    // check_plan already verified role membership.
2526                    self.handle_privileged_cancel(id_handle.clone()).await;
2527                    Datum::True
2528                } else {
2529                    Datum::False
2530                };
2531                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2532            }
2533        }
2534    }
2535
2536    /// Checks whether the session needs a real-time recency timestamp and, if so,
2537    /// returns a future that will produce the timestamp.
2538    pub(super) async fn determine_real_time_recent_timestamp(
2539        &self,
2540        session: &Session,
2541        source_ids: impl Iterator<Item = CatalogItemId>,
2542    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2543    {
2544        let vars = session.vars();
2545
2546        // Ideally this logic belongs inside of
2547        // `mz-adapter::coord::timestamp_selection::determine_timestamp`. However, including the
2548        // logic in there would make it extremely difficult and inconvenient to pull the waiting off
2549        // of the main coord thread.
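        // Real-time recency only applies when the session has opted in, is at
        // strict serializable isolation, and has not already pinned a read
        // timestamp for the current transaction.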
2550        let r = if vars.real_time_recency()
2551            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2552            && !session.contains_read_timestamp()
2553        {
2554            // Find all dependencies transitively because we need to ensure that
2555            // RTR queries determine the timestamp from the sources' (i.e.
2556            // storage objects that ingest data from external systems) remap
2557            // data. We "cheat" a little bit and filter out any IDs that aren't
2558        // user objects because we know they are not an RTR source.
2559            let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
2560            // If none of the sources are user objects, we don't need to provide
2561            // an RTR timestamp.
2562            if to_visit.is_empty() {
2563                return Ok(None);
2564            }
2565
2566            let mut timestamp_objects = BTreeSet::new();
2567
2568            while let Some(id) = to_visit.pop_front() {
2569                timestamp_objects.insert(id);
2570                to_visit.extend(
2571                    self.catalog()
2572                        .get_entry(&id)
2573                        .uses()
2574                        .into_iter()
2575                        .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2576                );
2577            }
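            // Convert the item-level IDs into the global (collection-level)
            // IDs that the storage controller understands.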
2578            let timestamp_objects = timestamp_objects
2579                .into_iter()
2580                .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2581                .collect();
2582
2583            let r = self
2584                .controller
2585                .determine_real_time_recent_timestamp(
2586                    timestamp_objects,
2587                    *vars.real_time_recency_timeout(),
2588                )
2589                .await?;
2590
2591            Some(r)
2592        } else {
2593            None
2594        };
2595
2596        Ok(r)
2597    }
2598
2599    #[instrument]
2600    pub(super) async fn sequence_explain_plan(
2601        &mut self,
2602        ctx: ExecuteContext,
2603        plan: plan::ExplainPlanPlan,
2604        target_cluster: TargetCluster,
2605    ) {
2606        match &plan.explainee {
2607            plan::Explainee::Statement(stmt) => match stmt {
2608                plan::ExplaineeStatement::CreateView { .. } => {
2609                    self.explain_create_view(ctx, plan).await;
2610                }
2611                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2612                    self.explain_create_materialized_view(ctx, plan).await;
2613                }
2614                plan::ExplaineeStatement::CreateIndex { .. } => {
2615                    self.explain_create_index(ctx, plan).await;
2616                }
2617                plan::ExplaineeStatement::Select { .. } => {
2618                    self.explain_peek(ctx, plan, target_cluster).await;
2619                }
2620            },
2621            plan::Explainee::View(_) => {
2622                let result = self.explain_view(&ctx, plan);
2623                ctx.retire(result);
2624            }
2625            plan::Explainee::MaterializedView(_) => {
2626                let result = self.explain_materialized_view(&ctx, plan);
2627                ctx.retire(result);
2628            }
2629            plan::Explainee::Index(_) => {
2630                let result = self.explain_index(&ctx, plan);
2631                ctx.retire(result);
2632            }
2633            plan::Explainee::ReplanView(_) => {
2634                self.explain_replan_view(ctx, plan).await;
2635            }
2636            plan::Explainee::ReplanMaterializedView(_) => {
2637                self.explain_replan_materialized_view(ctx, plan).await;
2638            }
2639            plan::Explainee::ReplanIndex(_) => {
2640                self.explain_replan_index(ctx, plan).await;
2641            }
2642        };
2643    }
2644
2645    pub(super) async fn sequence_explain_pushdown(
2646        &mut self,
2647        ctx: ExecuteContext,
2648        plan: plan::ExplainPushdownPlan,
2649        target_cluster: TargetCluster,
2650    ) {
2651        match plan.explainee {
2652            Explainee::Statement(ExplaineeStatement::Select {
2653                broken: false,
2654                plan,
2655                desc: _,
2656            }) => {
2657                let stage = return_if_err!(
2658                    self.peek_validate(
2659                        ctx.session(),
2660                        plan,
2661                        target_cluster,
2662                        None,
2663                        ExplainContext::Pushdown,
2664                        Some(ctx.session().vars().max_query_result_size()),
2665                    ),
2666                    ctx
2667                );
2668                self.sequence_staged(ctx, Span::current(), stage).await;
2669            }
2670            Explainee::MaterializedView(item_id) => {
2671                self.explain_pushdown_materialized_view(ctx, item_id).await;
2672            }
2673            _ => {
2674                ctx.retire(Err(AdapterError::Unsupported(
2675                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2676                )));
2677            }
2678        };
2679    }
2680
2681    async fn render_explain_pushdown(
2682        &self,
2683        ctx: ExecuteContext,
2684        as_of: Antichain<Timestamp>,
2685        mz_now: ResultSpec<'static>,
2686        read_holds: Option<ReadHolds<Timestamp>>,
2687        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2688    ) {
2689        let fut = self
2690            .render_explain_pushdown_prepare(ctx.session(), as_of, mz_now, imports)
2691            .await;
2692        task::spawn(|| "render explain pushdown", async move {
2693            // Transfer the necessary read holds over to the background task
2694            let _read_holds = read_holds;
2695            let res = fut.await;
2696            ctx.retire(res);
2697        });
2698    }
2699
2700    async fn render_explain_pushdown_prepare<
2701        I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
2702    >(
2703        &self,
2704        session: &Session,
2705        as_of: Antichain<Timestamp>,
2706        mz_now: ResultSpec<'static>,
2707        imports: I,
2708    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2709        let explain_timeout = *session.vars().statement_timeout();
2710        let mut futures = FuturesOrdered::new();
2711        for (id, mfp) in imports {
2712            let catalog_entry = self.catalog.get_entry_by_global_id(&id);
2713            let full_name = self
2714                .catalog
2715                .for_session(session)
2716                .resolve_full_name(&catalog_entry.name);
2717            let name = format!("{}", full_name);
2718            let relation_desc = catalog_entry
2719                .desc_opt()
2720                .expect("source should have a proper desc")
2721                .into_owned();
2722            let stats_future = self
2723                .controller
2724                .storage_collections
2725                .snapshot_parts_stats(id, as_of.clone())
2726                .await;
2727
2728            let mz_now = mz_now.clone();
2729            // These futures may block if the source is not yet readable at the as-of;
2730            // stash them in `futures` and only block on them in a separate task.
2731            futures.push_back(async move {
2732                let snapshot_stats = stats_future.await?;
2736                let mut total_bytes = 0;
2737                let mut total_parts = 0;
2738                let mut selected_bytes = 0;
2739                let mut selected_parts = 0;
2740                for SnapshotPartStats {
2741                    encoded_size_bytes: bytes,
2742                    stats,
2743                } in &snapshot_stats.parts
2744                {
2745                    let bytes = u64::cast_from(*bytes);
2746                    total_bytes += bytes;
2747                    total_parts += 1u64;
2748                    let selected = match stats {
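                        // Parts that carry no statistics can never be pruned,
                        // so they always count as selected.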
2749                        None => true,
2750                        Some(stats) => {
2751                            let stats = stats.decode();
2752                            let stats = RelationPartStats::new(
2753                                name.as_str(),
2754                                &snapshot_stats.metrics.pushdown.part_stats,
2755                                &relation_desc,
2756                                &stats,
2757                            );
2758                            stats.may_match_mfp(mz_now.clone(), &mfp)
2759                        }
2760                    };
2761
2762                    if selected {
2763                        selected_bytes += bytes;
2764                        selected_parts += 1u64;
2765                    }
2766                }
2767                Ok(Row::pack_slice(&[
2768                    name.as_str().into(),
2769                    total_bytes.into(),
2770                    selected_bytes.into(),
2771                    total_parts.into(),
2772                    selected_parts.into(),
2773                ]))
2774            });
2775        }
2776
2777        let fut = async move {
2778            match tokio::time::timeout(
2779                explain_timeout,
2780                futures::TryStreamExt::try_collect::<Vec<_>>(futures),
2781            )
2782            .await
2783            {
2784                Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
2785                    rows: Box::new(rows.into_row_iter()),
2786                }),
2787                Ok(Err(err)) => Err(err.into()),
2788                Err(_) => Err(AdapterError::StatementTimeout),
2789            }
2790        };
2791        fut
2792    }
2793
2794    #[instrument]
2795    pub(super) async fn sequence_insert(
2796        &mut self,
2797        mut ctx: ExecuteContext,
2798        plan: plan::InsertPlan,
2799    ) {
2800        // Normally, this would get checked when trying to add "write ops" to
2801        // the transaction, but we go down diverging paths below based on
2802        // whether the INSERT is only constant values or not.
2803        //
2804        // For the non-constant case we sequence an implicit read-then-write,
2805        // which messes with the transaction ops and would allow an implicit
2806        // read-then-write to sneak into a read-only transaction.
2807        if !ctx.session_mut().transaction().allows_writes() {
2808            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2809            return;
2810        }
2811
2812        // The structure of this code originates from a time when
2813        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2814        // optimized `MirRelationExpr`.
2815        //
2816        // Ideally, we would like to make the `selection.as_const().is_some()`
2817        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2818        // are planned as a Wrap($n, $vals) call, so until we can reduce
2819        // HirRelationExpr this will always return false.
2820        //
2821        // Unfortunately, hitting the default path of the match below also
2822        // causes a lot of tests to fail, so we opted to go with the extra
2823        // `plan.values.clone()` statements when producing the `optimized_mir`
2824        // and re-optimize the values in the `sequence_read_then_write` call.
2825        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2826            // For writes, we don't perform any optimizations on an expression
2827            // that is already a constant, as we want to maximize bulk-insert throughput.
2828            let expr = return_if_err!(
2829                plan.values
2830                    .clone()
2831                    .lower(self.catalog().system_config(), None),
2832                ctx
2833            );
2834            OptimizedMirRelationExpr(expr)
2835        } else {
2836            // Collect optimizer parameters.
2837            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2838
2839            // (`optimize::view::Optimizer` has a special case for constant queries.)
2840            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2841
2842            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2843            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2844        };
2845
2846        match optimized_mir.into_inner() {
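            // Fast path: a fully constant INSERT with no RETURNING clause can
            // skip the read phase and be written directly.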
2847            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2848                let catalog = self.owned_catalog();
2849                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2850                    let result =
2851                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2852                    ctx.retire(result);
2853                });
2854            }
2855            // All non-constant values must be planned as read-then-writes.
2856            _ => {
2857                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2858                    Some(table) => {
2859                        let fullname = self
2860                            .catalog()
2861                            .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2862                        // Inserts always occur at the latest version of the table.
2863                        table
2864                            .desc_latest(&fullname)
2865                            .expect("desc called on table")
2866                            .arity()
2867                    }
2868                    None => {
2869                        ctx.retire(Err(AdapterError::Catalog(
2870                            mz_catalog::memory::error::Error {
2871                                kind: mz_catalog::memory::error::ErrorKind::Sql(
2872                                    CatalogError::UnknownItem(plan.id.to_string()),
2873                                ),
2874                            },
2875                        )));
2876                        return;
2877                    }
2878                };
2879
2880                let finishing = RowSetFinishing {
2881                    order_by: vec![],
2882                    limit: None,
2883                    offset: 0,
2884                    project: (0..desc_arity).collect(),
2885                };
2886
2887                let read_then_write_plan = plan::ReadThenWritePlan {
2888                    id: plan.id,
2889                    selection: plan.values,
2890                    finishing,
2891                    assignments: BTreeMap::new(),
2892                    kind: MutationKind::Insert,
2893                    returning: plan.returning,
2894                };
2895
2896                self.sequence_read_then_write(ctx, read_then_write_plan)
2897                    .await;
2898            }
2899        }
2900    }
2901
2902    /// ReadThenWrite is a plan whose writes depend on the results of a
2903    /// read. This works by doing a Peek and then queuing a SendDiffs. No writes
2904    /// or read-then-writes can occur between the Peek and the SendDiffs; otherwise
2905    /// a serializability violation could occur.
2906    #[instrument]
2907    pub(super) async fn sequence_read_then_write(
2908        &mut self,
2909        mut ctx: ExecuteContext,
2910        plan: plan::ReadThenWritePlan,
2911    ) {
2912        let mut source_ids: BTreeSet<_> = plan
2913            .selection
2914            .depends_on()
2915            .into_iter()
2916            .map(|gid| self.catalog().resolve_item_id(&gid))
2917            .collect();
2918        source_ids.insert(plan.id);
2919
2920        // If the transaction doesn't already have write locks, acquire them.
2921        if ctx.session().transaction().write_locks().is_none() {
2922            // Pre-define all of the locks we need.
2923            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2924
2925            // Try acquiring all of our locks.
2926            for id in &source_ids {
2927                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2928                    write_locks.insert_lock(*id, lock);
2929                }
2930            }
2931
2932            // See if we acquired all of the necessary locks.
2933            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2934                Ok(locks) => locks,
2935                Err(missing) => {
2936                    // Defer our write if we couldn't acquire all of the locks.
2937                    let role_metadata = ctx.session().role_metadata().clone();
2938                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2939                    let plan = DeferredPlan {
2940                        ctx,
2941                        plan: Plan::ReadThenWrite(plan),
2942                        validity: PlanValidity::new(
2943                            self.catalog.transient_revision(),
2944                            source_ids.clone(),
2945                            None,
2946                            None,
2947                            role_metadata,
2948                        ),
2949                        requires_locks: source_ids,
2950                    };
2951                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2952                }
2953            };
2954
2955            ctx.session_mut()
2956                .try_grant_write_locks(write_locks)
2957                .expect("session has already been granted write locks");
2958        }
2959
2960        let plan::ReadThenWritePlan {
2961            id,
2962            kind,
2963            selection,
2964            mut assignments,
2965            finishing,
2966            mut returning,
2967        } = plan;
2968
2969        // Read-then-writes can be queued, so re-verify that the id exists.
2970        let desc = match self.catalog().try_get_entry(&id) {
2971            Some(table) => {
2972                let full_name = self
2973                    .catalog()
2974                    .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2975                // Inserts always occur at the latest version of the table.
2976                table
2977                    .desc_latest(&full_name)
2978                    .expect("desc called on table")
2979                    .into_owned()
2980            }
2981            None => {
2982                ctx.retire(Err(AdapterError::Catalog(
2983                    mz_catalog::memory::error::Error {
2984                        kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2985                            id.to_string(),
2986                        )),
2987                    },
2988                )));
2989                return;
2990            }
2991        };
2992
2993        // Disallow mz_now in any position because read time and write time differ.
2994        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2995            || assignments.values().any(|e| e.contains_temporal())
2996            || returning.iter().any(|e| e.contains_temporal());
2997        if contains_temporal {
2998            ctx.retire(Err(AdapterError::Unsupported(
2999                "calls to mz_now in write statements",
3000            )));
3001            return;
3002        }
3003
3004        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
3005        //
3006        // - They do not refer to any objects whose notion of time moves differently than that of
3007        //   user tables. This limitation is meant to ensure no writes occur between this read
3008        //   and the subsequent write.
3009        // - They do not use mz_now(), whose value produced during the read will differ from the
3010        //   write timestamp.
3011        fn validate_read_dependencies(
3012            catalog: &Catalog,
3013            id: &CatalogItemId,
3014        ) -> Result<(), AdapterError> {
3015            use CatalogItemType::*;
3016            use mz_catalog::memory::objects;
3017            let mut ids_to_check = Vec::new();
3018            let valid = match catalog.try_get_entry(id) {
3019                Some(entry) => {
3020                    if let CatalogItem::View(objects::View { optimized_expr, .. })
3021                    | CatalogItem::MaterializedView(objects::MaterializedView {
3022                        optimized_expr,
3023                        ..
3024                    }) = entry.item()
3025                    {
3026                        if optimized_expr.contains_temporal() {
3027                            return Err(AdapterError::Unsupported(
3028                                "calls to mz_now in write statements",
3029                            ));
3030                        }
3031                    }
3032                    match entry.item().typ() {
3033                        typ @ (Func | View | MaterializedView | ContinualTask) => {
3034                            ids_to_check.extend(entry.uses());
3035                            id.is_user() || matches!(typ, Func)
3037                        }
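                        // Sources can receive new data between our read and
                        // the subsequent write (the write lock cannot block
                        // external ingestion), and secrets and connections
                        // aren't readable at all.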
3038                        Source | Secret | Connection => false,
3039                        // Cannot select from sinks or indexes.
3040                        Sink | Index => unreachable!(),
3041                        Table => {
3042                            if !id.is_user() {
3043                                // We can't read from non-user tables
3044                                false
3045                            } else {
3046                                // We can't read from tables that are source-exports
3047                                entry.source_export_details().is_none()
3048                            }
3049                        }
3050                        Type => true,
3051                    }
3052                }
3053                None => false,
3054            };
3055            if !valid {
3056                return Err(AdapterError::InvalidTableMutationSelection);
3057            }
3058            for id in ids_to_check {
3059                validate_read_dependencies(catalog, &id)?;
3060            }
3061            Ok(())
3062        }
3063
3064        for gid in selection.depends_on() {
3065            let item_id = self.catalog().resolve_item_id(&gid);
3066            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
3067                ctx.retire(Err(err));
3068                return;
3069            }
3070        }
3071
3072        let (peek_tx, peek_rx) = oneshot::channel();
3073        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
3074        let (tx, _, session, extra) = ctx.into_parts();
3075        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
3076        // execution context, because this peek does not directly correspond to an execute,
3077        // and so we don't need to take any action on its retirement.
3078        // TODO[btv]: we might consider extending statement logging to log the inner
3079        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
3080        // and mint a new "real" execution context here. We'd also have to add some logic to
3081        // make sure such "sub-statements" are always sampled when the top-level statement is.
3082        //
3083        // It's debatable whether this makes sense conceptually,
3084        // because the inner fragment here is not actually a
3085        // "statement" in its own right.
3086        let peek_ctx = ExecuteContext::from_parts(
3087            peek_client_tx,
3088            self.internal_cmd_tx.clone(),
3089            session,
3090            Default::default(),
3091        );
3092
3093        self.sequence_peek(
3094            peek_ctx,
3095            plan::SelectPlan {
3096                select: None,
3097                source: selection,
3098                when: QueryWhen::FreshestTableWrite,
3099                finishing,
3100                copy_to: None,
3101            },
3102            TargetCluster::Active,
3103            None,
3104        )
3105        .await;
3106
3107        let internal_cmd_tx = self.internal_cmd_tx.clone();
3108        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
3109        let catalog = self.owned_catalog();
3110        let max_result_size = self.catalog().system_config().max_result_size();
3111
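        // The rest runs in a background task: wait for the peek to complete,
        // turn the returned rows into diffs, then hand the write back to the
        // coordinator.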
3112        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
3113            let (peek_response, session) = match peek_rx.await {
3114                Ok(Response {
3115                    result: Ok(resp),
3116                    session,
3117                    otel_ctx,
3118                }) => {
3119                    otel_ctx.attach_as_parent();
3120                    (resp, session)
3121                }
3122                Ok(Response {
3123                    result: Err(e),
3124                    session,
3125                    otel_ctx,
3126                }) => {
3127                    let ctx =
3128                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3129                    otel_ctx.attach_as_parent();
3130                    ctx.retire(Err(e));
3131                    return;
3132                }
3133                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
3134                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
3135            };
3136            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3137            let mut timeout_dur = *ctx.session().vars().statement_timeout();
3138
3139            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
3140            if timeout_dur == Duration::ZERO {
3141                timeout_dur = Duration::MAX;
3142            }
3143
3144            let style = ExprPrepStyle::OneShot {
3145                logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above.
3146                session: ctx.session(),
3147                catalog_state: catalog.state(),
3148            };
3149            for expr in assignments.values_mut().chain(returning.iter_mut()) {
3150                return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
3151            }
3152
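            // Turn the peeked rows into (row, diff) updates: deletes retract
            // the old row, updates retract the old row and insert the updated
            // one, and inserts simply insert.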
3153            let make_diffs =
3154                move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
3155                    let arena = RowArena::new();
3156                    let mut diffs = Vec::new();
3157                    let mut datum_vec = mz_repr::DatumVec::new();
3158
3159                    while let Some(row) = rows.next() {
3160                        if !assignments.is_empty() {
3161                            assert!(
3162                                matches!(kind, MutationKind::Update),
3163                                "only updates support assignments"
3164                            );
3165                            let mut datums = datum_vec.borrow_with(row);
3166                            let mut updates = vec![];
3167                            for (idx, expr) in &assignments {
3168                                let updated = match expr.eval(&datums, &arena) {
3169                                    Ok(updated) => updated,
3170                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
3171                                };
3172                                updates.push((*idx, updated));
3173                            }
3174                            for (idx, new_value) in updates {
3175                                datums[idx] = new_value;
3176                            }
3177                            let updated = Row::pack_slice(&datums);
3178                            diffs.push((updated, Diff::ONE));
3179                        }
3180                        match kind {
3181                            // Updates and deletes always remove the
3182                            // current row. Updates will also add an
3183                            // updated value.
3184                            MutationKind::Update | MutationKind::Delete => {
3185                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
3186                            }
3187                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
3188                        }
3189                    }
3190
3191                    // Sum the rows' byte sizes to check whether we exceed the
3192                    // max_result_size threshold.
3193                    let mut byte_size: u64 = 0;
3194                    for (row, diff) in &diffs {
3195                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
3196                        if diff.is_positive() {
3197                            for (idx, datum) in row.iter().enumerate() {
3198                                desc.constraints_met(idx, &datum)?;
3199                            }
3200                        }
3201                    }
3202                    Ok((diffs, byte_size))
3203                };
3204
3205            let diffs = match peek_response {
3206                ExecuteResponse::SendingRowsStreaming {
3207                    rows: mut rows_stream,
3208                    ..
3209                } => {
3210                    let mut byte_size: u64 = 0;
3211                    let mut diffs = Vec::new();
3212                    let result = loop {
3213                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
3214                            Ok(Some(res)) => match res {
3215                                PeekResponseUnary::Rows(new_rows) => {
3216                                    match make_diffs(new_rows) {
3217                                        Ok((mut new_diffs, new_byte_size)) => {
3218                                            byte_size = byte_size.saturating_add(new_byte_size);
3219                                            if byte_size > max_result_size {
3220                                                break Err(AdapterError::ResultSize(format!(
3221                                                    "result exceeds max size of {max_result_size}"
3222                                                )));
3223                                            }
3224                                            diffs.append(&mut new_diffs)
3225                                        }
3226                                        Err(e) => break Err(e),
3227                                    };
3228                                }
3229                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
3230                                PeekResponseUnary::Error(e) => {
3231                                    break Err(AdapterError::Unstructured(anyhow!(e)));
3232                                }
3233                            },
3234                            Ok(None) => break Ok(diffs),
3235                            Err(_) => {
3236                                // We timed out, so remove the pending peek. This is
3237                                // best-effort and doesn't guarantee we won't
3238                                // receive a response.
3239                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
3240                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
3241                                    conn_id: ctx.session().conn_id().clone(),
3242                                });
3243                                if let Err(e) = result {
3244                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3245                                }
3246                                break Err(AdapterError::StatementTimeout);
3247                            }
3248                        }
3249                    };
3250
3251                    result
3252                }
3253                ExecuteResponse::SendingRowsImmediate { rows } => {
3254                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
3255                }
3256                resp => Err(AdapterError::Unstructured(anyhow!(
3257                    "unexpected peek response: {resp:?}"
3258                ))),
3259            };
3260
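            // If there is a RETURNING clause, evaluate its expressions against
            // every row being written back (i.e. every positive diff).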
3261            let mut returning_rows = Vec::new();
3262            let mut diff_err: Option<AdapterError> = None;
3263            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
3264                let arena = RowArena::new();
3265                for (row, diff) in diffs {
3266                    if !diff.is_positive() {
3267                        continue;
3268                    }
3269                    let mut returning_row = Row::with_capacity(returning.len());
3270                    let mut packer = returning_row.packer();
3271                    for expr in &returning {
3272                        let datums: Vec<_> = row.iter().collect();
3273                        match expr.eval(&datums, &arena) {
3274                            Ok(datum) => {
3275                                packer.push(datum);
3276                            }
3277                            Err(err) => {
3278                                diff_err = Some(err.into());
3279                                break;
3280                            }
3281                        }
3282                    }
3283                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3284                    let diff = match NonZeroUsize::try_from(diff) {
3285                        Ok(diff) => diff,
3286                        Err(err) => {
3287                            diff_err = Some(err.into());
3288                            break;
3289                        }
3290                    };
3291                    returning_rows.push((returning_row, diff));
3292                    if diff_err.is_some() {
3293                        break;
3294                    }
3295                }
3296            }
3297            let diffs = if let Some(err) = diff_err {
3298                Err(err)
3299            } else {
3300                diffs
3301            };
3302
3303            // We need to clear out the timestamp context so the write doesn't fail due to a
3304            // read-only transaction.
3305            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3306            // No matter what isolation level the client is using, we must linearize this
3307            // read. The write will be performed right after this, as part of a single
3308            // transaction, so the write must have a timestamp greater than or equal to the
3309            // read.
3310            //
3311            // Note: It's only OK for the write to have a greater timestamp than the read
3312            // because the write lock prevents any other writes from happening in between
3313            // the read and write.
3314            if let Some(timestamp_context) = timestamp_context {
3315                let (tx, rx) = tokio::sync::oneshot::channel();
3316                let conn_id = ctx.session().conn_id().clone();
3317                let pending_read_txn = PendingReadTxn {
3318                    txn: PendingRead::ReadThenWrite { ctx, tx },
3319                    timestamp_context,
3320                    created: Instant::now(),
3321                    num_requeues: 0,
3322                    otel_ctx: OpenTelemetryContext::obtain(),
3323                };
3324                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3325                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3326                if let Err(e) = result {
3327                    warn!(
3328                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3329                        e
3330                    );
3331                    return;
3332                }
3333                let result = rx.await;
3334                // It is not an error for these results to be ready after `tx` has been dropped.
3335                ctx = match result {
3336                    Ok(Some(ctx)) => ctx,
3337                    Ok(None) => {
3338                        // Coordinator took our context and will handle responding to the client.
3339                        // This usually indicates that our transaction was aborted.
3340                        return;
3341                    }
3342                    Err(e) => {
3343                        warn!(
3344                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3345                            e
3346                        );
3347                        return;
3348                    }
3349                };
3350            }
3351
3352            match diffs {
3353                Ok(diffs) => {
3354                    let result = Self::send_diffs(
3355                        ctx.session_mut(),
3356                        plan::SendDiffsPlan {
3357                            id,
3358                            updates: diffs,
3359                            kind,
3360                            returning: returning_rows,
3361                            max_result_size,
3362                        },
3363                    );
3364                    ctx.retire(result);
3365                }
3366                Err(e) => {
3367                    ctx.retire(Err(e));
3368                }
3369            }
3370        });
3371    }
3372
3373    #[instrument]
3374    pub(super) async fn sequence_alter_item_rename(
3375        &mut self,
3376        ctx: &mut ExecuteContext,
3377        plan: plan::AlterItemRenamePlan,
3378    ) -> Result<ExecuteResponse, AdapterError> {
3379        let op = catalog::Op::RenameItem {
3380            id: plan.id,
3381            current_full_name: plan.current_full_name,
3382            to_name: plan.to_name,
3383        };
3384        match self
3385            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3386            .await
3387        {
3388            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3389            Err(err) => Err(err),
3390        }
3391    }
3392
3393    #[instrument]
3394    pub(super) async fn sequence_alter_retain_history(
3395        &mut self,
3396        ctx: &mut ExecuteContext,
3397        plan: plan::AlterRetainHistoryPlan,
3398    ) -> Result<ExecuteResponse, AdapterError> {
3399        let ops = vec![catalog::Op::AlterRetainHistory {
3400            id: plan.id,
3401            value: plan.value,
3402            window: plan.window,
3403        }];
3404        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3405            Box::pin(async move {
3406                let catalog_item = coord.catalog().get_entry(&plan.id).item();
3407                let cluster = match catalog_item {
3408                    CatalogItem::Table(_)
3409                    | CatalogItem::MaterializedView(_)
3410                    | CatalogItem::Source(_)
3411                    | CatalogItem::ContinualTask(_) => None,
3412                    CatalogItem::Index(index) => Some(index.cluster_id),
3413                    CatalogItem::Log(_)
3414                    | CatalogItem::View(_)
3415                    | CatalogItem::Sink(_)
3416                    | CatalogItem::Type(_)
3417                    | CatalogItem::Func(_)
3418                    | CatalogItem::Secret(_)
3419                    | CatalogItem::Connection(_) => unreachable!(),
3420                };
3421                match cluster {
3422                    Some(cluster) => {
3423                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3424                    }
3425                    None => {
3426                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3427                    }
3428                }
3429            })
3430        })
3431        .await?;
3432        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3433    }
3434
3435    #[instrument]
3436    pub(super) async fn sequence_alter_schema_rename(
3437        &mut self,
3438        ctx: &mut ExecuteContext,
3439        plan: plan::AlterSchemaRenamePlan,
3440    ) -> Result<ExecuteResponse, AdapterError> {
3441        let (database_spec, schema_spec) = plan.cur_schema_spec;
3442        let op = catalog::Op::RenameSchema {
3443            database_spec,
3444            schema_spec,
3445            new_name: plan.new_schema_name,
3446            check_reserved_names: true,
3447        };
3448        match self
3449            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3450            .await
3451        {
3452            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3453            Err(err) => Err(err),
3454        }
3455    }
3456
3457    #[instrument]
3458    pub(super) async fn sequence_alter_schema_swap(
3459        &mut self,
3460        ctx: &mut ExecuteContext,
3461        plan: plan::AlterSchemaSwapPlan,
3462    ) -> Result<ExecuteResponse, AdapterError> {
3463        let plan::AlterSchemaSwapPlan {
3464            schema_a_spec: (schema_a_db, schema_a),
3465            schema_a_name,
3466            schema_b_spec: (schema_b_db, schema_b),
3467            schema_b_name,
3468            name_temp,
3469        } = plan;
3470
3471        let op_a = catalog::Op::RenameSchema {
3472            database_spec: schema_a_db,
3473            schema_spec: schema_a,
3474            new_name: name_temp,
3475            check_reserved_names: false,
3476        };
3477        let op_b = catalog::Op::RenameSchema {
3478            database_spec: schema_b_db,
3479            schema_spec: schema_b,
3480            new_name: schema_a_name,
3481            check_reserved_names: false,
3482        };
3483        let op_c = catalog::Op::RenameSchema {
3484            database_spec: schema_a_db,
3485            schema_spec: schema_a,
3486            new_name: schema_b_name,
3487            check_reserved_names: false,
3488        };
3489
3490        match self
3491            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3492                Box::pin(async {})
3493            })
3494            .await
3495        {
3496            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3497            Err(err) => Err(err),
3498        }
3499    }
3500
3501    #[instrument]
3502    pub(super) async fn sequence_alter_role(
3503        &mut self,
3504        session: &Session,
3505        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3506    ) -> Result<ExecuteResponse, AdapterError> {
3507        let catalog = self.catalog().for_session(session);
3508        let role = catalog.get_role(&id);
3509
3510        // We'll send these notices to the user, if the operation is successful.
3511        let mut notices = vec![];
3512
3513        // Get the attributes and variables from the role, as they currently are.
3514        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3515        let mut vars = role.vars().clone();
3516
3517        // Whether to set the password to NULL. This is a special case since the existing
3518        // password is not stored in the role attributes.
3519        let mut nopassword = false;
3520
3521        // Apply our updates.
3522        match option {
3523            PlannedAlterRoleOption::Attributes(attrs) => {
3524                self.validate_role_attributes(&attrs.clone().into())?;
3525
3526                if let Some(inherit) = attrs.inherit {
3527                    attributes.inherit = inherit;
3528                }
3529
3530                if let Some(password) = attrs.password {
3531                    attributes.password = Some(password);
3532                }
3533
3534                if let Some(superuser) = attrs.superuser {
3535                    attributes.superuser = Some(superuser);
3536                }
3537
3538                if let Some(login) = attrs.login {
3539                    attributes.login = Some(login);
3540                }
3541
3542                if attrs.nopassword.unwrap_or(false) {
3543                    nopassword = true;
3544                }
3545
3546                if let Some(notice) = self.should_emit_rbac_notice(session) {
3547                    notices.push(notice);
3548                }
3549            }
3550            PlannedAlterRoleOption::Variable(variable) => {
3551                // Get the variable to make sure it's valid and visible.
3552                let session_var = session.vars().inspect(variable.name())?;
3553                // Return early if it's not visible.
3554                session_var.visible(session.user(), catalog.system_vars())?;
3555
3556                // Emit a warning when deprecated variables are used.
3557                // TODO(database-issues#8069) remove this after sufficient time has passed
3558                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3559                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3560                } else if let PlannedRoleVariable::Set {
3561                    name,
3562                    value: VariableValue::Values(vals),
3563                } = &variable
3564                {
3565                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3566                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3567                    }
3568                }
3569
3570                let var_name = match variable {
3571                    PlannedRoleVariable::Set { name, value } => {
3572                        // Update our persisted set.
3573                        match &value {
3574                            VariableValue::Default => {
3575                                vars.remove(&name);
3576                            }
3577                            VariableValue::Values(vals) => {
3578                                let var = match &vals[..] {
3579                                    [val] => OwnedVarInput::Flat(val.clone()),
3580                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3581                                };
3582                                // Make sure the input is valid.
3583                                session_var.check(var.borrow())?;
3584
3585                                vars.insert(name.clone(), var);
3586                            }
3587                        };
3588                        name
3589                    }
3590                    PlannedRoleVariable::Reset { name } => {
3591                        // Remove it from our persisted values.
3592                        vars.remove(&name);
3593                        name
3594                    }
3595                };
3596
3597                // Emit a notice that they need to reconnect to see the change take effect.
3598                notices.push(AdapterNotice::VarDefaultUpdated {
3599                    role: Some(name.clone()),
3600                    var_name: Some(var_name),
3601                });
3602            }
3603        }
3604
3605        let op = catalog::Op::AlterRole {
3606            id,
3607            name,
3608            attributes,
3609            nopassword,
3610            vars: RoleVars { map: vars },
3611        };
3612        let response = self
3613            .catalog_transact(Some(session), vec![op])
3614            .await
3615            .map(|_| ExecuteResponse::AlteredRole)?;
3616
3617        // Send all of our queued notices.
3618        session.add_notices(notices);
3619
3620        Ok(response)
3621    }
3622
3623    #[instrument]
3624    pub(super) async fn sequence_alter_sink_prepare(
3625        &mut self,
3626        ctx: ExecuteContext,
3627        plan: plan::AlterSinkPlan,
3628    ) {
3629        // Put a read hold on the new relation
3630        let id_bundle = crate::CollectionIdBundle {
3631            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3632            compute_ids: BTreeMap::new(),
3633        };
3634        let read_hold = self.acquire_read_holds(&id_bundle);
3635
3636        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3637            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3638            return;
3639        };
3640
3641        let otel_ctx = OpenTelemetryContext::obtain();
3642        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3643
3644        let plan_validity = PlanValidity::new(
3645            self.catalog().transient_revision(),
3646            BTreeSet::from_iter([from_item_id]),
3647            Some(plan.in_cluster),
3648            None,
3649            ctx.session().role_metadata().clone(),
3650        );
3651
3652        // Re-resolve items in the altered statement: parse the stored SQL and
3653        // resolve its names against the current catalog.
3654        let create_sink_stmt = match mz_sql::parse::parse(&plan.sink.create_sql)
3655            .expect("invalid create sink sql")
3656            .into_element()
3657            .ast
3658        {
3659            Statement::CreateSink(stmt) => stmt,
3660            _ => unreachable!("invalid statement kind for sink"),
3661        };
3662        let catalog = self.catalog().for_system_session();
3663        let (_, resolved_ids) = match mz_sql::names::resolve(&catalog, create_sink_stmt) {
3664            Ok(ok) => ok,
3665            Err(e) => {
3666                ctx.retire(Err(AdapterError::internal("ALTER SINK", e)));
3667                return;
3668            }
3669        };
3670
3671        info!(
3672            "preparing alter sink for {}: frontiers={:?} export={:?}",
3673            plan.global_id,
3674            self.controller
3675                .storage_collections
3676                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3677            self.controller.storage.export(plan.global_id)
3678        );
3679
3680        // Now we must wait for the sink to make enough progress such that there is overlap between
3681        // the new `from` collection's read hold and the sink's write frontier.
3682        self.install_storage_watch_set(
3683            ctx.session().conn_id().clone(),
3684            BTreeSet::from_iter([plan.global_id]),
3685            read_ts,
3686            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3687                ctx: Some(ctx),
3688                otel_ctx,
3689                plan,
3690                plan_validity,
3691                resolved_ids,
3692                read_hold,
3693            }),
3694        );
3695    }
3696
3697    #[instrument]
3698    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3699        ctx.otel_ctx.attach_as_parent();
3700        match ctx.plan_validity.check(self.catalog()) {
3701            Ok(()) => {}
3702            Err(err) => {
3703                ctx.retire(Err(err));
3704                return;
3705            }
3706        }
3707        {
3708            let plan = &ctx.plan;
3709            info!(
3710                "finishing alter sink for {}: frontiers={:?} export={:?}",
3711                plan.global_id,
3712                self.controller
3713                    .storage_collections
3714                    .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3715                self.controller.storage.export(plan.global_id)
3716            );
3717        }
3718
3719        let plan::AlterSinkPlan {
3720            item_id,
3721            global_id,
3722            sink,
3723            with_snapshot,
3724            in_cluster,
3725        } = ctx.plan.clone();
3726        // Assert that we can recover the updates that happened at the timestamps of the write
3727        // frontier. This must hold by the time this method is called.
3728        let write_frontier = &self
3729            .controller
3730            .storage
3731            .export(global_id)
3732            .expect("sink known to exist")
3733            .write_frontier;
3734        let as_of = ctx.read_hold.least_valid_read();
3735        assert!(
3736            write_frontier.iter().all(|t| as_of.less_than(t)),
3737            "{:?} should be strictly less than {:?}",
3738            &*as_of,
3739            &**write_frontier
3740        );
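        // Worked example of the assertion: with `as_of` = {5} and a write
        // frontier of {7}, every frontier element satisfies 5 < 7, so updates
        // at times 5 and 6 can still be re-derived under the new definition;
        // with a write frontier of {5}, the assertion would fail.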
3741
3742        let catalog_sink = Sink {
3743            create_sql: sink.create_sql,
3744            global_id,
3745            from: sink.from,
3746            connection: sink.connection.clone(),
3747            envelope: sink.envelope,
3748            version: sink.version,
3749            with_snapshot,
3750            resolved_ids: ctx.resolved_ids.clone(),
3751            cluster_id: in_cluster,
3752        };
3753
3754        let ops = vec![catalog::Op::UpdateItem {
3755            id: item_id,
3756            name: self.catalog.get_entry(&item_id).name().clone(),
3757            to_item: CatalogItem::Sink(catalog_sink),
3758        }];
3759
3760        match self
3761            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3762            .await
3763        {
3764            Ok(()) => {}
3765            Err(err) => {
3766                ctx.retire(Err(err));
3767                return;
3768            }
3769        }
3770
3771        let from_entry = self.catalog().get_entry_by_global_id(&sink.from);
3772        let storage_sink_desc = StorageSinkDesc {
3773            from: sink.from,
3774            from_desc: from_entry
3775                .desc_opt()
3776                .expect("sinks can only be built on items with descs")
3777                .into_owned(),
3778            connection: sink
3779                .connection
3780                .clone()
3781                .into_inline_connection(self.catalog().state()),
3782            envelope: sink.envelope,
3783            as_of,
3784            with_snapshot,
3785            version: sink.version,
3786            from_storage_metadata: (),
3787            to_storage_metadata: (),
3788        };
3789
3790        self.controller
3791            .storage
3792            .alter_export(
3793                global_id,
3794                ExportDescription {
3795                    sink: storage_sink_desc,
3796                    instance_id: in_cluster,
3797                },
3798            )
3799            .await
3800            .unwrap_or_terminate("cannot fail to alter sink export");
3801
3802        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3803    }
3804
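    /// Sequences an `ALTER CONNECTION` statement by dispatching on the planned
    /// action. Illustrative forms (syntax sketches, not taken from this file):
    ///
    /// ```sql
    /// -- Rotate the key pair of an SSH tunnel connection.
    /// ALTER CONNECTION ssh_conn ROTATE KEYS;
    /// -- Amend options, optionally re-validating against the external system.
    /// ALTER CONNECTION kafka_conn SET (BROKER 'broker:9092') WITH (VALIDATE = true);
    /// ```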
3805    #[instrument]
3806    pub(super) async fn sequence_alter_connection(
3807        &mut self,
3808        ctx: ExecuteContext,
3809        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3810    ) {
3811        match action {
3812            AlterConnectionAction::RotateKeys => {
3813                self.sequence_rotate_keys(ctx, id).await;
3814            }
3815            AlterConnectionAction::AlterOptions {
3816                set_options,
3817                drop_options,
3818                validate,
3819            } => {
3820                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3821                    .await
3822            }
3823        }
3824    }
3825
3826    #[instrument]
3827    async fn sequence_alter_connection_options(
3828        &mut self,
3829        mut ctx: ExecuteContext,
3830        id: CatalogItemId,
3831        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3832        drop_options: BTreeSet<ConnectionOptionName>,
3833        validate: bool,
3834    ) {
3835        let cur_entry = self.catalog().get_entry(&id);
3836        let cur_conn = cur_entry.connection().expect("known to be connection");
3837        let connection_gid = cur_conn.global_id();
3838
3839        let inner = || -> Result<Connection, AdapterError> {
3840            // Parse statement.
3841            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3842                .expect("invalid create sql persisted to catalog")
3843                .into_element()
3844                .ast
3845            {
3846                Statement::CreateConnection(stmt) => stmt,
3847                _ => unreachable!("proved type is connection"),
3848            };
3849
3850            let catalog = self.catalog().for_system_session();
3851
3852            // Resolve items in statement
3853            let (mut create_conn_stmt, resolved_ids) =
3854                mz_sql::names::resolve(&catalog, create_conn_stmt)
3855                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3856
3857            // Retain options that are neither set nor dropped.
3858            create_conn_stmt
3859                .values
3860                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3861
3862            // Set new values
3863            create_conn_stmt.values.extend(
3864                set_options
3865                    .into_iter()
3866                    .map(|(name, value)| ConnectionOption { name, value }),
3867            );
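            // For example (hypothetical options): starting from
            // (BROKER 'a', SSH TUNNEL t) with `set_options` = {BROKER: 'b'} and
            // `drop_options` = {SSH TUNNEL}, the retain above keeps nothing and
            // the extend re-adds BROKER with the value 'b'.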
3868
3869            // Open a new catalog, which we will use to re-plan our
3870            // statement with the desired config.
3871            let mut catalog = self.catalog().for_system_session();
3872            catalog.mark_id_unresolvable_for_replanning(id);
3873
3874            // Re-plan the connection in terms of the amended statement.
3875            let plan = match mz_sql::plan::plan(
3876                None,
3877                &catalog,
3878                Statement::CreateConnection(create_conn_stmt),
3879                &Params::empty(),
3880                &resolved_ids,
3881            )
3882            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3883            {
3884                Plan::CreateConnection(plan) => plan,
3885                _ => unreachable!("create connection plan is the only valid response"),
3886            };
3887
3888            // Parse statement.
3889            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3890                .expect("invalid create sql generated by planning")
3891                .into_element()
3892                .ast
3893            {
3894                Statement::CreateConnection(stmt) => stmt,
3895                _ => unreachable!("proved type is connection"),
3896            };
3897
3898            let catalog = self.catalog().for_system_session();
3899
3900            // Resolve items in statement
3901            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3902                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3903
3904            Ok(Connection {
3905                create_sql: plan.connection.create_sql,
3906                global_id: cur_conn.global_id,
3907                details: plan.connection.details,
3908                resolved_ids: new_deps,
3909            })
3910        };
3911
3912        let conn = match inner() {
3913            Ok(conn) => conn,
3914            Err(e) => {
3915                return ctx.retire(Err(e));
3916            }
3917        };
3918
3919        if validate {
3920            let connection = conn
3921                .details
3922                .to_connection()
3923                .into_inline_connection(self.catalog().state());
3924
3925            let internal_cmd_tx = self.internal_cmd_tx.clone();
3926            let transient_revision = self.catalog().transient_revision();
3927            let conn_id = ctx.session().conn_id().clone();
3928            let otel_ctx = OpenTelemetryContext::obtain();
3929            let role_metadata = ctx.session().role_metadata().clone();
3930            let current_storage_parameters = self.controller.storage.config().clone();
3931
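            // Validation may contact the external system and can be slow, so it
            // runs on a separate task; the result is sent back to the main
            // coordinator loop as an `AlterConnectionValidationReady` message.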
3932            task::spawn(
3933                || format!("validate_alter_connection:{conn_id}"),
3934                async move {
3935                    let resolved_ids = conn.resolved_ids.clone();
3936                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3937                    let result = match connection.validate(id, &current_storage_parameters).await {
3938                        Ok(()) => Ok(conn),
3939                        Err(err) => Err(err.into()),
3940                    };
3941
3942                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3943                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3944                        AlterConnectionValidationReady {
3945                            ctx,
3946                            result,
3947                            connection_id: id,
3948                            connection_gid,
3949                            plan_validity: PlanValidity::new(
3950                                transient_revision,
3951                                dependency_ids.clone(),
3952                                None,
3953                                None,
3954                                role_metadata,
3955                            ),
3956                            otel_ctx,
3957                            resolved_ids,
3958                        },
3959                    ));
3960                    if let Err(e) = result {
3961                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3962                    }
3963                },
3964            );
3965        } else {
3966            let result = self
3967                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3968                .await;
3969            ctx.retire(result);
3970        }
3971    }
3972
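    /// Commits an already-validated connection alteration: checks that the new
    /// definition is `alter_compatible` with the old one, updates the catalog,
    /// (re-)ensures any AWS PrivateLink VPC endpoint, and then pushes the new
    /// inline connection details to every running source, sink, and source-fed
    /// table that depends on the connection, transitively through intermediate
    /// connections.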
3973    #[instrument]
3974    pub(crate) async fn sequence_alter_connection_stage_finish(
3975        &mut self,
3976        session: &mut Session,
3977        id: CatalogItemId,
3978        connection: Connection,
3979    ) -> Result<ExecuteResponse, AdapterError> {
3980        match self.catalog.get_entry(&id).item() {
3981            CatalogItem::Connection(curr_conn) => {
3982                curr_conn
3983                    .details
3984                    .to_connection()
3985                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3986                    .map_err(StorageError::from)?;
3987            }
3988            _ => unreachable!("known to be a connection"),
3989        };
3990
3991        let ops = vec![catalog::Op::UpdateItem {
3992            id,
3993            name: self.catalog.get_entry(&id).name().clone(),
3994            to_item: CatalogItem::Connection(connection.clone()),
3995        }];
3996
3997        self.catalog_transact(Some(session), ops).await?;
3998
3999        match connection.details {
4000            ConnectionDetails::AwsPrivatelink(ref privatelink) => {
4001                let spec = VpcEndpointConfig {
4002                    aws_service_name: privatelink.service_name.to_owned(),
4003                    availability_zone_ids: privatelink.availability_zones.to_owned(),
4004                };
4005                self.cloud_resource_controller
4006                    .as_ref()
4007                    .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
4008                    .ensure_vpc_endpoint(id, spec)
4009                    .await?;
4010            }
4011            _ => {}
4012        };
4013
4014        let entry = self.catalog().get_entry(&id);
4015
4016        let mut connections = VecDeque::new();
4017        connections.push_front(entry.id());
4018
4019        let mut source_connections = BTreeMap::new();
4020        let mut sink_connections = BTreeMap::new();
4021        let mut source_export_data_configs = BTreeMap::new();
4022
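        // Breadth-first walk of the dependents of the altered connection.
        // Intermediate connections (e.g. an SSH tunnel used by a Kafka
        // connection) are pushed back onto the queue so that ingestions and
        // exports reachable only through them are updated too.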
4023        while let Some(id) = connections.pop_front() {
4024            for id in self.catalog.get_entry(&id).used_by() {
4025                let entry = self.catalog.get_entry(id);
4026                match entry.item() {
4027                    CatalogItem::Connection(_) => connections.push_back(*id),
4028                    CatalogItem::Source(source) => {
4029                        let desc = match &entry.source().expect("known to be source").data_source {
4030                            DataSourceDesc::Ingestion { desc, .. }
4031                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4032                                desc.clone().into_inline_connection(self.catalog().state())
4033                            }
4034                            _ => unreachable!("only ingestions reference connections"),
4035                        };
4036
4037                        source_connections.insert(source.global_id, desc.connection);
4038                    }
4039                    CatalogItem::Sink(sink) => {
4040                        let export = entry.sink().expect("known to be sink");
4041                        sink_connections.insert(
4042                            sink.global_id,
4043                            export
4044                                .connection
4045                                .clone()
4046                                .into_inline_connection(self.catalog().state()),
4047                        );
4048                    }
4049                    CatalogItem::Table(table) => {
4050                        // This is a source-fed table that references a schema registry
4051                        // connection as part of its encoding / data config.
4052                        if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
4053                            let data_config = export_data_config.clone();
4054                            source_export_data_configs.insert(
4055                                table.global_id_writes(),
4056                                data_config.into_inline_connection(self.catalog().state()),
4057                            );
4058                        }
4059                    }
4060                    t => unreachable!("connection dependency not expected on {:?}", t),
4061                }
4062            }
4063        }
4064
4065        if !source_connections.is_empty() {
4066            self.controller
4067                .storage
4068                .alter_ingestion_connections(source_connections)
4069                .await
4070                .unwrap_or_terminate("cannot fail to alter ingestion connection");
4071        }
4072
4073        if !sink_connections.is_empty() {
4074            self.controller
4075                .storage
4076                .alter_export_connections(sink_connections)
4077                .await
4078                .unwrap_or_terminate("altering exports after txn must succeed");
4079        }
4080
4081        if !source_export_data_configs.is_empty() {
4082            self.controller
4083                .storage
4084                .alter_ingestion_export_data_configs(source_export_data_configs)
4085                .await
4086                .unwrap_or_terminate("altering source export data configs after txn must succeed");
4087        }
4088
4089        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
4090    }
4091
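    /// Sequences an `ALTER SOURCE` statement. An illustrative form that
    /// reaches the `AddSubsourceExports` arm (a syntax sketch; the exact
    /// option spelling may differ):
    ///
    /// ```sql
    /// ALTER SOURCE pg_source ADD SUBSOURCE schema1.table_a, schema1.table_b
    ///     WITH (TEXT COLUMNS (schema1.table_a.col1));
    /// ```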
4092    #[instrument]
4093    pub(super) async fn sequence_alter_source(
4094        &mut self,
4095        session: &Session,
4096        plan::AlterSourcePlan {
4097            item_id,
4098            ingestion_id,
4099            action,
4100        }: plan::AlterSourcePlan,
4101    ) -> Result<ExecuteResponse, AdapterError> {
4102        let cur_entry = self.catalog().get_entry(&item_id);
4103        let cur_source = cur_entry.source().expect("known to be source");
4104
4105        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
4106            // Parse statement.
4107            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
4108                .expect("invalid create sql persisted to catalog")
4109                .into_element()
4110                .ast
4111            {
4112                Statement::CreateSource(stmt) => stmt,
4113                _ => unreachable!("proved type is source"),
4114            };
4115
4116            let catalog = coord.catalog().for_system_session();
4117
4118            // Resolve items in statement
4119            mz_sql::names::resolve(&catalog, create_source_stmt)
4120                .map_err(|e| AdapterError::internal(err_cx, e))
4121        };
4122
4123        match action {
4124            plan::AlterSourceAction::AddSubsourceExports {
4125                subsources,
4126                options,
4127            } => {
4128                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
4129
4130                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
4131                    text_columns: mut new_text_columns,
4132                    exclude_columns: mut new_exclude_columns,
4133                    ..
4134                } = options.try_into()?;
4135
4136                // Resolve items in statement
4137                let (mut create_source_stmt, resolved_ids) =
4138                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
4139
4140                // Get all currently referred-to items
4141                let catalog = self.catalog();
4142                let curr_references: BTreeSet<_> = catalog
4143                    .get_entry(&item_id)
4144                    .used_by()
4145                    .into_iter()
4146                    .filter_map(|subsource| {
4147                        catalog
4148                            .get_entry(subsource)
4149                            .subsource_details()
4150                            .map(|(_id, reference, _details)| reference)
4151                    })
4152                    .collect();
4153
4154                // We are doing a lot of unwrapping, so just make an error to reference; all of
4155                // these invariants are guaranteed to be true because of how we plan subsources.
4156                let purification_err =
4157                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
4158
4159                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
4160                // we remove support for implicitly created subsources from a `CREATE SOURCE`
4161                // statement.
4162                match &mut create_source_stmt.connection {
4163                    CreateSourceConnection::Postgres {
4164                        options: curr_options,
4165                        ..
4166                    } => {
4167                        let mz_sql::plan::PgConfigOptionExtracted {
4168                            mut text_columns, ..
4169                        } = curr_options.clone().try_into()?;
4170
4171                        // Drop text columns; we will add them back in
4172                        // as appropriate below.
4173                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
4174
4175                        // Drop all text columns that are not currently referred to.
4176                        text_columns.retain(|column_qualified_reference| {
4177                            mz_ore::soft_assert_eq_or_log!(
4178                                column_qualified_reference.0.len(),
4179                                4,
4180                                "all TEXT COLUMNS values must be column-qualified references"
4181                            );
4182                            let mut table = column_qualified_reference.clone();
4183                            table.0.truncate(3);
4184                            curr_references.contains(&table)
4185                        });
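                        // e.g. a text column reference "db"."sch"."t1"."c1" is
                        // kept only if truncating it to "db"."sch"."t1" still
                        // names a currently referenced subsource.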
4186
4187                        // Merge the current text columns into the new text columns.
4188                        new_text_columns.extend(text_columns);
4189
4190                        // If we have text columns, add them to the options.
4191                        if !new_text_columns.is_empty() {
4192                            new_text_columns.sort();
4193                            let new_text_columns = new_text_columns
4194                                .into_iter()
4195                                .map(WithOptionValue::UnresolvedItemName)
4196                                .collect();
4197
4198                            curr_options.push(PgConfigOption {
4199                                name: PgConfigOptionName::TextColumns,
4200                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4201                            });
4202                        }
4203                    }
4204                    CreateSourceConnection::MySql {
4205                        options: curr_options,
4206                        ..
4207                    } => {
4208                        let mz_sql::plan::MySqlConfigOptionExtracted {
4209                            mut text_columns,
4210                            mut exclude_columns,
4211                            ..
4212                        } = curr_options.clone().try_into()?;
4213
4214                        // Drop both text and exclude columns; we will add them back in
4215                        // as appropriate below.
4216                        curr_options.retain(|o| {
4217                            !matches!(
4218                                o.name,
4219                                MySqlConfigOptionName::TextColumns
4220                                    | MySqlConfigOptionName::ExcludeColumns
4221                            )
4222                        });
4223
4224                        // Drop all text / exclude columns that are not currently referred to.
4225                        let column_referenced =
4226                            |column_qualified_reference: &UnresolvedItemName| {
4227                                mz_ore::soft_assert_eq_or_log!(
4228                                    column_qualified_reference.0.len(),
4229                                    3,
4230                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4231                                );
4232                                let mut table = column_qualified_reference.clone();
4233                                table.0.truncate(2);
4234                                curr_references.contains(&table)
4235                            };
4236                        text_columns.retain(column_referenced);
4237                        exclude_columns.retain(column_referenced);
4238
4239                        // Merge the current text / exclude columns into the new text / exclude columns.
4240                        new_text_columns.extend(text_columns);
4241                        new_exclude_columns.extend(exclude_columns);
4242
4243                        // If we have text columns, add them to the options.
4244                        if !new_text_columns.is_empty() {
4245                            new_text_columns.sort();
4246                            let new_text_columns = new_text_columns
4247                                .into_iter()
4248                                .map(WithOptionValue::UnresolvedItemName)
4249                                .collect();
4250
4251                            curr_options.push(MySqlConfigOption {
4252                                name: MySqlConfigOptionName::TextColumns,
4253                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4254                            });
4255                        }
4256                        // If we have exclude columns, add them to the options.
4257                        if !new_exclude_columns.is_empty() {
4258                            new_exclude_columns.sort();
4259                            let new_exclude_columns = new_exclude_columns
4260                                .into_iter()
4261                                .map(WithOptionValue::UnresolvedItemName)
4262                                .collect();
4263
4264                            curr_options.push(MySqlConfigOption {
4265                                name: MySqlConfigOptionName::ExcludeColumns,
4266                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4267                            });
4268                        }
4269                    }
4270                    CreateSourceConnection::SqlServer {
4271                        options: curr_options,
4272                        ..
4273                    } => {
4274                        let mz_sql::plan::SqlServerConfigOptionExtracted {
4275                            mut text_columns,
4276                            mut exclude_columns,
4277                            ..
4278                        } = curr_options.clone().try_into()?;
4279
4280                        // Drop both text and exclude columns; we will add them back in
4281                        // as appropriate below.
4282                        curr_options.retain(|o| {
4283                            !matches!(
4284                                o.name,
4285                                SqlServerConfigOptionName::TextColumns
4286                                    | SqlServerConfigOptionName::ExcludeColumns
4287                            )
4288                        });
4289
4290                        // Drop all text / exclude columns that are not currently referred to.
4291                        let column_referenced =
4292                            |column_qualified_reference: &UnresolvedItemName| {
4293                                mz_ore::soft_assert_eq_or_log!(
4294                                    column_qualified_reference.0.len(),
4295                                    3,
4296                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4297                                );
4298                                let mut table = column_qualified_reference.clone();
4299                                table.0.truncate(2);
4300                                curr_references.contains(&table)
4301                            };
4302                        text_columns.retain(column_referenced);
4303                        exclude_columns.retain(column_referenced);
4304
4305                        // Merge the current text / exclude columns into the new text / exclude columns.
4306                        new_text_columns.extend(text_columns);
4307                        new_exclude_columns.extend(exclude_columns);
4308
4309                        // If we have text columns, add them to the options.
4310                        if !new_text_columns.is_empty() {
4311                            new_text_columns.sort();
4312                            let new_text_columns = new_text_columns
4313                                .into_iter()
4314                                .map(WithOptionValue::UnresolvedItemName)
4315                                .collect();
4316
4317                            curr_options.push(SqlServerConfigOption {
4318                                name: SqlServerConfigOptionName::TextColumns,
4319                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4320                            });
4321                        }
4322                        // If we have exclude columns, add them to the options.
4323                        if !new_exclude_columns.is_empty() {
4324                            new_exclude_columns.sort();
4325                            let new_exclude_columns = new_exclude_columns
4326                                .into_iter()
4327                                .map(WithOptionValue::UnresolvedItemName)
4328                                .collect();
4329
4330                            curr_options.push(SqlServerConfigOption {
4331                                name: SqlServerConfigOptionName::ExcludeColumns,
4332                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4333                            });
4334                        }
4335                    }
4336                    _ => return Err(purification_err()),
4337                };
4338
4339                let mut catalog = self.catalog().for_system_session();
4340                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4341
4342                // Re-define our source in terms of the amended statement
4343                let plan = match mz_sql::plan::plan(
4344                    None,
4345                    &catalog,
4346                    Statement::CreateSource(create_source_stmt),
4347                    &Params::empty(),
4348                    &resolved_ids,
4349                )
4350                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4351                {
4352                    Plan::CreateSource(plan) => plan,
4353                    _ => unreachable!("create source plan is the only valid response"),
4354                };
4355
4356                // Asserting that we've done the right thing with dependencies
4357                // here requires mocking out objects in the catalog, which is a
4358                // large task for an operation we have to cover in tests anyway.
4359                let source = Source::new(
4360                    plan,
4361                    cur_source.global_id,
4362                    resolved_ids,
4363                    cur_source.custom_logical_compaction_window,
4364                    cur_source.is_retained_metrics_object,
4365                );
4366
4367                let source_compaction_window = source.custom_logical_compaction_window;
4368
4369                // Get new ingestion description for storage.
4370                let desc = match &source.data_source {
4371                    DataSourceDesc::Ingestion { desc, .. }
4372                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4373                        desc.clone().into_inline_connection(self.catalog().state())
4374                    }
4375                    _ => unreachable!("already verified of type ingestion"),
4376                };
4377
4378                self.controller
4379                    .storage
4380                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
4381                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4382
4383                // Redefine source. This must be done before we create any new
4384                // subsources so that it has the right ingestion.
4385                let mut ops = vec![catalog::Op::UpdateItem {
4386                    id: item_id,
4387                    // Look this up again so we don't have to hold an immutable reference to the
4388                    // entry for so long.
4389                    name: self.catalog.get_entry(&item_id).name().clone(),
4390                    to_item: CatalogItem::Source(source),
4391                }];
4392
4393                let CreateSourceInner {
4394                    ops: new_ops,
4395                    sources,
4396                    if_not_exists_ids,
4397                } = self.create_source_inner(session, subsources).await?;
4398
4399                ops.extend(new_ops.into_iter());
4400
4401                assert!(
4402                    if_not_exists_ids.is_empty(),
4403                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4404                );
4405
4406                self.catalog_transact(Some(session), ops).await?;
4407
4408                let mut item_ids = BTreeSet::new();
4409                let mut collections = Vec::with_capacity(sources.len());
4410                for (item_id, source) in sources {
4411                    let status_id = self.catalog().resolve_builtin_storage_collection(
4412                        &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
4413                    );
4414                    let source_status_collection_id =
4415                        Some(self.catalog().get_entry(&status_id).latest_global_id());
4416
4417                    let (data_source, status_collection_id) = match source.data_source {
4418                        // Subsources use source statuses.
4419                        DataSourceDesc::IngestionExport {
4420                            ingestion_id,
4421                            external_reference: _,
4422                            details,
4423                            data_config,
4424                        } => {
4425                            // TODO(parkmycar): We should probably check the type here, but I'm not sure if
4426                            // this will always be a Source or a Table.
4427                            let ingestion_id =
4428                                self.catalog().get_entry(&ingestion_id).latest_global_id();
4429                            (
4430                                DataSource::IngestionExport {
4431                                    ingestion_id,
4432                                    details,
4433                                    data_config: data_config
4434                                        .into_inline_connection(self.catalog().state()),
4435                                },
4436                                source_status_collection_id,
4437                            )
4438                        }
4439                        o => {
4440                            unreachable!(
4441                                "ALTER SOURCE...ADD SUBSOURCE only creates SourceExport but got {:?}",
4442                                o
4443                            )
4444                        }
4445                    };
4446
4447                    collections.push((
4448                        source.global_id,
4449                        CollectionDescription {
4450                            desc: source.desc.clone(),
4451                            data_source,
4452                            since: None,
4453                            status_collection_id,
4454                            timeline: Some(source.timeline.clone()),
4455                        },
4456                    ));
4457
4458                    item_ids.insert(item_id);
4459                }
4460
4461                let storage_metadata = self.catalog.state().storage_metadata();
4462
4463                self.controller
4464                    .storage
4465                    .create_collections(storage_metadata, None, collections)
4466                    .await
4467                    .unwrap_or_terminate("cannot fail to create collections");
4468
4469                self.initialize_storage_read_policies(
4470                    item_ids,
4471                    source_compaction_window.unwrap_or(CompactionWindow::Default),
4472                )
4473                .await;
4474            }
4475            plan::AlterSourceAction::RefreshReferences { references } => {
4476                self.catalog_transact(
4477                    Some(session),
4478                    vec![catalog::Op::UpdateSourceReferences {
4479                        source_id: item_id,
4480                        references: references.into(),
4481                    }],
4482                )
4483                .await?;
4484            }
4485        }
4486
4487        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4488    }
4489
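    /// Sequences `ALTER SYSTEM SET`, which only superusers may run (see
    /// `is_user_allowed_to_alter_system`), e.g.:
    ///
    /// ```sql
    /// ALTER SYSTEM SET max_connections = 100;
    /// ```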
4490    #[instrument]
4491    pub(super) async fn sequence_alter_system_set(
4492        &mut self,
4493        session: &Session,
4494        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4495    ) -> Result<ExecuteResponse, AdapterError> {
4496        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4497        // We want to ensure that the network policy we're switching to actually exists.
4498        if NETWORK_POLICY.name.to_string().to_lowercase() == name.to_lowercase() {
4499            self.validate_alter_system_network_policy(session, &value)?;
4500        }
4501
4502        let op = match value {
4503            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4504                name: name.clone(),
4505                value: OwnedVarInput::SqlSet(values),
4506            },
4507            plan::VariableValue::Default => {
4508                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4509            }
4510        };
4511        self.catalog_transact(Some(session), vec![op]).await?;
4512
4513        session.add_notice(AdapterNotice::VarDefaultUpdated {
4514            role: None,
4515            var_name: Some(name),
4516        });
4517        Ok(ExecuteResponse::AlteredSystemConfiguration)
4518    }
4519
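    /// Sequences `ALTER SYSTEM RESET`, restoring a variable's default, e.g.:
    ///
    /// ```sql
    /// ALTER SYSTEM RESET max_connections;
    /// ```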
4520    #[instrument]
4521    pub(super) async fn sequence_alter_system_reset(
4522        &mut self,
4523        session: &Session,
4524        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4525    ) -> Result<ExecuteResponse, AdapterError> {
4526        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4527        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4528        self.catalog_transact(Some(session), vec![op]).await?;
4529        session.add_notice(AdapterNotice::VarDefaultUpdated {
4530            role: None,
4531            var_name: Some(name),
4532        });
4533        Ok(ExecuteResponse::AlteredSystemConfiguration)
4534    }
4535
4536    #[instrument]
4537    pub(super) async fn sequence_alter_system_reset_all(
4538        &mut self,
4539        session: &Session,
4540        _: plan::AlterSystemResetAllPlan,
4541    ) -> Result<ExecuteResponse, AdapterError> {
4542        self.is_user_allowed_to_alter_system(session, None)?;
4543        let op = catalog::Op::ResetAllSystemConfiguration;
4544        self.catalog_transact(Some(session), vec![op]).await?;
4545        session.add_notice(AdapterNotice::VarDefaultUpdated {
4546            role: None,
4547            var_name: None,
4548        });
4549        Ok(ExecuteResponse::AlteredSystemConfiguration)
4550    }
4551
4552    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
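    // Access summary: internal superusers may modify any system variable;
    // external superusers only user-modifiable (and visible) ones; regular
    // users get a superuser-required error for user-modifiable variables and
    // an mz_system-required error otherwise.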
4553    fn is_user_allowed_to_alter_system(
4554        &self,
4555        session: &Session,
4556        var_name: Option<&str>,
4557    ) -> Result<(), AdapterError> {
4558        match (session.user().kind(), var_name) {
4559            // Only internal superusers can reset all system variables.
4560            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4561            // Whether a variable can be modified depends on whether we're an internal superuser.
4562            (UserKind::Superuser, Some(name))
4563                if session.user().is_internal()
4564                    || self.catalog().system_config().user_modifiable(name) =>
4565            {
4566                // In lieu of plumbing the user to all system config functions, just check that
4567                // the var is visible.
4568                let var = self.catalog().system_config().get(name)?;
4569                var.visible(session.user(), self.catalog().system_config())?;
4570                Ok(())
4571            }
4572            // If we're not a superuser but the variable is user-modifiable, return a
4573            // superuser error; the variable can still be set per-session with SET.
4574            (UserKind::Regular, Some(name))
4575                if self.catalog().system_config().user_modifiable(name) =>
4576            {
4577                Err(AdapterError::Unauthorized(
4578                    rbac::UnauthorizedError::Superuser {
4579                        action: format!("toggle the '{name}' system configuration parameter"),
4580                    },
4581                ))
4582            }
4583            _ => Err(AdapterError::Unauthorized(
4584                rbac::UnauthorizedError::MzSystem {
4585                    action: "alter system".into(),
4586                },
4587            )),
4588        }
4589    }
4590
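    /// Ensures that `ALTER SYSTEM SET network_policy = '...'` names exactly
    /// one existing network policy before it is committed, and that the chosen
    /// policy would not lock the current session out.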
4591    fn validate_alter_system_network_policy(
4592        &self,
4593        session: &Session,
4594        policy_value: &plan::VariableValue,
4595    ) -> Result<(), AdapterError> {
4596        let policy_name = match &policy_value {
4597            // Make sure the compiled-in default still exists.
4598            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4599            plan::VariableValue::Values(values) if values.len() == 1 => {
4600                values.iter().next().cloned()
4601            }
4602            plan::VariableValue::Values(values) => {
4603                tracing::warn!(?values, "can't set multiple network policies at once");
4604                None
4605            }
4606        };
4607        let maybe_network_policy = policy_name
4608            .as_ref()
4609            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4610        let Some(network_policy) = maybe_network_policy else {
4611            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4612                VarError::InvalidParameterValue {
4613                    name: NETWORK_POLICY.name(),
4614                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4615                    reason: "no network policy with that name exists".to_string(),
4616                },
4617            )));
4618        };
4619        self.validate_alter_network_policy(session, &network_policy.rules)
4620    }
4621
4622    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4623    ///
4624    /// This helps prevent users from modifying network policies in a way that would lock out their
4625    /// current connection.
4626    fn validate_alter_network_policy(
4627        &self,
4628        session: &Session,
4629        policy_rules: &Vec<NetworkPolicyRule>,
4630    ) -> Result<(), AdapterError> {
4631        // If the user is not an internal user, attempt to protect them from
4632        // locking themselves out.
4633        if session.user().is_internal() {
4634            return Ok(());
4635        }
4636        if let Some(ip) = session.meta().client_ip() {
4637            validate_ip_with_policy_rules(ip, policy_rules)
4638                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4639        } else {
4640            // Sessions without IPs are only constructed temporarily for default values;
4641            // they should not be permitted here.
4642            return Err(AdapterError::NetworkPolicyDenied(
4643                NetworkPolicyError::MissingIp,
4644            ));
4645        }
4646        Ok(())
4647    }
4648
4649    /// Returns the name of the portal to execute.
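    /// `EXECUTE` runs a previously prepared statement, e.g.:
    ///
    /// ```sql
    /// PREPARE find AS SELECT 1 WHERE 1 = $1;
    /// EXECUTE find (1);
    /// ```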
4650    #[instrument]
4651    pub(super) fn sequence_execute(
4652        &mut self,
4653        session: &mut Session,
4654        plan: plan::ExecutePlan,
4655    ) -> Result<String, AdapterError> {
4656        // Verify the stmt is still valid.
4657        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4658        let ps = session
4659            .get_prepared_statement_unverified(&plan.name)
4660            .expect("known to exist");
4661        let stmt = ps.stmt().cloned();
4662        let desc = ps.desc().clone();
4663        let state_revision = ps.state_revision;
4664        let logging = Arc::clone(ps.logging());
4665        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4666    }
4667
4668    #[instrument]
4669    pub(super) async fn sequence_grant_privileges(
4670        &mut self,
4671        session: &Session,
4672        plan::GrantPrivilegesPlan {
4673            update_privileges,
4674            grantees,
4675        }: plan::GrantPrivilegesPlan,
4676    ) -> Result<ExecuteResponse, AdapterError> {
4677        self.sequence_update_privileges(
4678            session,
4679            update_privileges,
4680            grantees,
4681            UpdatePrivilegeVariant::Grant,
4682        )
4683        .await
4684    }
4685
4686    #[instrument]
4687    pub(super) async fn sequence_revoke_privileges(
4688        &mut self,
4689        session: &Session,
4690        plan::RevokePrivilegesPlan {
4691            update_privileges,
4692            revokees,
4693        }: plan::RevokePrivilegesPlan,
4694    ) -> Result<ExecuteResponse, AdapterError> {
4695        self.sequence_update_privileges(
4696            session,
4697            update_privileges,
4698            revokees,
4699            UpdatePrivilegeVariant::Revoke,
4700        )
4701        .await
4702    }
4703
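    /// Shared implementation of GRANT and REVOKE on privileges, e.g.:
    ///
    /// ```sql
    /// GRANT SELECT, INSERT ON TABLE t TO alice;
    /// REVOKE SELECT ON TABLE t FROM alice;
    /// ```
    ///
    /// Granting a privilege the grantee already holds, or revoking one they do
    /// not hold, is a no-op rather than an error.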
4704    #[instrument]
4705    async fn sequence_update_privileges(
4706        &mut self,
4707        session: &Session,
4708        update_privileges: Vec<UpdatePrivilege>,
4709        grantees: Vec<RoleId>,
4710        variant: UpdatePrivilegeVariant,
4711    ) -> Result<ExecuteResponse, AdapterError> {
4712        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4713        let mut warnings = Vec::new();
4714        let catalog = self.catalog().for_session(session);
4715
4716        for UpdatePrivilege {
4717            acl_mode,
4718            target_id,
4719            grantor,
4720        } in update_privileges
4721        {
4722            let actual_object_type = catalog.get_system_object_type(&target_id);
4723            // For all relations we allow all applicable table privileges, but send a warning if the
4724            // privilege isn't actually applicable to the object type.
4725            if actual_object_type.is_relation() {
4726                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4727                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4728                if !non_applicable_privileges.is_empty() {
4729                    let object_description =
4730                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4731                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4732                        non_applicable_privileges,
4733                        object_description,
4734                    })
4735                }
4736            }
4737
4738            if let SystemObjectId::Object(object_id) = &target_id {
4739                self.catalog()
4740                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4741            }
4742
4743            let privileges = self
4744                .catalog()
4745                .get_privileges(&target_id, session.conn_id())
4746                // Should be unreachable since the parser will refuse to parse grant/revoke
4747                // statements on objects without privileges.
4748                .ok_or(AdapterError::Unsupported(
4749                    "GRANTs/REVOKEs on an object type with no privileges",
4750                ))?;
4751
4752            for grantee in &grantees {
4753                self.catalog().ensure_not_system_role(grantee)?;
4754                self.catalog().ensure_not_predefined_role(grantee)?;
4755                let existing_privilege = privileges
4756                    .get_acl_item(grantee, &grantor)
4757                    .map(Cow::Borrowed)
4758                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4759
4760                match variant {
4761                    UpdatePrivilegeVariant::Grant
4762                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4763                    {
4764                        ops.push(catalog::Op::UpdatePrivilege {
4765                            target_id: target_id.clone(),
4766                            privilege: MzAclItem {
4767                                grantee: *grantee,
4768                                grantor,
4769                                acl_mode,
4770                            },
4771                            variant,
4772                        });
4773                    }
4774                    UpdatePrivilegeVariant::Revoke
4775                        if !existing_privilege
4776                            .acl_mode
4777                            .intersection(acl_mode)
4778                            .is_empty() =>
4779                    {
4780                        ops.push(catalog::Op::UpdatePrivilege {
4781                            target_id: target_id.clone(),
4782                            privilege: MzAclItem {
4783                                grantee: *grantee,
4784                                grantor,
4785                                acl_mode,
4786                            },
4787                            variant,
4788                        });
4789                    }
4790                    // no-op
4791                    _ => {}
4792                }
4793            }
4794        }
4795
4796        if ops.is_empty() {
4797            session.add_notices(warnings);
4798            return Ok(variant.into());
4799        }
4800
4801        let res = self
4802            .catalog_transact(Some(session), ops)
4803            .await
4804            .map(|_| match variant {
4805                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4806                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4807            });
4808        if res.is_ok() {
4809            session.add_notices(warnings);
4810        }
4811        res
4812    }
4813
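    /// Sequences `ALTER DEFAULT PRIVILEGES`, e.g.:
    ///
    /// ```sql
    /// ALTER DEFAULT PRIVILEGES FOR ROLE alice IN SCHEMA public
    ///     GRANT SELECT ON TABLES TO bob;
    /// ```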
4814    #[instrument]
4815    pub(super) async fn sequence_alter_default_privileges(
4816        &mut self,
4817        session: &Session,
4818        plan::AlterDefaultPrivilegesPlan {
4819            privilege_objects,
4820            privilege_acl_items,
4821            is_grant,
4822        }: plan::AlterDefaultPrivilegesPlan,
4823    ) -> Result<ExecuteResponse, AdapterError> {
4824        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4825        let variant = if is_grant {
4826            UpdatePrivilegeVariant::Grant
4827        } else {
4828            UpdatePrivilegeVariant::Revoke
4829        };
4830        for privilege_object in &privilege_objects {
4831            self.catalog()
4832                .ensure_not_system_role(&privilege_object.role_id)?;
4833            self.catalog()
4834                .ensure_not_predefined_role(&privilege_object.role_id)?;
4835            if let Some(database_id) = privilege_object.database_id {
4836                self.catalog()
4837                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4838            }
4839            if let Some(schema_id) = privilege_object.schema_id {
4840                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4841                let schema_spec: SchemaSpecifier = schema_id.into();
4842
4843                self.catalog().ensure_not_reserved_object(
4844                    &(database_spec, schema_spec).into(),
4845                    session.conn_id(),
4846                )?;
4847            }
4848            for privilege_acl_item in &privilege_acl_items {
4849                self.catalog()
4850                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4851                self.catalog()
4852                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4853                ops.push(catalog::Op::UpdateDefaultPrivilege {
4854                    privilege_object: privilege_object.clone(),
4855                    privilege_acl_item: privilege_acl_item.clone(),
4856                    variant,
4857                })
4858            }
4859        }
4860
4861        self.catalog_transact(Some(session), ops).await?;
4862        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4863    }
4864
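    /// Sequences role grants, e.g.:
    ///
    /// ```sql
    /// GRANT data_readers TO alice;
    /// ```
    ///
    /// Memberships that already exist produce a notice rather than an error.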
4865    #[instrument]
4866    pub(super) async fn sequence_grant_role(
4867        &mut self,
4868        session: &Session,
4869        plan::GrantRolePlan {
4870            role_ids,
4871            member_ids,
4872            grantor_id,
4873        }: plan::GrantRolePlan,
4874    ) -> Result<ExecuteResponse, AdapterError> {
4875        let catalog = self.catalog();
4876        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4877        for role_id in role_ids {
4878            for member_id in &member_ids {
4879                let member_membership: BTreeSet<_> =
4880                    catalog.get_role(member_id).membership().keys().collect();
4881                if member_membership.contains(&role_id) {
4882                    let role_name = catalog.get_role(&role_id).name().to_string();
4883                    let member_name = catalog.get_role(member_id).name().to_string();
4884                    // We need this check so we don't accidentally return a success on a reserved role.
4885                    catalog.ensure_not_reserved_role(member_id)?;
4886                    catalog.ensure_grantable_role(&role_id)?;
4887                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4888                        role_name,
4889                        member_name,
4890                    });
4891                } else {
4892                    ops.push(catalog::Op::GrantRole {
4893                        role_id,
4894                        member_id: *member_id,
4895                        grantor_id,
4896                    });
4897                }
4898            }
4899        }
4900
4901        if ops.is_empty() {
4902            return Ok(ExecuteResponse::GrantedRole);
4903        }
4904
4905        self.catalog_transact(Some(session), ops)
4906            .await
4907            .map(|_| ExecuteResponse::GrantedRole)
4908    }
4909
4910    #[instrument]
4911    pub(super) async fn sequence_revoke_role(
4912        &mut self,
4913        session: &Session,
4914        plan::RevokeRolePlan {
4915            role_ids,
4916            member_ids,
4917            grantor_id,
4918        }: plan::RevokeRolePlan,
4919    ) -> Result<ExecuteResponse, AdapterError> {
4920        let catalog = self.catalog();
4921        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4922        for role_id in role_ids {
4923            for member_id in &member_ids {
4924                let member_membership: BTreeSet<_> =
4925                    catalog.get_role(member_id).membership().keys().collect();
4926                if !member_membership.contains(&role_id) {
4927                    let role_name = catalog.get_role(&role_id).name().to_string();
4928                    let member_name = catalog.get_role(member_id).name().to_string();
4929                    // We need this check so we don't accidentally return a success on a reserved role.
4930                    catalog.ensure_not_reserved_role(member_id)?;
4931                    catalog.ensure_grantable_role(&role_id)?;
4932                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4933                        role_name,
4934                        member_name,
4935                    });
4936                } else {
4937                    ops.push(catalog::Op::RevokeRole {
4938                        role_id,
4939                        member_id: *member_id,
4940                        grantor_id,
4941                    });
4942                }
4943            }
4944        }
4945
4946        if ops.is_empty() {
4947            return Ok(ExecuteResponse::RevokedRole);
4948        }
4949
4950        self.catalog_transact(Some(session), ops)
4951            .await
4952            .map(|_| ExecuteResponse::RevokedRole)
4953    }
4954
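    /// Sequences `ALTER ... OWNER TO`, e.g.:
    ///
    /// ```sql
    /// ALTER TABLE t OWNER TO alice;
    /// ```
    ///
    /// Ownership changes cascade to dependent indexes, progress collections,
    /// and (for clusters) replicas, since those cannot be re-owned directly.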
    #[instrument]
    pub(super) async fn sequence_alter_owner(
        &mut self,
        session: &Session,
        plan::AlterOwnerPlan {
            id,
            object_type,
            new_owner,
        }: plan::AlterOwnerPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = vec![catalog::Op::UpdateOwner {
            id: id.clone(),
            new_owner,
        }];

        match &id {
            ObjectId::Item(global_id) => {
                let entry = self.catalog().get_entry(global_id);

                // Cannot directly change the owner of an index.
                if entry.is_index() {
                    let name = self
                        .catalog()
                        .resolve_full_name(entry.name(), Some(session.conn_id()))
                        .to_string();
                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
                    return Ok(ExecuteResponse::AlteredObject(object_type));
                }

                // Alter owner cascades down to dependent indexes.
                let dependent_index_ops = entry
                    .used_by()
                    .into_iter()
                    .filter(|id| self.catalog().get_entry(id).is_index())
                    .map(|id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(*id),
                        new_owner,
                    });
                ops.extend(dependent_index_ops);

                // Alter owner cascades down to progress collections.
                let dependent_subsources = entry
                    .progress_id()
                    .into_iter()
                    .map(|item_id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(item_id),
                        new_owner,
                    });
                ops.extend(dependent_subsources);
            }
            ObjectId::Cluster(cluster_id) => {
                let cluster = self.catalog().get_cluster(*cluster_id);
                // Alter owner cascades down to cluster replicas.
                let managed_cluster_replica_ops =
                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
                        new_owner,
                    });
                ops.extend(managed_cluster_replica_ops);
            }
            _ => {}
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::AlteredObject(object_type))
    }

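    /// Sequences a `REASSIGN OWNED` statement, transferring everything owned
    /// by the old roles to the new role.
    ///
    /// A hedged sketch (role names are hypothetical):
    ///
    /// ```sql
    /// REASSIGN OWNED BY old_owner TO new_owner;
    /// ```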
    #[instrument]
    pub(super) async fn sequence_reassign_owned(
        &mut self,
        session: &Session,
        plan::ReassignOwnedPlan {
            old_roles,
            new_role,
            reassign_ids,
        }: plan::ReassignOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let ops = reassign_ids
            .into_iter()
            .map(|id| catalog::Op::UpdateOwner {
                id,
                new_owner: new_role,
            })
            .collect();

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::ReassignOwned)
    }

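    /// Pops the next deferred DDL statement, if any, and resumes its
    /// execution, either from the raw statement or from an already-sequenced
    /// plan.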
    #[instrument]
    pub(crate) async fn handle_deferred_statement(&mut self) {
        // It is possible that Message::DeferredStatementReady was sent, but then a session
        // cancellation was processed, removing the single element from `serialized_ddl`, so
        // it is expected that this is sometimes empty.
        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
            return;
        };
        match ps {
            crate::coord::PlanStatement::Statement { stmt, params } => {
                self.handle_execute_inner(stmt, params, ctx).await;
            }
            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
                self.sequence_plan(ctx, plan, resolved_ids).await;
            }
        }
    }

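    /// Sequences an `ALTER TABLE ... ADD COLUMN ...` statement.
    ///
    /// A hedged sketch of the statement shape this path handles (table and
    /// column names are hypothetical):
    ///
    /// ```sql
    /// ALTER TABLE t ADD COLUMN c text;
    /// ```
    ///
    /// The evolved table keeps its `CatalogItemId` but is registered under a
    /// newly allocated `GlobalId`.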
    #[instrument]
    // TODO(parkmycar): Remove this once we have an actual implementation.
    #[allow(clippy::unused_async)]
    pub(super) async fn sequence_alter_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterTablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::AlterTablePlan {
            relation_id,
            column_name,
            column_type,
            raw_sql_type,
        } = plan;

        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
        let id_ts = self.get_catalog_write_ts().await;
        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
        let ops = vec![catalog::Op::AlterAddColumn {
            id: relation_id,
            new_global_id,
            name: column_name,
            typ: column_type,
            sql: raw_sql_type,
        }];

        let entry = self.catalog().get_entry(&relation_id);
        let CatalogItem::Table(table) = &entry.item else {
            let err = format!("expected table, found {:?}", entry.item);
            return Err(AdapterError::Internal(err));
        };
        // The expected schema version before altering the table.
        let expected_version = table.desc.latest_version();
        let existing_global_id = table.global_id_writes();

        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
            Box::pin(async move {
                let entry = coord.catalog().get_entry(&relation_id);
                let CatalogItem::Table(table) = &entry.item else {
                    panic!("programming error, expected table, found {:?}", entry.item);
                };
                let table = table.clone();

                // Acquire a read hold on the original table to prevent its
                // since from advancing while the ALTER is running.
                let existing_table = crate::CollectionIdBundle {
                    storage_ids: btreeset![existing_global_id],
                    compute_ids: BTreeMap::new(),
                };
                let existing_table_read_hold = coord.acquire_read_holds(&existing_table);

                let new_version = table.desc.latest_version();
                let new_desc = table
                    .desc
                    .at_version(RelationVersionSelector::Specific(new_version));
                let register_ts = coord.get_local_write_ts().await.timestamp;

                // Alter the table description, creating a "new" collection.
                coord
                    .controller
                    .storage
                    .alter_table_desc(
                        existing_global_id,
                        new_global_id,
                        new_desc,
                        expected_version,
                        register_ts,
                    )
                    .await
                    .expect("failed to alter desc of table");

                // Initialize the ReadPolicy, which ensures we have the correct read holds.
                let compaction_window = table
                    .custom_logical_compaction_window
                    .unwrap_or(CompactionWindow::Default);
                coord
                    .initialize_read_policies(
                        &crate::CollectionIdBundle {
                            storage_ids: btreeset![new_global_id],
                            compute_ids: BTreeMap::new(),
                        },
                        compaction_window,
                    )
                    .await;
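                // Report the local write at `register_ts` as applied.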
                coord.apply_local_write(register_ts).await;

                // Alter is complete! We can drop our read hold.
                drop(existing_table_read_hold);
            })
        })
        .await?;

        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
    }
}

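/// A statistics oracle backed by a point-in-time snapshot of storage
/// statistics: the number of updates per collection, keyed by `GlobalId`.
///
/// Collections for which the storage layer reports no statistics are simply
/// absent from the cache, so `cardinality_estimate` returns `None` for them.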
#[derive(Debug)]
struct CachedStatisticsOracle {
    cache: BTreeMap<GlobalId, usize>,
}

impl CachedStatisticsOracle {
    pub async fn new<T: TimelyTimestamp>(
        ids: &BTreeSet<GlobalId>,
        as_of: &Antichain<T>,
        storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<
            Timestamp = T,
        >,
    ) -> Result<Self, StorageError<T>> {
        let mut cache = BTreeMap::new();

        for id in ids {
            let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;

            match stats {
                Ok(stats) => {
                    cache.insert(*id, stats.num_updates);
                }
                Err(StorageError::IdentifierMissing(id)) => {
                    ::tracing::debug!("no statistics for {id}")
                }
                Err(e) => return Err(e),
            }
        }

        Ok(Self { cache })
    }
}

impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
    fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
        self.cache.get(&id).copied()
    }

    fn as_map(&self) -> BTreeMap<GlobalId, usize> {
        self.cache.clone()
    }
}

impl Coordinator {
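    /// Returns a statistics oracle for `source_ids` at `query_as_of`.
    ///
    /// Falls back to `EmptyStatisticsOracle` if session cardinality estimates
    /// are disabled, or if collecting the statistics exceeds the configured
    /// optimizer stats timeout.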
    pub(super) async fn statistics_oracle(
        &self,
        session: &Session,
        source_ids: &BTreeSet<GlobalId>,
        query_as_of: &Antichain<Timestamp>,
        is_oneshot: bool,
    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
        if !session.vars().enable_session_cardinality_estimates() {
            return Ok(Box::new(EmptyStatisticsOracle));
        }

        let timeout = if is_oneshot {
            // TODO(mgree): ideally, we would shorten the timeout even more if
            // we think the query could take the fast path.
            self.catalog()
                .system_config()
                .optimizer_oneshot_stats_timeout()
        } else {
            self.catalog().system_config().optimizer_stats_timeout()
        };

        let cached_stats = mz_ore::future::timeout(
            timeout,
            CachedStatisticsOracle::new(
                source_ids,
                query_as_of,
                self.controller.storage_collections.as_ref(),
            ),
        )
        .await;

        match cached_stats {
            Ok(stats) => Ok(Box::new(stats)),
            Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
                warn!(
                    is_oneshot = is_oneshot,
                    "optimizer statistics collection timed out after {}ms",
                    timeout.as_millis()
                );

                Ok(Box::new(EmptyStatisticsOracle))
            }
            Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
        }
    }
}

/// Checks whether we should emit diagnostic information associated with
/// reading per-replica sources.
///
/// If an unrecoverable error is found (today: an untargeted read on a cluster
/// with a number of replicas other than one), return that. Otherwise, return
/// the associated notices (today: exactly one notice if there are any
/// per-replica log dependencies and `emit_introspection_query_notice` is set,
/// and none otherwise).
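///
/// A hedged illustration: on a multi-replica cluster, a query over a
/// per-replica introspection relation fails unless the session targets a
/// replica first (the replica name here is hypothetical):
///
/// ```sql
/// SET cluster_replica = r1;
/// SELECT * FROM mz_introspection.mz_active_peeks;
/// ```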
pub(super) fn check_log_reads(
    catalog: &Catalog,
    cluster: &Cluster,
    source_ids: &BTreeSet<GlobalId>,
    target_replica: &mut Option<ReplicaId>,
    vars: &SessionVars,
) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError> {
    let log_names = source_ids
        .iter()
        .map(|gid| catalog.resolve_item_id(gid))
        .flat_map(|item_id| catalog.introspection_dependencies(item_id))
        .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
        .collect::<Vec<_>>();

    if log_names.is_empty() {
        return Ok(None);
    }

    // Reading from log sources on replicated clusters is only allowed if a
    // target replica is selected. Otherwise, we have no way of knowing which
    // replica we read the introspection data from.
    let num_replicas = cluster.replicas().count();
    if target_replica.is_none() {
        if num_replicas == 1 {
            *target_replica = cluster.replicas().map(|r| r.replica_id).next();
        } else {
            return Err(AdapterError::UntargetedLogRead { log_names });
        }
    }

    // Ensure that logging is initialized for the target replica, lest we try
    // to read from a nonexistent arrangement.
    let replica_id = target_replica.expect("set to `Some` above");
    let replica = &cluster.replica(replica_id).expect("replica must exist");
    if !replica.config.compute.logging.enabled() {
        return Err(AdapterError::IntrospectionDisabled { log_names });
    }

    Ok(vars
        .emit_introspection_query_notice()
        .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
}

impl Coordinator {
    /// Forward notices that we got from the optimizer.
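    ///
    /// Each notice kind is gated by its own system variable, and every notice
    /// increments the `optimization_notices` metric whether or not it is
    /// emitted to the session.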
    fn emit_optimizer_notices(&self, session: &Session, notices: &[RawOptimizerNotice]) {
        // `for_session` below is expensive, so return early if there's nothing to do.
        if notices.is_empty() {
            return;
        }
        let humanizer = self.catalog.for_session(session);
        let system_vars = self.catalog.system_config();
        for notice in notices {
            let kind = OptimizerNoticeKind::from(notice);
            let notice_enabled = match kind {
                OptimizerNoticeKind::IndexAlreadyExists => {
                    system_vars.enable_notices_for_index_already_exists()
                }
                OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
                    system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
                }
                OptimizerNoticeKind::IndexKeyEmpty => {
                    system_vars.enable_notices_for_index_empty_key()
                }
            };
            if notice_enabled {
                // We don't need to redact the notice parts because
                // `emit_optimizer_notices` is only called by the `sequence_~`
                // method for the statement that produces that notice.
                session.add_notice(AdapterNotice::OptimizerNotice {
                    notice: notice.message(&humanizer, false).to_string(),
                    hint: notice.hint(&humanizer, false).to_string(),
                });
            }
            self.metrics
                .optimization_notices
                .with_label_values(&[kind.metric_label()])
                .inc_by(1);
        }
    }

    /// Process the metainfo from a newly created non-transient dataflow.
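    ///
    /// Returns a builtin-table append notification if the optimizer-notice
    /// builtin tables were updated, and `None` otherwise.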
    async fn process_dataflow_metainfo(
        &mut self,
        df_meta: DataflowMetainfo,
        export_id: GlobalId,
        ctx: Option<&mut ExecuteContext>,
        notice_ids: Vec<GlobalId>,
    ) -> Option<BuiltinTableAppendNotify> {
        // Emit raw notices to the user.
        if let Some(ctx) = ctx {
            self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
        }

        // Create a metainfo with rendered notices.
        let df_meta = self
            .catalog()
            .render_notices(df_meta, notice_ids, Some(export_id));

        // Update the optimizer-notice builtin tables and save the metainfo in
        // the catalog's in-memory state.
        if self.catalog().state().system_config().enable_mz_notices()
            && !df_meta.optimizer_notices.is_empty()
        {
            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
            self.catalog().state().pack_optimizer_notices(
                &mut builtin_table_updates,
                df_meta.optimizer_notices.iter(),
                Diff::ONE,
            );

            // Save the metainfo.
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            Some(
                self.builtin_table_update()
                    .execute(builtin_table_updates)
                    .await
                    .0,
            )
        } else {
            // Save the metainfo.
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            None
        }
    }
}