mz_adapter/coord/sequencer/inner.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::FuturesOrdered;
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::objects::{
    CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_controller_types::ReplicaId;
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_persist_client::stats::SnapshotPartStats;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
    RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, CreateSourceOptionName, CreateSubsourceOption,
    CreateSubsourceOptionName, SqlServerConfigOption, SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributes, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::GenericSourceConnection;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS, SessionVars,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_storage_types::stats::RelationPartStats;
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
use smallvec::SmallVec;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::{
    AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
    CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
    Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
    PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
    validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;

/// Attempts to evaluate an expression. If an error is returned then the error is sent
/// to the client and the function is exited.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;

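/// A bundle of catalog ops and metadata gathered while sequencing the drop of a set of objects.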
struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}

// A bundle of values returned from create_source_inner
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If cancellation is not allowed, remove any existing channel so handle_spawn
                    // doesn't observe a stale value from an earlier stage where cancel_enabled was true.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }

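    /// Spawns a task that awaits `handle` and then calls `f` on the result, retiring `ctx` with
    /// an error if the task fails or, when `cancel_enabled` is set, if the session is canceled.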
    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // Wait for true or dropped sender.
                let _ = rx.wait_for(|v| *v).await;
                ()
            })
        } else {
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = match res {
                        Ok(next) => return_if_err!(next, ctx),
                        Err(err) => {
                            tracing::error!("sequence_staged join error {err}");
                            ctx.retire(Err(AdapterError::Internal(
                                "sequence_staged join error".into(),
                            )));
                            return;
                        }
                    };
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }

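    /// Builds the catalog ops and in-memory `Source` entries for a batch of `CREATE SOURCE`
    /// plans, enforcing cluster replica restrictions and reducing webhook `CHECK` expressions
    /// along the way.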
    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref ingestion) => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match ingestion.desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                            .get(self.catalog().system_config().dyncfgs());

                        if !enable_multi_replica_sources {
                            if cluster.replica_ids().len() > 1 {
                                return Err(AdapterError::Unsupported(
                                    "webhook sources in clusters with >1 replicas",
                                ));
                            }
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

            // Attempt to reduce the `CHECK` expression; we time out if this takes too long.
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            // If this source contained a set of available source references, update the
            // source references catalog table.
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            // These operations must be executed after the source is added to the catalog.
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }

    /// Subsources are planned differently from other statements because they
    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
    /// Because of this, we have usually "missed" the opportunity to plan them
    /// through the normal statement execution life cycle (the exception being
    /// during bootstrapping).
    ///
    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let plan = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }

    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog_mut()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip_eq(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: CreateSubsourceStatement<Aug>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource.
        //
        // The primary source depends on this subsource because the primary
        // source needs to know its shard ID, and the easiest way of
        // guaranteeing that the shard ID is discoverable is to create this
        // collection first.
        assert_none!(progress_stmt.of_source);
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
        let progress_plan =
            self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
        let progress_full_name = self
            .catalog()
            .resolve_full_name(&progress_plan.plan.name, None);
        let progress_subsource = ResolvedItemName::Item {
            id: progress_plan.item_id,
            qualifiers: progress_plan.plan.name.qualifiers.clone(),
            full_name: progress_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        create_source_plans.push(progress_plan);

        source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("plan must be a CreateSourcePlan but got {:?}", p),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources.
        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog_mut()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

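    /// Sequences a `CREATE SOURCE` statement: writes the new source(s) to the catalog, creates
    /// the backing storage collections, and initializes their read policies.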
    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let item_ids: Vec<_> = plans.iter().map(|plan| plan.item_id).collect();
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |coord, ctx| {
                Box::pin(async move {
                    // TODO(roshan): This will be unnecessary when we drop support for
                    // automatically created subsources.
                    // To improve the performance of creating a source and many
                    // subsources, we create all of the collections at once. If we
                    // created each collection independently, we would reschedule
                    // the primary source's execution for each subsource, causing
                    // unnecessary thrashing.
                    //
                    // Note that creating all of the collections at once should
                    // never be semantic bearing; we should always be able to break
                    // this apart into creating the collections sequentially.
                    let mut collections = Vec::with_capacity(sources.len());

                    for (item_id, source) in sources {
                        let source_status_item_id =
                            coord.catalog().resolve_builtin_storage_collection(
                                &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
                            );
                        let source_status_collection_id = Some(
                            coord
                                .catalog()
                                .get_entry(&source_status_item_id)
                                .latest_global_id(),
                        );

                        let (data_source, status_collection_id) = match source.data_source {
                            DataSourceDesc::Ingestion {
                                ingestion_desc:
                                    mz_sql::plan::Ingestion {
                                        desc,
                                        progress_subsource,
                                    },
                                cluster_id,
                            } => {
                                let desc = desc.into_inline_connection(coord.catalog().state());
                                // TODO(parkmycar): We should probably check the type here, but I'm not
                                // sure if this will always be a Source or a Table.
                                let progress_subsource = coord
                                    .catalog()
                                    .get_entry(&progress_subsource)
                                    .latest_global_id();

                                let ingestion =
                                    mz_storage_types::sources::IngestionDescription::new(
                                        desc,
                                        cluster_id,
                                        progress_subsource,
                                    );

                                (
                                    DataSource::Ingestion(ingestion),
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference: _,
                                details,
                                data_config,
                            } => {
                                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                                // this will always be a Source or a Table.
                                let ingestion_id =
                                    coord.catalog().get_entry(&ingestion_id).latest_global_id();
                                (
                                    DataSource::IngestionExport {
                                        ingestion_id,
                                        details,
                                        data_config: data_config
                                            .into_inline_connection(coord.catalog().state()),
                                    },
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::Progress => (DataSource::Progress, None),
                            DataSourceDesc::Webhook { .. } => {
                                if let Some(url) =
                                    coord.catalog().state().try_get_webhook_url(&item_id)
                                {
                                    if let Some(ctx) = ctx.as_ref() {
                                        ctx.session()
                                            .add_notice(AdapterNotice::WebhookSourceCreated { url })
                                    }
                                }

                                (DataSource::Webhook, None)
                            }
                            DataSourceDesc::Introspection(_) => {
                                unreachable!(
                                    "cannot create sources with introspection data sources"
                                )
                            }
                        };

                        collections.push((
                            source.global_id,
                            CollectionDescription::<Timestamp> {
                                desc: source.desc.clone(),
                                data_source,
                                timeline: Some(source.timeline),
                                since: None,
                                status_collection_id,
                            },
                        ));
                    }

                    let storage_metadata = coord.catalog.state().storage_metadata();

                    coord
                        .controller
                        .storage
                        .create_collections(storage_metadata, None, collections)
                        .await
                        .unwrap_or_terminate("cannot fail to create collections");

                    // It is _very_ important that we only initialize read policies
                    // after we have created all the sources/collections. Some of
                    // the sources created in this collection might have
                    // dependencies on other sources, so the controller must get a
                    // chance to install read holds before we set a policy that
                    // might make the since advance.
                    //
                    // One instance of this is the remap shard: it presents as a
                    // SUBSOURCE, and all other SUBSOURCES of a SOURCE will depend
                    // on it. Both subsources and sources will show up as a `Source`
                    // in the above.
                    // Although there should only be one parent source that sequence_create_source is
                    // ever called with, hedge our bets a bit and collect the compaction windows for
                    // each id in the bundle (these should all be identical). This is some extra work
                    // but seems safer.
                    let read_policies = coord.catalog().state().source_compaction_windows(item_ids);
                    for (compaction_window, storage_policies) in read_policies {
                        coord
                            .initialize_storage_read_policies(storage_policies, compaction_window)
                            .await;
                    }
                })
            })
            .await;

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

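    /// Sequences a `CREATE CONNECTION` statement, optionally validating the connection in a
    /// background task before finishing the creation.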
    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog_mut().allocate_user_id(id_ts).await
        {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

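    /// Finishes sequencing a `CREATE CONNECTION`: writes the connection to the catalog and, for
    /// AWS PrivateLink connections, ensures the corresponding VPC endpoint exists.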
    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    match plan.connection.details {
                        ConnectionDetails::AwsPrivatelink(ref privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            let cloud_resource_controller =
                                match coord.cloud_resource_controller.as_ref().cloned() {
                                    Some(controller) => controller,
                                    None => {
                                        tracing::warn!("AWS PrivateLink connections unsupported");
                                        return;
                                    }
                                };
                            if let Err(err) = cloud_resource_controller
                                .ensure_vpc_endpoint(connection_id, spec)
                                .await
                            {
                                tracing::warn!(?err, "failed to ensure vpc endpoint!");
                            }
                        }
                        _ => {}
                    }
                })
            })
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }

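    /// Sequences a `CREATE DATABASE` statement.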
    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &mut Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

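    /// Sequences a `CREATE SCHEMA` statement.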
    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &mut Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

    /// Validates the role attributes for a `CREATE ROLE` statement.
    fn validate_role_attributes(&self, attributes: &RoleAttributes) -> Result<(), AdapterError> {
        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some()
                || attributes.password.is_some()
                || attributes.login.is_some()
            {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

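    /// Sequences a `CREATE ROLE` statement after validating the requested role attributes.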
    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes)?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_conn(conn_id, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

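    /// Sequences a `CREATE NETWORK POLICY` statement.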
    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

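    /// Sequences an `ALTER NETWORK POLICY` statement. If the policy being altered is the current
    /// default network policy, the new rules are first validated against the current connection.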
    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO(network_policy): Consider role based network policies here.
        let current_network_policy_name = self
            .owned_catalog()
            .system_config()
            .default_network_policy_name();
        // Check whether the way we're altering the policy is still valid for the current connection.
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

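    /// Sequences a `CREATE TABLE` statement: writes the table to the catalog and creates its
    /// backing storage collection.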
    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let id_ts = self.get_catalog_write_ts().await;
        let (table_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                o => {
                    unreachable!("CREATE TABLE data source got {:?}", o)
                }
            },
        };
        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

1180        let catalog_result = self
1181            .catalog_transact_with_ddl_transaction(ctx, ops, move |coord, ctx| {
1182                Box::pin(async move {
1183                    // The table data_source determines whether this table will be written to
1184                    // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
1185                    // (e.g. a source-fed table).
1186                    let (collections, register_ts, read_policies) = match table.data_source {
1187                        TableDataSource::TableWrites { defaults: _ } => {
1188                            // Determine the initial validity for the table.
1189                            let register_ts = coord.get_local_write_ts().await.timestamp;
1190
1191                            // After acquiring `register_ts` but before using it, we need to
1192                            // be sure we're still the leader. Otherwise a new generation
1193                            // may also be trying to use `register_ts` for a different
1194                            // purpose.
1195                            //
1196                            // See database-issues#8273.
1197                            coord
1198                                .catalog
1199                                .confirm_leadership()
1200                                .await
1201                                .unwrap_or_terminate("unable to confirm leadership");
1202
1203                            if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1204                                coord.set_statement_execution_timestamp(id, register_ts);
1205                            }
1206
1207                            // When initially creating a table it should only have a single version.
1208                            let relation_version = RelationVersion::root();
1209                            assert_eq!(table.desc.latest_version(), relation_version);
1210                            let relation_desc = table
1211                                .desc
1212                                .at_version(RelationVersionSelector::Specific(relation_version));
1213                            // We assert above we have a single version, and thus we are the primary.
1214                            let collection_desc =
1215                                CollectionDescription::for_table(relation_desc, None);
1216                            let collections = vec![(global_id, collection_desc)];
1217
1218                            let compaction_window = table
1219                                .custom_logical_compaction_window
1220                                .unwrap_or(CompactionWindow::Default);
1221                            let read_policies =
1222                                BTreeMap::from([(compaction_window, btreeset! { table_id })]);
1223
1224                            (collections, Some(register_ts), read_policies)
1225                        }
1226                        TableDataSource::DataSource {
1227                            desc: data_source,
1228                            timeline,
1229                        } => {
1230                            match data_source {
1231                                DataSourceDesc::IngestionExport {
1232                                    ingestion_id,
1233                                    external_reference: _,
1234                                    details,
1235                                    data_config,
1236                                } => {
1237                                    // TODO: It's a little weird that a table will be present in this
1238                                    // source status collection, we might want to split out into a separate
1239                                    // status collection.
1240                                    let source_status_item_id =
1241                                        coord.catalog().resolve_builtin_storage_collection(
1242                                            &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
1243                                        );
1244                                    let status_collection_id = Some(
1245                                        coord
1246                                            .catalog()
1247                                            .get_entry(&source_status_item_id)
1248                                            .latest_global_id(),
1249                                    );
1250                                    // TODO(parkmycar): We should probably check the type here, but I'm not sure if
1251                                    // this will always be a Source or a Table.
1252                                    let ingestion_id =
1253                                        coord.catalog().get_entry(&ingestion_id).latest_global_id();
1254                                    // Create the underlying collection with the latest schema from the Table.
1255                                    let collection_desc = CollectionDescription::<Timestamp> {
1256                                        desc: table
1257                                            .desc
1258                                            .at_version(RelationVersionSelector::Latest),
1259                                        data_source: DataSource::IngestionExport {
1260                                            ingestion_id,
1261                                            details,
1262                                            data_config: data_config
1263                                                .into_inline_connection(coord.catalog.state()),
1264                                        },
1265                                        since: None,
1266                                        status_collection_id,
1267                                        timeline: Some(timeline.clone()),
1268                                    };
1269
1270                                    let collections = vec![(global_id, collection_desc)];
1271                                    let read_policies = coord
1272                                        .catalog()
1273                                        .state()
1274                                        .source_compaction_windows(vec![table_id]);
1275
1276                                    (collections, None, read_policies)
1277                                }
1278                                DataSourceDesc::Webhook { .. } => {
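                                    // Surface the webhook's ingestion URL to the user via a notice,
                                    // if one is available.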
1279                                    if let Some(url) =
1280                                        coord.catalog().state().try_get_webhook_url(&table_id)
1281                                    {
1282                                        if let Some(ctx) = ctx.as_ref() {
1283                                            ctx.session().add_notice(
1284                                                AdapterNotice::WebhookSourceCreated { url },
1285                                            )
1286                                        }
1287                                    }
1288
1289                                    // Create the underlying collection with the latest schema from the Table.
1290                                    assert_eq!(
1291                                        table.desc.latest_version(),
1292                                        RelationVersion::root(),
1293                                        "found webhook with more than 1 relation version, {:?}",
1294                                        table.desc
1295                                    );
1296                                    let desc = table.desc.latest();
1297
1298                                    let collection_desc = CollectionDescription {
1299                                        desc,
1300                                        data_source: DataSource::Webhook,
1301                                        since: None,
1302                                        status_collection_id: None,
1303                                        timeline: Some(timeline.clone()),
1304                                    };
1305                                    let collections = vec![(global_id, collection_desc)];
1306                                    let read_policies = coord
1307                                        .catalog()
1308                                        .state()
1309                                        .source_compaction_windows(vec![table_id]);
1310
1311                                    (collections, None, read_policies)
1312                                }
1313                                _ => unreachable!("CREATE TABLE data source got {:?}", data_source),
1314                            }
1315                        }
1316                    };
1317
1318                    // Create the collections.
1319                    let storage_metadata = coord.catalog.state().storage_metadata();
1320                    coord
1321                        .controller
1322                        .storage
1323                        .create_collections(storage_metadata, register_ts, collections)
1324                        .await
1325                        .unwrap_or_terminate("cannot fail to create collections");
1326
1327                    // Mark the register timestamp as completed.
1328                    if let Some(register_ts) = register_ts {
1329                        coord.apply_local_write(register_ts).await;
1330                    }
1331
1332                    // Initialize the Read Policies.
1333                    for (compaction_window, storage_policies) in read_policies {
1334                        coord
1335                            .initialize_storage_read_policies(storage_policies, compaction_window)
1336                            .await;
1337                    }
1338                })
1339            })
1340            .await;
1341
1342        match catalog_result {
1343            Ok(()) => Ok(ExecuteResponse::CreatedTable),
1344            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1345                kind:
1346                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1347            })) if if_not_exists => {
1348                ctx.session_mut()
1349                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1350                        name: name.item,
1351                        ty: "table",
1352                    });
1353                Ok(ExecuteResponse::CreatedTable)
1354            }
1355            Err(err) => Err(err),
1356        }
1357    }
1358
1359    #[instrument]
1360    pub(super) async fn sequence_create_sink(
1361        &mut self,
1362        ctx: ExecuteContext,
1363        plan: plan::CreateSinkPlan,
1364        resolved_ids: ResolvedIds,
1365    ) {
1366        let plan::CreateSinkPlan {
1367            name,
1368            sink,
1369            with_snapshot,
1370            if_not_exists,
1371            in_cluster,
1372        } = plan;
1373
1374        // First try to allocate an item ID and a global ID. If that fails, we're done.
1375        let id_ts = self.get_catalog_write_ts().await;
1376        let (item_id, global_id) =
1377            return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
1378
1379        let catalog_sink = Sink {
1380            create_sql: sink.create_sql,
1381            global_id,
1382            from: sink.from,
1383            connection: sink.connection,
1384            envelope: sink.envelope,
1385            version: sink.version,
1386            with_snapshot,
1387            resolved_ids,
1388            cluster_id: in_cluster,
1389        };
1390
1391        let ops = vec![catalog::Op::CreateItem {
1392            id: item_id,
1393            name: name.clone(),
1394            item: CatalogItem::Sink(catalog_sink.clone()),
1395            owner_id: *ctx.session().current_role_id(),
1396        }];
1397
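        // Validate that the collection the sink exports actually exists in the storage
        // controller; objects without a storage collection (e.g. views) cannot be
        // exported as sinks.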
1398        let from = self.catalog().get_entry_by_global_id(&catalog_sink.from);
1399        if let Err(e) = self
1400            .controller
1401            .storage
1402            .check_exists(sink.from)
1403            .map_err(|e| match e {
1404                StorageError::IdentifierMissing(_) => AdapterError::Unstructured(anyhow!(
1405                    "{} is a {}, which cannot be exported as a sink",
1406                    from.name().item.clone(),
1407                    from.item().typ().to_string(),
1408                )),
1409                e => AdapterError::Storage(e),
1410            })
1411        {
1412            ctx.retire(Err(e));
1413            return;
1414        }
1415
1416        let result = self.catalog_transact(Some(ctx.session()), ops).await;
1417
1418        match result {
1419            Ok(()) => {}
1420            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1421                kind:
1422                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1423            })) if if_not_exists => {
1424                ctx.session()
1425                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1426                        name: name.item,
1427                        ty: "sink",
1428                    });
1429                ctx.retire(Ok(ExecuteResponse::CreatedSink));
1430                return;
1431            }
1432            Err(e) => {
1433                ctx.retire(Err(e));
1434                return;
1435            }
1436        };
1437
1438        self.create_storage_export(global_id, &catalog_sink)
1439            .await
1440            .unwrap_or_terminate("cannot fail to create exports");
1441
1442        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1443            .await;
1444
1445        ctx.retire(Ok(ExecuteResponse::CreatedSink))
1446    }
1447
1448    /// Validates that a view definition does not contain any expressions that may lead to
1449    /// ambiguous column references to system tables. For example `NATURAL JOIN` or `SELECT *`.
1450    ///
1451    /// We prevent these expressions so that we can add columns to system tables without
1452    /// changing the definition of the view.
1453    ///
1454    /// Here is a bit of a hand-wavy proof as to why we only need to check the
1455    /// immediate view definition for system objects and ambiguous column
1456    /// references, and not the entire dependency tree:
1457    ///
1458    ///   - A view with no object references cannot have any ambiguous column
1459    ///   references to a system object, because it references no objects at all.
1460    ///   - A view with a direct reference to a system object and a * or
1461    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1462    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1463    ///   any ambiguous column references to a system object, because all column
1464    ///   references are explicitly named.
1465    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1466    ///   system object cannot have any ambiguous column references to a system
1467    ///   object, because there are no system objects in the top level view and
1468    ///   all sub-views are guaranteed to have no ambiguous column references to
1469    ///   system objects.
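    ///
    /// For example (illustrative): a view defined as `SELECT * FROM mz_catalog.mz_tables`
    /// would be rejected by this check, while `SELECT id, name FROM mz_catalog.mz_tables`
    /// would be accepted because every column reference is explicit.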
1470    pub(super) fn validate_system_column_references(
1471        &self,
1472        uses_ambiguous_columns: bool,
1473        depends_on: &BTreeSet<GlobalId>,
1474    ) -> Result<(), AdapterError> {
1475        if uses_ambiguous_columns
1476            && depends_on
1477                .iter()
1478                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1479        {
1480            Err(AdapterError::AmbiguousSystemColumnReference)
1481        } else {
1482            Ok(())
1483        }
1484    }
1485
1486    #[instrument]
1487    pub(super) async fn sequence_create_type(
1488        &mut self,
1489        session: &Session,
1490        plan: plan::CreateTypePlan,
1491        resolved_ids: ResolvedIds,
1492    ) -> Result<ExecuteResponse, AdapterError> {
1493        let id_ts = self.get_catalog_write_ts().await;
1494        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
1495        let typ = Type {
1496            create_sql: Some(plan.typ.create_sql),
1497            global_id,
1498            desc: plan.typ.inner.desc(&self.catalog().for_session(session))?,
1499            details: CatalogTypeDetails {
1500                array_id: None,
1501                typ: plan.typ.inner,
1502                pg_metadata: None,
1503            },
1504            resolved_ids,
1505        };
1506        let op = catalog::Op::CreateItem {
1507            id: item_id,
1508            name: plan.name,
1509            item: CatalogItem::Type(typ),
1510            owner_id: *session.current_role_id(),
1511        };
1512        match self.catalog_transact(Some(session), vec![op]).await {
1513            Ok(()) => Ok(ExecuteResponse::CreatedType),
1514            Err(err) => Err(err),
1515        }
1516    }
1517
1518    #[instrument]
1519    pub(super) async fn sequence_comment_on(
1520        &mut self,
1521        session: &Session,
1522        plan: plan::CommentPlan,
1523    ) -> Result<ExecuteResponse, AdapterError> {
1524        let op = catalog::Op::Comment {
1525            object_id: plan.object_id,
1526            sub_component: plan.sub_component,
1527            comment: plan.comment,
1528        };
1529        self.catalog_transact(Some(session), vec![op]).await?;
1530        Ok(ExecuteResponse::Comment)
1531    }
1532
1533    #[instrument]
1534    pub(super) async fn sequence_drop_objects(
1535        &mut self,
1536        session: &Session,
1537        plan::DropObjectsPlan {
1538            drop_ids,
1539            object_type,
1540            referenced_ids,
1541        }: plan::DropObjectsPlan,
1542    ) -> Result<ExecuteResponse, AdapterError> {
1543        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1544        let mut objects = Vec::new();
1545        for obj_id in &drop_ids {
1546            if !referenced_ids_hashset.contains(obj_id) {
1547                let object_info = ErrorMessageObjectDescription::from_id(
1548                    obj_id,
1549                    &self.catalog().for_session(session),
1550                )
1551                .to_string();
1552                objects.push(object_info);
1553            }
1554        }
1555
1556        if !objects.is_empty() {
1557            session.add_notice(AdapterNotice::CascadeDroppedObject { objects });
1558        }
1559
1560        let DropOps {
1561            ops,
1562            dropped_active_db,
1563            dropped_active_cluster,
1564            dropped_in_use_indexes,
1565        } = self.sequence_drop_common(session, drop_ids)?;
1566
1567        self.catalog_transact(Some(session), ops).await?;
1568
1569        fail::fail_point!("after_sequencer_drop_replica");
1570
1571        if dropped_active_db {
1572            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1573                name: session.vars().database().to_string(),
1574            });
1575        }
1576        if dropped_active_cluster {
1577            session.add_notice(AdapterNotice::DroppedActiveCluster {
1578                name: session.vars().cluster().to_string(),
1579            });
1580        }
1581        for dropped_in_use_index in dropped_in_use_indexes {
1582            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1583            self.metrics
1584                .optimization_notices
1585                .with_label_values(&["DroppedInUseIndex"])
1586                .inc_by(1);
1587        }
1588        Ok(ExecuteResponse::DroppedObject(object_type))
1589    }
1590
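    /// Returns an error if any of the `dropped_roles` still owns an object, appears in a
    /// privilege grant (as grantor or grantee), or has default privileges associated with
    /// it, listing the dependent objects per role.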
1591    fn validate_dropped_role_ownership(
1592        &self,
1593        session: &Session,
1594        dropped_roles: &BTreeMap<RoleId, &str>,
1595    ) -> Result<(), AdapterError> {
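        // Helper: for each dropped role, record any privileges it granted or was
        // granted on `object_id`, keyed by the dropped role's name.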
1596        fn privilege_check(
1597            privileges: &PrivilegeMap,
1598            dropped_roles: &BTreeMap<RoleId, &str>,
1599            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1600            object_id: &SystemObjectId,
1601            catalog: &ConnCatalog,
1602        ) {
1603            for privilege in privileges.all_values() {
1604                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1605                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1606                    let object_description =
1607                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1608                    dependent_objects
1609                        .entry(role_name.to_string())
1610                        .or_default()
1611                        .push(format!(
1612                            "privileges on {object_description} granted by {grantor_name}",
1613                        ));
1614                }
1615                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1616                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1617                    let object_description =
1618                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1619                    dependent_objects
1620                        .entry(role_name.to_string())
1621                        .or_default()
1622                        .push(format!(
1623                            "privileges granted on {object_description} to {grantee_name}"
1624                        ));
1625                }
1626            }
1627        }
1628
1629        let catalog = self.catalog().for_session(session);
1630        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1631        for entry in self.catalog.entries() {
1632            let id = SystemObjectId::Object(entry.id().into());
1633            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1634                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1635                dependent_objects
1636                    .entry(role_name.to_string())
1637                    .or_default()
1638                    .push(format!("owner of {object_description}"));
1639            }
1640            privilege_check(
1641                entry.privileges(),
1642                dropped_roles,
1643                &mut dependent_objects,
1644                &id,
1645                &catalog,
1646            );
1647        }
1648        for database in self.catalog.databases() {
1649            let database_id = SystemObjectId::Object(database.id().into());
1650            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1651                let object_description =
1652                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1653                dependent_objects
1654                    .entry(role_name.to_string())
1655                    .or_default()
1656                    .push(format!("owner of {object_description}"));
1657            }
1658            privilege_check(
1659                &database.privileges,
1660                dropped_roles,
1661                &mut dependent_objects,
1662                &database_id,
1663                &catalog,
1664            );
1665            for schema in database.schemas_by_id.values() {
1666                let schema_id = SystemObjectId::Object(
1667                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1668                );
1669                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1670                    let object_description =
1671                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1672                    dependent_objects
1673                        .entry(role_name.to_string())
1674                        .or_default()
1675                        .push(format!("owner of {object_description}"));
1676                }
1677                privilege_check(
1678                    &schema.privileges,
1679                    dropped_roles,
1680                    &mut dependent_objects,
1681                    &schema_id,
1682                    &catalog,
1683                );
1684            }
1685        }
1686        for cluster in self.catalog.clusters() {
1687            let cluster_id = SystemObjectId::Object(cluster.id().into());
1688            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1689                let object_description =
1690                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1691                dependent_objects
1692                    .entry(role_name.to_string())
1693                    .or_default()
1694                    .push(format!("owner of {object_description}"));
1695            }
1696            privilege_check(
1697                &cluster.privileges,
1698                dropped_roles,
1699                &mut dependent_objects,
1700                &cluster_id,
1701                &catalog,
1702            );
1703            for replica in cluster.replicas() {
1704                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1705                    let replica_id =
1706                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1707                    let object_description =
1708                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1709                    dependent_objects
1710                        .entry(role_name.to_string())
1711                        .or_default()
1712                        .push(format!("owner of {object_description}"));
1713                }
1714            }
1715        }
1716        privilege_check(
1717            self.catalog().system_privileges(),
1718            dropped_roles,
1719            &mut dependent_objects,
1720            &SystemObjectId::System,
1721            &catalog,
1722        );
1723        for (default_privilege_object, default_privilege_acl_items) in
1724            self.catalog.default_privileges()
1725        {
1726            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1727                dependent_objects
1728                    .entry(role_name.to_string())
1729                    .or_default()
1730                    .push(format!(
1731                        "default privileges on {}S created by {}",
1732                        default_privilege_object.object_type, role_name
1733                    ));
1734            }
1735            for default_privilege_acl_item in default_privilege_acl_items {
1736                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1737                    dependent_objects
1738                        .entry(role_name.to_string())
1739                        .or_default()
1740                        .push(format!(
1741                            "default privileges on {}S granted to {}",
1742                            default_privilege_object.object_type, role_name
1743                        ));
1744                }
1745            }
1746        }
1747
1748        if !dependent_objects.is_empty() {
1749            Err(AdapterError::DependentObject(dependent_objects))
1750        } else {
1751            Ok(())
1752        }
1753    }
1754
1755    #[instrument]
1756    pub(super) async fn sequence_drop_owned(
1757        &mut self,
1758        session: &Session,
1759        plan: plan::DropOwnedPlan,
1760    ) -> Result<ExecuteResponse, AdapterError> {
1761        for role_id in &plan.role_ids {
1762            self.catalog().ensure_not_reserved_role(role_id)?;
1763        }
1764
1765        let mut privilege_revokes = plan.privilege_revokes;
1766
1767        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1768        let session_catalog = self.catalog().for_session(session);
1769        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1770            && !session.is_superuser()
1771        {
1772            // Obtain all roles that the current session is a member of.
1773            let role_membership =
1774                session_catalog.collect_role_membership(session.current_role_id());
1775            let invalid_revokes: BTreeSet<_> = privilege_revokes
1776                .extract_if(.., |(_, privilege)| {
1777                    !role_membership.contains(&privilege.grantor)
1778                })
1779                .map(|(object_id, _)| object_id)
1780                .collect();
1781            for invalid_revoke in invalid_revokes {
1782                let object_description =
1783                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1784                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1785            }
1786        }
1787
1788        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1789            catalog::Op::UpdatePrivilege {
1790                target_id: object_id,
1791                privilege,
1792                variant: UpdatePrivilegeVariant::Revoke,
1793            }
1794        });
1795        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1796            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1797                privilege_object,
1798                privilege_acl_item,
1799                variant: UpdatePrivilegeVariant::Revoke,
1800            },
1801        );
1802        let DropOps {
1803            ops: drop_ops,
1804            dropped_active_db,
1805            dropped_active_cluster,
1806            dropped_in_use_indexes,
1807        } = self.sequence_drop_common(session, plan.drop_ids)?;
1808
1809        let ops = privilege_revoke_ops
1810            .chain(default_privilege_revoke_ops)
1811            .chain(drop_ops.into_iter())
1812            .collect();
1813
1814        self.catalog_transact(Some(session), ops).await?;
1815
1816        if dropped_active_db {
1817            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1818                name: session.vars().database().to_string(),
1819            });
1820        }
1821        if dropped_active_cluster {
1822            session.add_notice(AdapterNotice::DroppedActiveCluster {
1823                name: session.vars().cluster().to_string(),
1824            });
1825        }
1826        for dropped_in_use_index in dropped_in_use_indexes {
1827            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1828        }
1829        Ok(ExecuteResponse::DroppedOwned)
1830    }
1831
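    /// Builds the catalog ops needed to drop `ids`, including any implied role membership
    /// and default privilege revokes, and reports which notices the caller should emit
    /// (dropped active database or cluster, dropped in-use indexes).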
1832    fn sequence_drop_common(
1833        &self,
1834        session: &Session,
1835        ids: Vec<ObjectId>,
1836    ) -> Result<DropOps, AdapterError> {
1837        let mut dropped_active_db = false;
1838        let mut dropped_active_cluster = false;
1839        let mut dropped_in_use_indexes = Vec::new();
1840        let mut dropped_roles = BTreeMap::new();
1841        let mut dropped_databases = BTreeSet::new();
1842        let mut dropped_schemas = BTreeSet::new();
1843        // Dropping either the group role or the member role of a role membership will trigger a
1844        // revoke role. We use a set for the revokes to avoid attempting to revoke the same
1845        // role membership twice.
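        // For example, dropping both a group role and one of its members in the same
        // statement would otherwise produce the same revoke twice.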
1846        let mut role_revokes = BTreeSet::new();
1847        // Dropping a database or a schema will revoke all default privileges associated with
1848        // that database or schema.
1849        let mut default_privilege_revokes = BTreeSet::new();
1850
1851        // Clusters we're dropping
1852        let mut clusters_to_drop = BTreeSet::new();
1853
1854        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1855        for id in &ids {
1856            match id {
1857                ObjectId::Database(id) => {
1858                    let name = self.catalog().get_database(id).name();
1859                    if name == session.vars().database() {
1860                        dropped_active_db = true;
1861                    }
1862                    dropped_databases.insert(id);
1863                }
1864                ObjectId::Schema((_, spec)) => {
1865                    if let SchemaSpecifier::Id(id) = spec {
1866                        dropped_schemas.insert(id);
1867                    }
1868                }
1869                ObjectId::Cluster(id) => {
1870                    clusters_to_drop.insert(*id);
1871                    if let Some(active_id) = self
1872                        .catalog()
1873                        .active_cluster(session)
1874                        .ok()
1875                        .map(|cluster| cluster.id())
1876                    {
1877                        if id == &active_id {
1878                            dropped_active_cluster = true;
1879                        }
1880                    }
1881                }
1882                ObjectId::Role(id) => {
1883                    let role = self.catalog().get_role(id);
1884                    let name = role.name();
1885                    dropped_roles.insert(*id, name);
1886                    // We must revoke all role memberships that the dropped role belongs to.
1887                    for (group_id, grantor_id) in &role.membership.map {
1888                        role_revokes.insert((*group_id, *id, *grantor_id));
1889                    }
1890                }
1891                ObjectId::Item(id) => {
1892                    if let Some(index) = self.catalog().get_entry(id).index() {
1893                        let humanizer = self.catalog().for_session(session);
1894                        let dependants = self
1895                            .controller
1896                            .compute
1897                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1898                            .ok()
1899                            .into_iter()
1900                            .flatten()
1901                            .filter(|dependant_id| {
1902                                // Transient IDs belong to peeks. For now, we are not interested in
1903                                // peeks that depend on a dropped index.
1904                                // TODO: show a different notice in this case. Something like
1905                                // "There is an in-progress ad hoc SELECT that uses the dropped
1906                                // index. The resources used by the index will be freed when all
1907                                // such SELECTs complete."
1908                                if dependant_id.is_transient() {
1909                                    return false;
1910                                }
1911                                // The item should exist, but don't panic if it doesn't.
1912                                let Some(dependent_id) = humanizer
1913                                    .try_get_item_by_global_id(dependant_id)
1914                                    .map(|item| item.id())
1915                                else {
1916                                    return false;
1917                                };
1918                                // If the dependent object is also being dropped, then there is no
1919                                // problem, so we don't want a notice.
1920                                !ids_set.contains(&ObjectId::Item(dependent_id))
1921                            })
1922                            .flat_map(|dependant_id| {
1923                                // If we are not able to find a name for this ID, it probably means
1924                                // we have already dropped the compute collection, in which case we
1925                                // can ignore it.
1926                                humanizer.humanize_id(dependant_id)
1927                            })
1928                            .collect_vec();
1929                        if !dependants.is_empty() {
1930                            dropped_in_use_indexes.push(DroppedInUseIndex {
1931                                index_name: humanizer
1932                                    .humanize_id(index.global_id())
1933                                    .unwrap_or_else(|| id.to_string()),
1934                                dependant_objects: dependants,
1935                            });
1936                        }
1937                    }
1938                }
1939                _ => {}
1940            }
1941        }
1942
1943        for id in &ids {
1944            match id {
1945                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1946                // unless they are internal replicas, which exist outside the scope
1947                // of managed clusters.
1948                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1949                    if !clusters_to_drop.contains(cluster_id) {
1950                        let cluster = self.catalog.get_cluster(*cluster_id);
1951                        if cluster.is_managed() {
1952                            let replica =
1953                                cluster.replica(*replica_id).expect("Catalog out of sync");
1954                            if !replica.config.location.internal() {
1955                                coord_bail!("cannot drop replica of managed cluster");
1956                            }
1957                        }
1958                    }
1959                }
1960                _ => {}
1961            }
1962        }
1963
1964        for role_id in dropped_roles.keys() {
1965            self.catalog().ensure_not_reserved_role(role_id)?;
1966        }
1967        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1968        // If any role is a member of a dropped role, then we must revoke that membership.
1969        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1970        for role in self.catalog().user_roles() {
1971            for dropped_role_id in
1972                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1973            {
1974                role_revokes.insert((
1975                    **dropped_role_id,
1976                    role.id(),
1977                    *role
1978                        .membership
1979                        .map
1980                        .get(*dropped_role_id)
1981                        .expect("included in keys above"),
1982                ));
1983            }
1984        }
1985
1986        for (default_privilege_object, default_privilege_acls) in
1987            self.catalog().default_privileges()
1988        {
1989            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
1990                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
1991            {
1992                for default_privilege_acl in default_privilege_acls {
1993                    default_privilege_revokes.insert((
1994                        default_privilege_object.clone(),
1995                        default_privilege_acl.clone(),
1996                    ));
1997                }
1998            }
1999        }
2000
2001        let ops = role_revokes
2002            .into_iter()
2003            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
2004                role_id,
2005                member_id,
2006                grantor_id,
2007            })
2008            .chain(default_privilege_revokes.into_iter().map(
2009                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
2010                    privilege_object,
2011                    privilege_acl_item,
2012                    variant: UpdatePrivilegeVariant::Revoke,
2013                },
2014            ))
2015            .chain(iter::once(catalog::Op::DropObjects(
2016                ids.into_iter()
2017                    .map(DropObjectInfo::manual_drop_from_object_id)
2018                    .collect(),
2019            )))
2020            .collect();
2021
2022        Ok(DropOps {
2023            ops,
2024            dropped_active_db,
2025            dropped_active_cluster,
2026            dropped_in_use_indexes,
2027        })
2028    }
2029
2030    pub(super) fn sequence_explain_schema(
2031        &self,
2032        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
2033    ) -> Result<ExecuteResponse, AdapterError> {
2034        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
2035            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
2036        })?;
2037
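        // Round-tripping through `serde_json::Value` validates the schema and normalizes
        // its formatting before it is returned as a single string datum.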
2038        let json_string = json_string(&json_value);
2039        let row = Row::pack_slice(&[Datum::String(&json_string)]);
2040        Ok(Self::send_immediate_rows(row))
2041    }
2042
2043    pub(super) fn sequence_show_all_variables(
2044        &self,
2045        session: &Session,
2046    ) -> Result<ExecuteResponse, AdapterError> {
2047        let mut rows = viewable_variables(self.catalog().state(), session)
2048            .map(|v| (v.name(), v.value(), v.description()))
2049            .collect::<Vec<_>>();
2050        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
2051
2052        // TODO(parkmycar): Pack all of these into a single RowCollection.
2053        let rows: Vec<_> = rows
2054            .into_iter()
2055            .map(|(name, val, desc)| {
2056                Row::pack_slice(&[
2057                    Datum::String(name),
2058                    Datum::String(&val),
2059                    Datum::String(desc),
2060                ])
2061            })
2062            .collect();
2063        Ok(Self::send_immediate_rows(rows))
2064    }
2065
2066    pub(super) fn sequence_show_variable(
2067        &self,
2068        session: &Session,
2069        plan: plan::ShowVariablePlan,
2070    ) -> Result<ExecuteResponse, AdapterError> {
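        // `SHOW schema` is special-cased: it reports the first resolvable schema on the
        // search path, or NULL (possibly with a notice) if none of the schemas resolve.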
2071        if &plan.name == SCHEMA_ALIAS {
2072            let schemas = self.catalog.resolve_search_path(session);
2073            let schema = schemas.first();
2074            return match schema {
2075                Some((database_spec, schema_spec)) => {
2076                    let schema_name = &self
2077                        .catalog
2078                        .get_schema(database_spec, schema_spec, session.conn_id())
2079                        .name()
2080                        .schema;
2081                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
2082                    Ok(Self::send_immediate_rows(row))
2083                }
2084                None => {
2085                    if session.vars().current_object_missing_warnings() {
2086                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
2087                            search_path: session
2088                                .vars()
2089                                .search_path()
2090                                .into_iter()
2091                                .map(|schema| schema.to_string())
2092                                .collect(),
2093                        });
2094                    }
2095                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
2096                }
2097            };
2098        }
2099
2100        let variable = session
2101            .vars()
2102            .get(self.catalog().system_config(), &plan.name)
2103            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
2104
2105        // In lieu of plumbing the user to all system config functions, just check that the var is
2106        // visible.
2107        variable.visible(session.user(), self.catalog().system_config())?;
2108
2109        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
2110        if variable.name() == vars::DATABASE.name()
2111            && matches!(
2112                self.catalog().resolve_database(&variable.value()),
2113                Err(CatalogError::UnknownDatabase(_))
2114            )
2115            && session.vars().current_object_missing_warnings()
2116        {
2117            let name = variable.value();
2118            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2119        } else if variable.name() == vars::CLUSTER.name()
2120            && matches!(
2121                self.catalog().resolve_cluster(&variable.value()),
2122                Err(CatalogError::UnknownCluster(_))
2123            )
2124            && session.vars().current_object_missing_warnings()
2125        {
2126            let name = variable.value();
2127            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2128        }
2129        Ok(Self::send_immediate_rows(row))
2130    }
2131
2132    #[instrument]
2133    pub(super) async fn sequence_inspect_shard(
2134        &self,
2135        session: &Session,
2136        plan: plan::InspectShardPlan,
2137    ) -> Result<ExecuteResponse, AdapterError> {
2138        // TODO: Not thrilled about this rbac special case here, but probably
2139        // sufficient for now.
2140        if !session.user().is_internal() {
2141            return Err(AdapterError::Unauthorized(
2142                rbac::UnauthorizedError::MzSystem {
2143                    action: "inspect".into(),
2144                },
2145            ));
2146        }
2147        let state = self
2148            .controller
2149            .storage
2150            .inspect_persist_state(plan.id)
2151            .await?;
2152        let jsonb = Jsonb::from_serde_json(state)?;
2153        Ok(Self::send_immediate_rows(jsonb.into_row()))
2154    }
2155
2156    #[instrument]
2157    pub(super) fn sequence_set_variable(
2158        &self,
2159        session: &mut Session,
2160        plan: plan::SetVariablePlan,
2161    ) -> Result<ExecuteResponse, AdapterError> {
2162        let (name, local) = (plan.name, plan.local);
2163        if &name == TRANSACTION_ISOLATION_VAR_NAME {
2164            self.validate_set_isolation_level(session)?;
2165        }
2166        if &name == vars::CLUSTER.name() {
2167            self.validate_set_cluster(session)?;
2168        }
2169
2170        let vars = session.vars_mut();
2171        let values = match plan.value {
2172            plan::VariableValue::Default => None,
2173            plan::VariableValue::Values(values) => Some(values),
2174        };
2175
2176        match values {
2177            Some(values) => {
2178                vars.set(
2179                    self.catalog().system_config(),
2180                    &name,
2181                    VarInput::SqlSet(&values),
2182                    local,
2183                )?;
2184
2185                let vars = session.vars();
2186
2187                // Emit a warning when deprecated variables are used.
2188                // TODO(database-issues#8069) remove this after sufficient time has passed
2189                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
2190                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
2191                } else if name == vars::CLUSTER.name()
2192                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
2193                {
2194                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
2195                }
2196
2197                // Warn if the database or cluster value does not correspond to a catalog item.
2198                if name.as_str() == vars::DATABASE.name()
2199                    && matches!(
2200                        self.catalog().resolve_database(vars.database()),
2201                        Err(CatalogError::UnknownDatabase(_))
2202                    )
2203                    && session.vars().current_object_missing_warnings()
2204                {
2205                    let name = vars.database().to_string();
2206                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2207                } else if name.as_str() == vars::CLUSTER.name()
2208                    && matches!(
2209                        self.catalog().resolve_cluster(vars.cluster()),
2210                        Err(CatalogError::UnknownCluster(_))
2211                    )
2212                    && session.vars().current_object_missing_warnings()
2213                {
2214                    let name = vars.cluster().to_string();
2215                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2216                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
2217                    let v = values.into_first().to_lowercase();
2218                    if v == IsolationLevel::ReadUncommitted.as_str()
2219                        || v == IsolationLevel::ReadCommitted.as_str()
2220                        || v == IsolationLevel::RepeatableRead.as_str()
2221                    {
2222                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
2223                            isolation_level: v,
2224                        });
2225                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
2226                        session.add_notice(AdapterNotice::StrongSessionSerializable);
2227                    }
2228                }
2229            }
2230            None => vars.reset(self.catalog().system_config(), &name, local)?,
2231        }
2232
2233        Ok(ExecuteResponse::SetVariable { name, reset: false })
2234    }
2235
2236    pub(super) fn sequence_reset_variable(
2237        &self,
2238        session: &mut Session,
2239        plan: plan::ResetVariablePlan,
2240    ) -> Result<ExecuteResponse, AdapterError> {
2241        let name = plan.name;
2242        if &name == TRANSACTION_ISOLATION_VAR_NAME {
2243            self.validate_set_isolation_level(session)?;
2244        }
2245        if &name == vars::CLUSTER.name() {
2246            self.validate_set_cluster(session)?;
2247        }
2248        session
2249            .vars_mut()
2250            .reset(self.catalog().system_config(), &name, false)?;
2251        Ok(ExecuteResponse::SetVariable { name, reset: true })
2252    }
2253
2254    pub(super) fn sequence_set_transaction(
2255        &self,
2256        session: &mut Session,
2257        plan: plan::SetTransactionPlan,
2258    ) -> Result<ExecuteResponse, AdapterError> {
2259        // TODO(jkosh44) Only supports isolation levels for now.
2260        for mode in plan.modes {
2261            match mode {
2262                TransactionMode::AccessMode(_) => {
2263                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
2264                }
2265                TransactionMode::IsolationLevel(isolation_level) => {
2266                    self.validate_set_isolation_level(session)?;
2267
2268                    session.vars_mut().set(
2269                        self.catalog().system_config(),
2270                        TRANSACTION_ISOLATION_VAR_NAME,
2271                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
2272                        plan.local,
2273                    )?
2274                }
2275            }
2276        }
2277        Ok(ExecuteResponse::SetVariable {
2278            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2279            reset: false,
2280        })
2281    }
2282
2283    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2284        if session.transaction().contains_ops() {
2285            Err(AdapterError::InvalidSetIsolationLevel)
2286        } else {
2287            Ok(())
2288        }
2289    }
2290
2291    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2292        if session.transaction().contains_ops() {
2293            Err(AdapterError::InvalidSetCluster)
2294        } else {
2295            Ok(())
2296        }
2297    }
2298
2299    #[instrument]
2300    pub(super) async fn sequence_end_transaction(
2301        &mut self,
2302        mut ctx: ExecuteContext,
2303        mut action: EndTransactionAction,
2304    ) {
2305        // If the transaction has failed, we can only roll back.
2306        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2307            (&action, ctx.session().transaction())
2308        {
2309            action = EndTransactionAction::Rollback;
2310        }
2311        let response = match action {
2312            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2313                params: BTreeMap::new(),
2314            }),
2315            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2316                params: BTreeMap::new(),
2317            }),
2318        };
2319
2320        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2321
2322        let (response, action) = match result {
2323            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2324                (response, action)
2325            }
2326            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2327                // Make sure we have the correct set of write locks for this transaction. We
2328                // aggressively drop partial sets of locks to prevent separate transactions from
2329                // deadlocking.
2330                let validated_locks = match write_lock_guards {
2331                    None => None,
2332                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2333                        Ok(locks) => Some(locks),
2334                        Err(missing) => {
2335                            tracing::error!(?missing, "programming error, missing write locks");
2336                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2337                        }
2338                    },
2339                };
2340
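                // Group the transaction's writes by collection so that each collection
                // gets a single entry in the pending write.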
2341                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2342                for WriteOp { id, rows } in writes {
2343                    let total_rows = collected_writes.entry(id).or_default();
2344                    total_rows.push(rows);
2345                }
2346
2347                self.submit_write(PendingWriteTxn::User {
2348                    span: Span::current(),
2349                    writes: collected_writes,
2350                    write_locks: validated_locks,
2351                    pending_txn: PendingTxn {
2352                        ctx,
2353                        response,
2354                        action,
2355                    },
2356                });
2357                return;
2358            }
2359            Ok((
2360                Some(TransactionOps::Peeks {
2361                    determination,
2362                    requires_linearization: RequireLinearization::Required,
2363                    ..
2364                }),
2365                _,
2366            )) if ctx.session().vars().transaction_isolation()
2367                == &IsolationLevel::StrictSerializable =>
2368            {
2369                let conn_id = ctx.session().conn_id().clone();
2370                let pending_read_txn = PendingReadTxn {
2371                    txn: PendingRead::Read {
2372                        txn: PendingTxn {
2373                            ctx,
2374                            response,
2375                            action,
2376                        },
2377                    },
2378                    timestamp_context: determination.timestamp_context,
2379                    created: Instant::now(),
2380                    num_requeues: 0,
2381                    otel_ctx: OpenTelemetryContext::obtain(),
2382                };
2383                self.strict_serializable_reads_tx
2384                    .send((conn_id, pending_read_txn))
2385                    .expect("sending to strict_serializable_reads_tx cannot fail");
2386                return;
2387            }
2388            Ok((
2389                Some(TransactionOps::Peeks {
2390                    determination,
2391                    requires_linearization: RequireLinearization::Required,
2392                    ..
2393                }),
2394                _,
2395            )) if ctx.session().vars().transaction_isolation()
2396                == &IsolationLevel::StrongSessionSerializable =>
2397            {
2398                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2399                    ctx.session_mut()
2400                        .ensure_timestamp_oracle(timeline.clone())
2401                        .apply_write(*ts);
2402                }
2403                (response, action)
2404            }
2405            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2406                self.internal_cmd_tx
2407                    .send(Message::ExecuteSingleStatementTransaction {
2408                        ctx,
2409                        otel_ctx: OpenTelemetryContext::obtain(),
2410                        stmt,
2411                        params,
2412                    })
2413                    .expect("must send");
2414                return;
2415            }
2416            Ok((_, _)) => (response, action),
2417            Err(err) => (Err(err), EndTransactionAction::Rollback),
2418        };
2419        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2420        // Append any parameters that changed to the response.
2421        let response = response.map(|mut r| {
2422            r.extend_params(changed);
2423            ExecuteResponse::from(r)
2424        });
2425
2426        ctx.retire(response);
2427    }
2428
2429    #[instrument]
2430    async fn sequence_end_transaction_inner(
2431        &mut self,
2432        ctx: &mut ExecuteContext,
2433        action: EndTransactionAction,
2434    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2435        let txn = self.clear_transaction(ctx.session_mut()).await;
2436
2437        if let EndTransactionAction::Commit = action {
2438            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2439                match &mut ops {
2440                    TransactionOps::Writes(writes) => {
2441                        for WriteOp { id, .. } in &mut writes.iter() {
2442                            // Re-verify this id exists.
2443                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2444                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2445                                    kind: mz_catalog::memory::error::ErrorKind::Sql(
2446                                        CatalogError::UnknownItem(id.to_string()),
2447                                    ),
2448                                })
2449                            })?;
2450                        }
2451
2452                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2453                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2454                    }
2455                    TransactionOps::DDL {
2456                        ops,
2457                        state: _,
2458                        side_effects,
2459                        revision,
2460                    } => {
2461                        // Make sure our catalog hasn't changed.
2462                        if *revision != self.catalog().transient_revision() {
2463                            return Err(AdapterError::DDLTransactionRace);
2464                        }
2465                        // Commit all of our queued ops.
2466                        let ops = std::mem::take(ops);
2467                        let side_effects = std::mem::take(side_effects);
2468                        self.catalog_transact_with_side_effects(
2469                            Some(ctx),
2470                            ops,
2471                            move |a, mut ctx| {
2472                                Box::pin(async move {
2473                                    for side_effect in side_effects {
2474                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2475                                    }
2476                                })
2477                            },
2478                        )
2479                        .await?;
2480                    }
2481                    _ => (),
2482                }
2483                return Ok((Some(ops), write_lock_guards));
2484            }
2485        }
2486
2487        Ok((None, None))
2488    }
2489
2490    pub(super) async fn sequence_side_effecting_func(
2491        &mut self,
2492        ctx: ExecuteContext,
2493        plan: SideEffectingFunc,
2494    ) {
2495        match plan {
2496            SideEffectingFunc::PgCancelBackend { connection_id } => {
2497                if ctx.session().conn_id().unhandled() == connection_id {
2498                    // As a special case, if we're canceling ourselves, we send
2499                    // back a canceled response to the client issuing the query,
2500                    // so no further processing of the cancel is needed.
2501                    ctx.retire(Err(AdapterError::Canceled));
2502                    return;
2503                }
2504
2505                let res = if let Some((id_handle, _conn_meta)) =
2506                    self.active_conns.get_key_value(&connection_id)
2507                {
2508                    // check_plan already verified role membership.
2509                    self.handle_privileged_cancel(id_handle.clone()).await;
2510                    Datum::True
2511                } else {
2512                    Datum::False
2513                };
2514                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2515            }
2516        }
2517    }
2518
2519    /// Checks to see if the session needs a real time recency timestamp and if so returns
2520    /// a future that will return the timestamp.
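    ///
    /// The returned future may need to wait for the relevant sources to catch up to
    /// their upstream systems, so callers should await it off the main coordinator
    /// thread. A simplified, hypothetical call site (the variable names and control
    /// flow here are illustrative only, not the actual sequencing code):
    ///
    /// ```ignore
    /// let rtr_fut = coord
    ///     .determine_real_time_recent_timestamp(ctx.session(), source_ids.into_iter())
    ///     .await?;
    /// if let Some(rtr_fut) = rtr_fut {
    ///     // Await the real-time-recency timestamp in a spawned task so the
    ///     // coordinator's main loop stays responsive.
    ///     mz_ore::task::spawn(|| "determine rtr timestamp", async move {
    ///         let _ts = rtr_fut.await;
    ///         // ... continue sequencing the query at (or beyond) `_ts` ...
    ///     });
    /// }
    /// ```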
2521    pub(super) async fn determine_real_time_recent_timestamp(
2522        &self,
2523        session: &Session,
2524        source_ids: impl Iterator<Item = CatalogItemId>,
2525    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2526    {
2527        let vars = session.vars();
2528
2529        // Ideally this logic belongs inside of
2530        // `mz-adapter::coord::timestamp_selection::determine_timestamp`. However, including the
2531        // logic in there would make it extremely difficult and inconvenient to pull the waiting off
2532        // of the main coord thread.
2533        let r = if vars.real_time_recency()
2534            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2535            && !session.contains_read_timestamp()
2536        {
2537            // Find all dependencies transitively because we need to ensure that
2538            // RTR queries determine the timestamp from the sources' (i.e.
2539            // storage objects that ingest data from external systems) remap
2540            // data. We "cheat" a little bit and filter out any IDs that aren't
2541            // user objects because we know they are not an RTR source.
2542            let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
2543            // If none of the sources are user objects, we don't need to provide
2544            // an RTR timestamp.
2545            if to_visit.is_empty() {
2546                return Ok(None);
2547            }
2548
2549            let mut timestamp_objects = BTreeSet::new();
2550
2551            while let Some(id) = to_visit.pop_front() {
2552                timestamp_objects.insert(id);
2553                to_visit.extend(
2554                    self.catalog()
2555                        .get_entry(&id)
2556                        .uses()
2557                        .into_iter()
2558                        .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2559                );
2560            }
2561            let timestamp_objects = timestamp_objects
2562                .into_iter()
2563                .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2564                .collect();
2565
2566            let r = self
2567                .controller
2568                .determine_real_time_recent_timestamp(
2569                    timestamp_objects,
2570                    *vars.real_time_recency_timeout(),
2571                )
2572                .await?;
2573
2574            Some(r)
2575        } else {
2576            None
2577        };
2578
2579        Ok(r)
2580    }
2581
2582    #[instrument]
2583    pub(super) async fn sequence_explain_plan(
2584        &mut self,
2585        ctx: ExecuteContext,
2586        plan: plan::ExplainPlanPlan,
2587        target_cluster: TargetCluster,
2588    ) {
2589        match &plan.explainee {
2590            plan::Explainee::Statement(stmt) => match stmt {
2591                plan::ExplaineeStatement::CreateView { .. } => {
2592                    self.explain_create_view(ctx, plan).await;
2593                }
2594                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2595                    self.explain_create_materialized_view(ctx, plan).await;
2596                }
2597                plan::ExplaineeStatement::CreateIndex { .. } => {
2598                    self.explain_create_index(ctx, plan).await;
2599                }
2600                plan::ExplaineeStatement::Select { .. } => {
2601                    self.explain_peek(ctx, plan, target_cluster).await;
2602                }
2603            },
2604            plan::Explainee::View(_) => {
2605                let result = self.explain_view(&ctx, plan);
2606                ctx.retire(result);
2607            }
2608            plan::Explainee::MaterializedView(_) => {
2609                let result = self.explain_materialized_view(&ctx, plan);
2610                ctx.retire(result);
2611            }
2612            plan::Explainee::Index(_) => {
2613                let result = self.explain_index(&ctx, plan);
2614                ctx.retire(result);
2615            }
2616            plan::Explainee::ReplanView(_) => {
2617                self.explain_replan_view(ctx, plan).await;
2618            }
2619            plan::Explainee::ReplanMaterializedView(_) => {
2620                self.explain_replan_materialized_view(ctx, plan).await;
2621            }
2622            plan::Explainee::ReplanIndex(_) => {
2623                self.explain_replan_index(ctx, plan).await;
2624            }
2625        };
2626    }
2627
2628    pub(super) async fn sequence_explain_pushdown(
2629        &mut self,
2630        ctx: ExecuteContext,
2631        plan: plan::ExplainPushdownPlan,
2632        target_cluster: TargetCluster,
2633    ) {
2634        match plan.explainee {
2635            Explainee::Statement(ExplaineeStatement::Select {
2636                broken: false,
2637                plan,
2638                desc: _,
2639            }) => {
2640                let stage = return_if_err!(
2641                    self.peek_validate(
2642                        ctx.session(),
2643                        plan,
2644                        target_cluster,
2645                        None,
2646                        ExplainContext::Pushdown,
2647                        Some(ctx.session().vars().max_query_result_size()),
2648                    ),
2649                    ctx
2650                );
2651                self.sequence_staged(ctx, Span::current(), stage).await;
2652            }
2653            Explainee::MaterializedView(item_id) => {
2654                self.explain_pushdown_materialized_view(ctx, item_id).await;
2655            }
2656            _ => {
2657                ctx.retire(Err(AdapterError::Unsupported(
2658                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2659                )));
2660            }
2661        };
2662    }
2663
2664    async fn render_explain_pushdown(
2665        &self,
2666        ctx: ExecuteContext,
2667        as_of: Antichain<Timestamp>,
2668        mz_now: ResultSpec<'static>,
2669        read_holds: Option<ReadHolds<Timestamp>>,
2670        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2671    ) {
2672        let fut = self
2673            .render_explain_pushdown_prepare(ctx.session(), as_of, mz_now, imports)
2674            .await;
2675        task::spawn(|| "render explain pushdown", async move {
2676            // Transfer the necessary read holds over to the background task
2677            let _read_holds = read_holds;
2678            let res = fut.await;
2679            ctx.retire(res);
2680        });
2681    }
2682
2683    async fn render_explain_pushdown_prepare<
2684        I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
2685    >(
2686        &self,
2687        session: &Session,
2688        as_of: Antichain<Timestamp>,
2689        mz_now: ResultSpec<'static>,
2690        imports: I,
2691    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2692        let explain_timeout = *session.vars().statement_timeout();
2693        let mut futures = FuturesOrdered::new();
2694        for (id, mfp) in imports {
2695            let catalog_entry = self.catalog.get_entry_by_global_id(&id);
2696            let full_name = self
2697                .catalog
2698                .for_session(session)
2699                .resolve_full_name(&catalog_entry.name);
2700            let name = format!("{}", full_name);
2701            let relation_desc = catalog_entry
2702                .desc_opt()
2703                .expect("source should have a proper desc")
2704                .into_owned();
2705            let stats_future = self
2706                .controller
2707                .storage_collections
2708                .snapshot_parts_stats(id, as_of.clone())
2709                .await;
2710
2711            let mz_now = mz_now.clone();
2712            // These futures may block if the source is not yet readable at the as-of;
2713            // stash them in `futures` and only block on them in a separate task.
2714            futures.push_back(async move {
2715                let snapshot_stats = match stats_future.await {
2716                    Ok(stats) => stats,
2717                    Err(e) => return Err(e),
2718                };
2719                let mut total_bytes = 0;
2720                let mut total_parts = 0;
2721                let mut selected_bytes = 0;
2722                let mut selected_parts = 0;
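                // A part counts as "selected" if its column statistics cannot rule
                // out rows matching the pushed-down MFP at the given `mz_now` bounds;
                // parts that carry no statistics are conservatively counted as selected.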
2723                for SnapshotPartStats {
2724                    encoded_size_bytes: bytes,
2725                    stats,
2726                } in &snapshot_stats.parts
2727                {
2728                    let bytes = u64::cast_from(*bytes);
2729                    total_bytes += bytes;
2730                    total_parts += 1u64;
2731                    let selected = match stats {
2732                        None => true,
2733                        Some(stats) => {
2734                            let stats = stats.decode();
2735                            let stats = RelationPartStats::new(
2736                                name.as_str(),
2737                                &snapshot_stats.metrics.pushdown.part_stats,
2738                                &relation_desc,
2739                                &stats,
2740                            );
2741                            stats.may_match_mfp(mz_now.clone(), &mfp)
2742                        }
2743                    };
2744
2745                    if selected {
2746                        selected_bytes += bytes;
2747                        selected_parts += 1u64;
2748                    }
2749                }
2750                Ok(Row::pack_slice(&[
2751                    name.as_str().into(),
2752                    total_bytes.into(),
2753                    selected_bytes.into(),
2754                    total_parts.into(),
2755                    selected_parts.into(),
2756                ]))
2757            });
2758        }
2759
2760        let fut = async move {
2761            match tokio::time::timeout(
2762                explain_timeout,
2763                futures::TryStreamExt::try_collect::<Vec<_>>(futures),
2764            )
2765            .await
2766            {
2767                Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
2768                    rows: Box::new(rows.into_row_iter()),
2769                }),
2770                Ok(Err(err)) => Err(err.into()),
2771                Err(_) => Err(AdapterError::StatementTimeout),
2772            }
2773        };
2774        fut
2775    }
2776
2777    #[instrument]
2778    pub(super) async fn sequence_insert(
2779        &mut self,
2780        mut ctx: ExecuteContext,
2781        plan: plan::InsertPlan,
2782    ) {
2783        // Normally, this would get checked when trying to add "write ops" to
2784        // the transaction, but we go down diverging paths below, based on
2785        // whether the INSERT is only constant values or not.
2786        //
2787        // For the non-constant case we sequence an implicit read-then-write,
2788        // which messes with the transaction ops and would allow an implicit
2789        // read-then-write to sneak into a read-only transaction.
2790        if !ctx.session_mut().transaction().allows_writes() {
2791            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2792            return;
2793        }
2794
2795        // The structure of this code originates from a time when
2796        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2797        // optimized `MirRelationExpr`.
2798        //
2799        // Ideally, we would like to make the `selection.as_const().is_some()`
2800        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2801        // are planned as a Wrap($n, $vals) call, so until we can reduce
2802        // HirRelationExpr this will always return false.
2803        //
2804        // Unfortunately, hitting the default path of the match below also
2805        // causes a lot of tests to fail, so we opted to go with the extra
2806        // `plan.values.clone()` statements when producing the `optimized_mir`
2807        // and re-optimize the values in the `sequence_read_then_write` call.
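        //
        // For example, `INSERT INTO t VALUES (1), (2)` typically reduces to a constant
        // after the local optimization below and (absent a RETURNING clause) takes the
        // constant fast path in the `match`, while `INSERT INTO t SELECT * FROM s` is
        // sequenced as a read-then-write.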
2808        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2809            // We don't perform any optimizations on an expression that is already
2810            // a constant for writes, as we want to maximize bulk-insert throughput.
2811            let expr = return_if_err!(
2812                plan.values
2813                    .clone()
2814                    .lower(self.catalog().system_config(), None),
2815                ctx
2816            );
2817            OptimizedMirRelationExpr(expr)
2818        } else {
2819            // Collect optimizer parameters.
2820            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2821
2822            // (`optimize::view::Optimizer` has a special case for constant queries.)
2823            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2824
2825            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2826            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2827        };
2828
2829        match optimized_mir.into_inner() {
2830            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2831                let catalog = self.owned_catalog();
2832                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2833                    let result =
2834                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2835                    ctx.retire(result);
2836                });
2837            }
2838            // All non-constant values must be planned as read-then-writes.
2839            _ => {
2840                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2841                    Some(table) => {
2842                        let fullname = self
2843                            .catalog()
2844                            .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2845                        // Inserts always occur at the latest version of the table.
2846                        table
2847                            .desc_latest(&fullname)
2848                            .expect("desc called on table")
2849                            .arity()
2850                    }
2851                    None => {
2852                        ctx.retire(Err(AdapterError::Catalog(
2853                            mz_catalog::memory::error::Error {
2854                                kind: mz_catalog::memory::error::ErrorKind::Sql(
2855                                    CatalogError::UnknownItem(plan.id.to_string()),
2856                                ),
2857                            },
2858                        )));
2859                        return;
2860                    }
2861                };
2862
2863                let finishing = RowSetFinishing {
2864                    order_by: vec![],
2865                    limit: None,
2866                    offset: 0,
2867                    project: (0..desc_arity).collect(),
2868                };
2869
2870                let read_then_write_plan = plan::ReadThenWritePlan {
2871                    id: plan.id,
2872                    selection: plan.values,
2873                    finishing,
2874                    assignments: BTreeMap::new(),
2875                    kind: MutationKind::Insert,
2876                    returning: plan.returning,
2877                };
2878
2879                self.sequence_read_then_write(ctx, read_then_write_plan)
2880                    .await;
2881            }
2882        }
2883    }
2884
2885    /// ReadThenWrite is a plan whose writes depend on the results of a
2886    /// read. This works by doing a Peek and then queuing a SendDiffs. No writes
2887    /// or read-then-writes can occur between the Peek and SendDiffs; otherwise a
2888    /// serializability violation could occur.
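    ///
    /// For example, an `UPDATE t SET a = 2 WHERE a = 1` whose peek returns the single
    /// row `[1]` conceptually produces the diffs `([2], +1)` (the re-evaluated row)
    /// and `([1], -1)` (the retraction of the old row), which are then sent as a
    /// single write.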
2889    #[instrument]
2890    pub(super) async fn sequence_read_then_write(
2891        &mut self,
2892        mut ctx: ExecuteContext,
2893        plan: plan::ReadThenWritePlan,
2894    ) {
2895        let mut source_ids: BTreeSet<_> = plan
2896            .selection
2897            .depends_on()
2898            .into_iter()
2899            .map(|gid| self.catalog().resolve_item_id(&gid))
2900            .collect();
2901        source_ids.insert(plan.id);
2902
2903        // If the transaction doesn't already have write locks, acquire them.
2904        if ctx.session().transaction().write_locks().is_none() {
2905            // Pre-define all of the locks we need.
2906            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2907
2908            // Try acquiring all of our locks.
2909            for id in &source_ids {
2910                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2911                    write_locks.insert_lock(*id, lock);
2912                }
2913            }
2914
2915            // See if we acquired all of the necessary locks.
2916            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2917                Ok(locks) => locks,
2918                Err(missing) => {
2919                    // Defer our write if we couldn't acquire all of the locks.
2920                    let role_metadata = ctx.session().role_metadata().clone();
2921                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2922                    let plan = DeferredPlan {
2923                        ctx,
2924                        plan: Plan::ReadThenWrite(plan),
2925                        validity: PlanValidity::new(
2926                            self.catalog.transient_revision(),
2927                            source_ids.clone(),
2928                            None,
2929                            None,
2930                            role_metadata,
2931                        ),
2932                        requires_locks: source_ids,
2933                    };
2934                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2935                }
2936            };
2937
2938            ctx.session_mut()
2939                .try_grant_write_locks(write_locks)
2940                .expect("session has already been granted write locks");
2941        }
2942
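        // At this point the session holds write locks for every object in
        // `source_ids` (either pre-existing or granted above), so no other write can
        // commit to those objects between the peek below and the subsequent write.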
2943        let plan::ReadThenWritePlan {
2944            id,
2945            kind,
2946            selection,
2947            mut assignments,
2948            finishing,
2949            returning,
2950        } = plan;
2951
2952        // Read-then-writes can be queued, so re-verify that the id exists.
2953        let desc = match self.catalog().try_get_entry(&id) {
2954            Some(table) => {
2955                let full_name = self
2956                    .catalog()
2957                    .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2958                // Inserts always occur at the latest version of the table.
2959                table
2960                    .desc_latest(&full_name)
2961                    .expect("desc called on table")
2962                    .into_owned()
2963            }
2964            None => {
2965                ctx.retire(Err(AdapterError::Catalog(
2966                    mz_catalog::memory::error::Error {
2967                        kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2968                            id.to_string(),
2969                        )),
2970                    },
2971                )));
2972                return;
2973            }
2974        };
2975
2976        // Disallow mz_now in any position because read time and write time differ.
2977        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2978            || assignments.values().any(|e| e.contains_temporal())
2979            || returning.iter().any(|e| e.contains_temporal());
2980        if contains_temporal {
2981            ctx.retire(Err(AdapterError::Unsupported(
2982                "calls to mz_now in write statements",
2983            )));
2984            return;
2985        }
2986
2987        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
2988        //
2989        // - They do not refer to any objects whose notion of time moves differently than that of
2990        //   user tables. This limitation is meant to ensure no writes occur between this read and
2991        //   the subsequent write.
2992        // - They do not use mz_now(), whose time produced during read will differ from the write
2993        //   timestamp.
2994        fn validate_read_dependencies(
2995            catalog: &Catalog,
2996            id: &CatalogItemId,
2997        ) -> Result<(), AdapterError> {
2998            use CatalogItemType::*;
2999            use mz_catalog::memory::objects;
3000            let mut ids_to_check = Vec::new();
3001            let valid = match catalog.try_get_entry(id) {
3002                Some(entry) => {
3003                    if let CatalogItem::View(objects::View { optimized_expr, .. })
3004                    | CatalogItem::MaterializedView(objects::MaterializedView {
3005                        optimized_expr,
3006                        ..
3007                    }) = entry.item()
3008                    {
3009                        if optimized_expr.contains_temporal() {
3010                            return Err(AdapterError::Unsupported(
3011                                "calls to mz_now in write statements",
3012                            ));
3013                        }
3014                    }
3015                    match entry.item().typ() {
3016                        typ @ (Func | View | MaterializedView | ContinualTask) => {
3017                            ids_to_check.extend(entry.uses());
3018                            let valid_id = id.is_user() || matches!(typ, Func);
3019                            valid_id
3020                        }
3021                        Source | Secret | Connection => false,
3022                        // Cannot select from sinks or indexes.
3023                        Sink | Index => unreachable!(),
3024                        Table => {
3025                            if !id.is_user() {
3026                                // We can't read from non-user tables
3027                                false
3028                            } else {
3029                                // We can't read from tables that are source-exports
3030                                entry.source_export_details().is_none()
3031                            }
3032                        }
3033                        Type => true,
3034                    }
3035                }
3036                None => false,
3037            };
3038            if !valid {
3039                return Err(AdapterError::InvalidTableMutationSelection);
3040            }
3041            for id in ids_to_check {
3042                validate_read_dependencies(catalog, &id)?;
3043            }
3044            Ok(())
3045        }
3046
3047        for gid in selection.depends_on() {
3048            let item_id = self.catalog().resolve_item_id(&gid);
3049            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
3050                ctx.retire(Err(err));
3051                return;
3052            }
3053        }
3054
3055        let (peek_tx, peek_rx) = oneshot::channel();
3056        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
3057        let (tx, _, session, extra) = ctx.into_parts();
3058        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
3059        // execution context, because this peek does not directly correspond to an execute,
3060        // and so we don't need to take any action on its retirement.
3061        // TODO[btv]: we might consider extending statement logging to log the inner
3062        // statement separately here. That would require us to plumb through the SQL of the inner
3063        // statement and mint a new "real" execution context here. We'd also have to add some logic
3064        // to make sure such "sub-statements" are always sampled when the top-level statement is.
3065        //
3066        // It's debatable whether this makes sense conceptually,
3067        // because the inner fragment here is not actually a
3068        // "statement" in its own right.
3069        let peek_ctx = ExecuteContext::from_parts(
3070            peek_client_tx,
3071            self.internal_cmd_tx.clone(),
3072            session,
3073            Default::default(),
3074        );
3075
3076        self.sequence_peek(
3077            peek_ctx,
3078            plan::SelectPlan {
3079                select: None,
3080                source: selection,
3081                when: QueryWhen::FreshestTableWrite,
3082                finishing,
3083                copy_to: None,
3084            },
3085            TargetCluster::Active,
3086            None,
3087        )
3088        .await;
3089
3090        let internal_cmd_tx = self.internal_cmd_tx.clone();
3091        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
3092        let catalog = self.owned_catalog();
3093        let max_result_size = self.catalog().system_config().max_result_size();
3094
3095        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
3096            let (peek_response, session) = match peek_rx.await {
3097                Ok(Response {
3098                    result: Ok(resp),
3099                    session,
3100                    otel_ctx,
3101                }) => {
3102                    otel_ctx.attach_as_parent();
3103                    (resp, session)
3104                }
3105                Ok(Response {
3106                    result: Err(e),
3107                    session,
3108                    otel_ctx,
3109                }) => {
3110                    let ctx =
3111                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3112                    otel_ctx.attach_as_parent();
3113                    ctx.retire(Err(e));
3114                    return;
3115                }
3116                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
3117                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
3118            };
3119            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3120            let mut timeout_dur = *ctx.session().vars().statement_timeout();
3121
3122            // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
3123            if timeout_dur == Duration::ZERO {
3124                timeout_dur = Duration::MAX;
3125            }
3126
3127            let style = ExprPrepStyle::OneShot {
3128                logical_time: EvalTime::NotAvailable,
3129                session: ctx.session(),
3130                catalog_state: catalog.state(),
3131            };
3132            for expr in assignments.values_mut() {
3133                return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
3134            }
3135
3136            let make_diffs =
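            // Convert the peek's result rows into `(Row, Diff)` updates for the target
            // table: inserts add each row, deletes retract it, and updates retract the
            // old row and add the re-evaluated one. The closure also accumulates the
            // encoded byte size (so callers can enforce `max_result_size`) and checks
            // column constraints on every inserted row.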
3137                move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
3138                    let arena = RowArena::new();
3139                    let mut diffs = Vec::new();
3140                    let mut datum_vec = mz_repr::DatumVec::new();
3141
3142                    while let Some(row) = rows.next() {
3143                        if !assignments.is_empty() {
3144                            assert!(
3145                                matches!(kind, MutationKind::Update),
3146                                "only updates support assignments"
3147                            );
3148                            let mut datums = datum_vec.borrow_with(row);
3149                            let mut updates = vec![];
3150                            for (idx, expr) in &assignments {
3151                                let updated = match expr.eval(&datums, &arena) {
3152                                    Ok(updated) => updated,
3153                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
3154                                };
3155                                updates.push((*idx, updated));
3156                            }
3157                            for (idx, new_value) in updates {
3158                                datums[idx] = new_value;
3159                            }
3160                            let updated = Row::pack_slice(&datums);
3161                            diffs.push((updated, Diff::ONE));
3162                        }
3163                        match kind {
3164                            // Updates and deletes always remove the
3165                            // current row. Updates will also add an
3166                            // updated value.
3167                            MutationKind::Update | MutationKind::Delete => {
3168                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
3169                            }
3170                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
3171                        }
3172                    }
3173
3174                    // Sum of all the rows' byte size, for checking if we go
3175                    // above the max_result_size threshold.
3176                    let mut byte_size: u64 = 0;
3177                    for (row, diff) in &diffs {
3178                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
3179                        if diff.is_positive() {
3180                            for (idx, datum) in row.iter().enumerate() {
3181                                desc.constraints_met(idx, &datum)?;
3182                            }
3183                        }
3184                    }
3185                    Ok((diffs, byte_size))
3186                };
3187
3188            let diffs = match peek_response {
3189                ExecuteResponse::SendingRowsStreaming {
3190                    rows: mut rows_stream,
3191                    ..
3192                } => {
3193                    let mut byte_size: u64 = 0;
3194                    let mut diffs = Vec::new();
3195                    let result = loop {
3196                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
3197                            Ok(Some(res)) => match res {
3198                                PeekResponseUnary::Rows(new_rows) => {
3199                                    match make_diffs(new_rows) {
3200                                        Ok((mut new_diffs, new_byte_size)) => {
3201                                            byte_size = byte_size.saturating_add(new_byte_size);
3202                                            if byte_size > max_result_size {
3203                                                break Err(AdapterError::ResultSize(format!(
3204                                                    "result exceeds max size of {max_result_size}"
3205                                                )));
3206                                            }
3207                                            diffs.append(&mut new_diffs)
3208                                        }
3209                                        Err(e) => break Err(e),
3210                                    };
3211                                }
3212                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
3213                                PeekResponseUnary::Error(e) => {
3214                                    break Err(AdapterError::Unstructured(anyhow!(e)));
3215                                }
3216                            },
3217                            Ok(None) => break Ok(diffs),
3218                            Err(_) => {
3219                                // We timed out, so remove the pending peek. This is
3220                                // best-effort and doesn't guarantee we won't
3221                                // receive a response.
3222                                // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
3223                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
3224                                    conn_id: ctx.session().conn_id().clone(),
3225                                });
3226                                if let Err(e) = result {
3227                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3228                                }
3229                                break Err(AdapterError::StatementTimeout);
3230                            }
3231                        }
3232                    };
3233
3234                    result
3235                }
3236                ExecuteResponse::SendingRowsImmediate { rows } => {
3237                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
3238                }
3239                resp => Err(AdapterError::Unstructured(anyhow!(
3240                    "unexpected peek response: {resp:?}"
3241                ))),
3242            };
3243
3244            let mut returning_rows = Vec::new();
3245            let mut diff_err: Option<AdapterError> = None;
3246            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
3247                let arena = RowArena::new();
3248                for (row, diff) in diffs {
3249                    if !diff.is_positive() {
3250                        continue;
3251                    }
3252                    let mut returning_row = Row::with_capacity(returning.len());
3253                    let mut packer = returning_row.packer();
3254                    for expr in &returning {
3255                        let datums: Vec<_> = row.iter().collect();
3256                        match expr.eval(&datums, &arena) {
3257                            Ok(datum) => {
3258                                packer.push(datum);
3259                            }
3260                            Err(err) => {
3261                                diff_err = Some(err.into());
3262                                break;
3263                            }
3264                        }
3265                    }
3266                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3267                    let diff = match NonZeroUsize::try_from(diff) {
3268                        Ok(diff) => diff,
3269                        Err(err) => {
3270                            diff_err = Some(err.into());
3271                            break;
3272                        }
3273                    };
3274                    returning_rows.push((returning_row, diff));
3275                    if diff_err.is_some() {
3276                        break;
3277                    }
3278                }
3279            }
3280            let diffs = if let Some(err) = diff_err {
3281                Err(err)
3282            } else {
3283                diffs
3284            };
3285
3286            // We need to clear out the timestamp context so the write doesn't fail due to a
3287            // read only transaction.
3288            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3289            // No matter what isolation level the client is using, we must linearize this
3290            // read. The write will be performed right after this, as part of a single
3291            // transaction, so the write must have a timestamp greater than or equal to the
3292            // read.
3293            //
3294            // Note: It's only OK for the write to have a greater timestamp than the read
3295            // because the write lock prevents any other writes from happening in between
3296            // the read and write.
3297            if let Some(timestamp_context) = timestamp_context {
3298                let (tx, rx) = tokio::sync::oneshot::channel();
3299                let conn_id = ctx.session().conn_id().clone();
3300                let pending_read_txn = PendingReadTxn {
3301                    txn: PendingRead::ReadThenWrite { ctx, tx },
3302                    timestamp_context,
3303                    created: Instant::now(),
3304                    num_requeues: 0,
3305                    otel_ctx: OpenTelemetryContext::obtain(),
3306                };
3307                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3308                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3309                if let Err(e) = result {
3310                    warn!(
3311                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3312                        e
3313                    );
3314                    return;
3315                }
3316                let result = rx.await;
3317                // It is not an error for these results to be ready after `tx` has been dropped.
3318                ctx = match result {
3319                    Ok(Some(ctx)) => ctx,
3320                    Ok(None) => {
3321                        // Coordinator took our context and will handle responding to the client.
3322                        // This usually indicates that our transaction was aborted.
3323                        return;
3324                    }
3325                    Err(e) => {
3326                        warn!(
3327                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3328                            e
3329                        );
3330                        return;
3331                    }
3332                };
3333            }
3334
3335            match diffs {
3336                Ok(diffs) => {
3337                    let result = Self::send_diffs(
3338                        ctx.session_mut(),
3339                        plan::SendDiffsPlan {
3340                            id,
3341                            updates: diffs,
3342                            kind,
3343                            returning: returning_rows,
3344                            max_result_size,
3345                        },
3346                    );
3347                    ctx.retire(result);
3348                }
3349                Err(e) => {
3350                    ctx.retire(Err(e));
3351                }
3352            }
3353        });
3354    }
3355
3356    #[instrument]
3357    pub(super) async fn sequence_alter_item_rename(
3358        &mut self,
3359        ctx: &mut ExecuteContext,
3360        plan: plan::AlterItemRenamePlan,
3361    ) -> Result<ExecuteResponse, AdapterError> {
3362        let op = catalog::Op::RenameItem {
3363            id: plan.id,
3364            current_full_name: plan.current_full_name,
3365            to_name: plan.to_name,
3366        };
3367        match self
3368            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3369            .await
3370        {
3371            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3372            Err(err) => Err(err),
3373        }
3374    }
3375
3376    #[instrument]
3377    pub(super) async fn sequence_alter_retain_history(
3378        &mut self,
3379        ctx: &mut ExecuteContext,
3380        plan: plan::AlterRetainHistoryPlan,
3381    ) -> Result<ExecuteResponse, AdapterError> {
3382        let ops = vec![catalog::Op::AlterRetainHistory {
3383            id: plan.id,
3384            value: plan.value,
3385            window: plan.window,
3386        }];
3387        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3388            Box::pin(async move {
3389                let catalog_item = coord.catalog().get_entry(&plan.id).item();
3390                let cluster = match catalog_item {
3391                    CatalogItem::Table(_)
3392                    | CatalogItem::MaterializedView(_)
3393                    | CatalogItem::Source(_)
3394                    | CatalogItem::ContinualTask(_) => None,
3395                    CatalogItem::Index(index) => Some(index.cluster_id),
3396                    CatalogItem::Log(_)
3397                    | CatalogItem::View(_)
3398                    | CatalogItem::Sink(_)
3399                    | CatalogItem::Type(_)
3400                    | CatalogItem::Func(_)
3401                    | CatalogItem::Secret(_)
3402                    | CatalogItem::Connection(_) => unreachable!(),
3403                };
3404                match cluster {
3405                    Some(cluster) => {
3406                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3407                    }
3408                    None => {
3409                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3410                    }
3411                }
3412            })
3413        })
3414        .await?;
3415        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3416    }
3417
3418    #[instrument]
3419    pub(super) async fn sequence_alter_schema_rename(
3420        &mut self,
3421        ctx: &mut ExecuteContext,
3422        plan: plan::AlterSchemaRenamePlan,
3423    ) -> Result<ExecuteResponse, AdapterError> {
3424        let (database_spec, schema_spec) = plan.cur_schema_spec;
3425        let op = catalog::Op::RenameSchema {
3426            database_spec,
3427            schema_spec,
3428            new_name: plan.new_schema_name,
3429            check_reserved_names: true,
3430        };
3431        match self
3432            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3433            .await
3434        {
3435            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3436            Err(err) => Err(err),
3437        }
3438    }
3439
3440    #[instrument]
3441    pub(super) async fn sequence_alter_schema_swap(
3442        &mut self,
3443        ctx: &mut ExecuteContext,
3444        plan: plan::AlterSchemaSwapPlan,
3445    ) -> Result<ExecuteResponse, AdapterError> {
3446        let plan::AlterSchemaSwapPlan {
3447            schema_a_spec: (schema_a_db, schema_a),
3448            schema_a_name,
3449            schema_b_spec: (schema_b_db, schema_b),
3450            schema_b_name,
3451            name_temp,
3452        } = plan;
3453
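        // Swap the two schemas' names via the temporary name so that neither rename
        // transiently collides with the other: schema A takes `name_temp`, schema B
        // takes A's old name, and finally A (addressed by its spec, not its name)
        // takes B's old name.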
3454        let op_a = catalog::Op::RenameSchema {
3455            database_spec: schema_a_db,
3456            schema_spec: schema_a,
3457            new_name: name_temp,
3458            check_reserved_names: false,
3459        };
3460        let op_b = catalog::Op::RenameSchema {
3461            database_spec: schema_b_db,
3462            schema_spec: schema_b,
3463            new_name: schema_a_name,
3464            check_reserved_names: false,
3465        };
3466        let op_c = catalog::Op::RenameSchema {
3467            database_spec: schema_a_db,
3468            schema_spec: schema_a,
3469            new_name: schema_b_name,
3470            check_reserved_names: false,
3471        };
3472
3473        match self
3474            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3475                Box::pin(async {})
3476            })
3477            .await
3478        {
3479            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3480            Err(err) => Err(err),
3481        }
3482    }
3483
3484    #[instrument]
3485    pub(super) async fn sequence_alter_role(
3486        &mut self,
3487        session: &Session,
3488        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3489    ) -> Result<ExecuteResponse, AdapterError> {
3490        let catalog = self.catalog().for_session(session);
3491        let role = catalog.get_role(&id);
3492
3493        // We'll send these notices to the user, if the operation is successful.
3494        let mut notices = vec![];
3495
3496        // Get the attributes and variables from the role, as they currently are.
3497        let mut attributes = role.attributes().clone();
3498        let mut vars = role.vars().clone();
3499
3500        // Apply our updates.
3501        match option {
3502            PlannedAlterRoleOption::Attributes(attrs) => {
3503                self.validate_role_attributes(&attrs.clone().into())?;
3504
3505                if let Some(inherit) = attrs.inherit {
3506                    attributes.inherit = inherit;
3507                }
3508
3509                if let Some(password) = attrs.password {
3510                    attributes.password = Some(password);
3511                }
3512
3513                if let Some(superuser) = attrs.superuser {
3514                    attributes.superuser = Some(superuser);
3515                }
3516
3517                if let Some(login) = attrs.login {
3518                    attributes.login = Some(login);
3519                }
3520
3521                if attrs.nopassword.unwrap_or(false) {
3522                    attributes.password = None;
3523                }
3524
3525                if let Some(notice) = self.should_emit_rbac_notice(session) {
3526                    notices.push(notice);
3527                }
3528            }
3529            PlannedAlterRoleOption::Variable(variable) => {
3530                // Get the variable to make sure it's valid and visible.
3531                let session_var = session.vars().inspect(variable.name())?;
3532                // Return early if it's not visible.
3533                session_var.visible(session.user(), catalog.system_vars())?;
3534
3535                // Emit a warning when deprecated variables are used.
3536                // TODO(database-issues#8069) remove this after sufficient time has passed
3537                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3538                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3539                } else if let PlannedRoleVariable::Set {
3540                    name,
3541                    value: VariableValue::Values(vals),
3542                } = &variable
3543                {
3544                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3545                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3546                    }
3547                }
3548
3549                let var_name = match variable {
3550                    PlannedRoleVariable::Set { name, value } => {
3551                        // Update our persisted set.
3552                        match &value {
3553                            VariableValue::Default => {
3554                                vars.remove(&name);
3555                            }
3556                            VariableValue::Values(vals) => {
3557                                let var = match &vals[..] {
3558                                    [val] => OwnedVarInput::Flat(val.clone()),
3559                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3560                                };
3561                                // Make sure the input is valid.
3562                                session_var.check(var.borrow())?;
3563
3564                                vars.insert(name.clone(), var);
3565                            }
3566                        };
3567                        name
3568                    }
3569                    PlannedRoleVariable::Reset { name } => {
3570                        // Remove it from our persisted values.
3571                        vars.remove(&name);
3572                        name
3573                    }
3574                };
3575
3576                // Emit a notice that they need to reconnect to see the change take effect.
3577                notices.push(AdapterNotice::VarDefaultUpdated {
3578                    role: Some(name.clone()),
3579                    var_name: Some(var_name),
3580                });
3581            }
3582        }
3583
3584        let op = catalog::Op::AlterRole {
3585            id,
3586            name,
3587            attributes,
3588            vars: RoleVars { map: vars },
3589        };
3590        let response = self
3591            .catalog_transact(Some(session), vec![op])
3592            .await
3593            .map(|_| ExecuteResponse::AlteredRole)?;
3594
3595        // Send all of our queued notices.
3596        session.add_notices(notices);
3597
3598        Ok(response)
3599    }
3600
3601    #[instrument]
3602    pub(super) async fn sequence_alter_sink_prepare(
3603        &mut self,
3604        ctx: ExecuteContext,
3605        plan: plan::AlterSinkPlan,
3606    ) {
3607        // Put a read hold on the new relation
3608        let id_bundle = crate::CollectionIdBundle {
3609            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3610            compute_ids: BTreeMap::new(),
3611        };
3612        let read_hold = self.acquire_read_holds(&id_bundle);
3613
3614        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3615            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3616            return;
3617        };
3618
3619        let otel_ctx = OpenTelemetryContext::obtain();
3620        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3621
3622        let plan_validity = PlanValidity::new(
3623            self.catalog().transient_revision(),
3624            BTreeSet::from_iter([from_item_id]),
3625            Some(plan.in_cluster),
3626            None,
3627            ctx.session().role_metadata().clone(),
3628        );
3629
3630        // Re-resolve items in the altered statement
3631        // Parse statement.
3632        let create_sink_stmt = match mz_sql::parse::parse(&plan.sink.create_sql)
3633            .expect("invalid create sink sql")
3634            .into_element()
3635            .ast
3636        {
3637            Statement::CreateSink(stmt) => stmt,
3638            _ => unreachable!("invalid statement kind for sink"),
3639        };
3640        let catalog = self.catalog().for_system_session();
3641        let (_, resolved_ids) = match mz_sql::names::resolve(&catalog, create_sink_stmt) {
3642            Ok(ok) => ok,
3643            Err(e) => {
3644                ctx.retire(Err(AdapterError::internal("ALTER SINK", e)));
3645                return;
3646            }
3647        };
3648
3649        info!(
3650            "preparing alter sink for {}: frontiers={:?} export={:?}",
3651            plan.global_id,
3652            self.controller
3653                .storage_collections
3654                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3655            self.controller.storage.export(plan.global_id)
3656        );
3657
3658        // Now we must wait for the sink to make enough progress such that there is overlap between
3659        // the new `from` collection's read hold and the sink's write frontier.
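        // The watch set installed below is expected to fire once the sink's write
        // frontier has advanced past `read_ts`, at which point
        // `sequence_alter_sink_finish` runs with the context captured here.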
3660        self.install_storage_watch_set(
3661            ctx.session().conn_id().clone(),
3662            BTreeSet::from_iter([plan.global_id]),
3663            read_ts,
3664            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3665                ctx: Some(ctx),
3666                otel_ctx,
3667                plan,
3668                plan_validity,
3669                resolved_ids,
3670                read_hold,
3671            }),
3672        );
3673    }
3674
3675    #[instrument]
3676    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3677        ctx.otel_ctx.attach_as_parent();
3678        match ctx.plan_validity.check(self.catalog()) {
3679            Ok(()) => {}
3680            Err(err) => {
3681                ctx.retire(Err(err));
3682                return;
3683            }
3684        }
3685        {
3686            let plan = &ctx.plan;
3687            info!(
3688                "finishing alter sink for {}: frontiers={:?} export={:?}",
3689                plan.global_id,
3690                self.controller
3691                    .storage_collections
3692                    .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3693                self.controller.storage.export(plan.global_id)
3694            );
3695        }
3696
3697        let plan::AlterSinkPlan {
3698            item_id,
3699            global_id,
3700            sink,
3701            with_snapshot,
3702            in_cluster,
3703        } = ctx.plan.clone();
3704        // Assert that we can recover the updates that happened at the timestamps of the write
3705        // frontier, i.e. that the as-of of our read hold is strictly less than the write frontier.
3706        let write_frontier = &self
3707            .controller
3708            .storage
3709            .export(global_id)
3710            .expect("sink known to exist")
3711            .write_frontier;
3712        let as_of = ctx.read_hold.least_valid_read();
3713        assert!(
3714            write_frontier.iter().all(|t| as_of.less_than(t)),
3715            "{:?} should be strictly less than {:?}",
3716            &*as_of,
3717            &**write_frontier
3718        );
3719
3720        let catalog_sink = Sink {
3721            create_sql: sink.create_sql,
3722            global_id,
3723            from: sink.from,
3724            connection: sink.connection.clone(),
3725            envelope: sink.envelope,
3726            version: sink.version,
3727            with_snapshot,
3728            resolved_ids: ctx.resolved_ids.clone(),
3729            cluster_id: in_cluster,
3730        };
3731
3732        let ops = vec![catalog::Op::UpdateItem {
3733            id: item_id,
3734            name: self.catalog.get_entry(&item_id).name().clone(),
3735            to_item: CatalogItem::Sink(catalog_sink),
3736        }];
3737
3738        match self
3739            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3740            .await
3741        {
3742            Ok(()) => {}
3743            Err(err) => {
3744                ctx.retire(Err(err));
3745                return;
3746            }
3747        }
3748
3749        let from_entry = self.catalog().get_entry_by_global_id(&sink.from);
3750        let storage_sink_desc = StorageSinkDesc {
3751            from: sink.from,
3752            from_desc: from_entry
3753                .desc_opt()
3754                .expect("sinks can only be built on items with descs")
3755                .into_owned(),
3756            connection: sink
3757                .connection
3758                .clone()
3759                .into_inline_connection(self.catalog().state()),
3760            envelope: sink.envelope,
3761            as_of,
3762            with_snapshot,
3763            version: sink.version,
3764            from_storage_metadata: (),
3765            to_storage_metadata: (),
3766        };
3767
3768        self.controller
3769            .storage
3770            .alter_export(
3771                global_id,
3772                ExportDescription {
3773                    sink: storage_sink_desc,
3774                    instance_id: in_cluster,
3775                },
3776            )
3777            .await
3778            .unwrap_or_terminate("cannot fail to alter sink export");
3779
3780        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3781    }
3782
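        /// Dispatches `ALTER CONNECTION`: key rotation is handled directly, while option changes
        /// go through `sequence_alter_connection_options`.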
3783    #[instrument]
3784    pub(super) async fn sequence_alter_connection(
3785        &mut self,
3786        ctx: ExecuteContext,
3787        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3788    ) {
3789        match action {
3790            AlterConnectionAction::RotateKeys => {
3791                self.sequence_rotate_keys(ctx, id).await;
3792            }
3793            AlterConnectionAction::AlterOptions {
3794                set_options,
3795                drop_options,
3796                validate,
3797            } => {
3798                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3799                    .await
3800            }
3801        }
3802    }
3803
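        /// Rewrites the connection's `CREATE CONNECTION` statement with the set/dropped options
        /// and re-plans it. If `validate` is requested, validation runs in a background task and
        /// finishes via `Message::AlterConnectionValidationReady`; otherwise the change is
        /// committed immediately.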
3804    #[instrument]
3805    async fn sequence_alter_connection_options(
3806        &mut self,
3807        mut ctx: ExecuteContext,
3808        id: CatalogItemId,
3809        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3810        drop_options: BTreeSet<ConnectionOptionName>,
3811        validate: bool,
3812    ) {
3813        let cur_entry = self.catalog().get_entry(&id);
3814        let cur_conn = cur_entry.connection().expect("known to be connection");
3815        let connection_gid = cur_conn.global_id();
3816
3817        let inner = || -> Result<Connection, AdapterError> {
3818            // Parse statement.
3819            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3820                .expect("invalid create sql persisted to catalog")
3821                .into_element()
3822                .ast
3823            {
3824                Statement::CreateConnection(stmt) => stmt,
3825                _ => unreachable!("proved type is connection"),
3826            };
3827
3828            let catalog = self.catalog().for_system_session();
3829
3830            // Resolve items in statement
3831            let (mut create_conn_stmt, resolved_ids) =
3832                mz_sql::names::resolve(&catalog, create_conn_stmt)
3833                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3834
3835            // Retain options that are neither set nor dropped.
3836            create_conn_stmt
3837                .values
3838                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3839
3840            // Set new values
3841            create_conn_stmt.values.extend(
3842                set_options
3843                    .into_iter()
3844                    .map(|(name, value)| ConnectionOption { name, value }),
3845            );
3846
3847            // Open a new catalog, which we will use to re-plan our
3848            // statement with the desired config.
3849            let mut catalog = self.catalog().for_system_session();
3850            catalog.mark_id_unresolvable_for_replanning(id);
3851
3852            // Re-define our connection in terms of the amended statement
3853            let plan = match mz_sql::plan::plan(
3854                None,
3855                &catalog,
3856                Statement::CreateConnection(create_conn_stmt),
3857                &Params::empty(),
3858                &resolved_ids,
3859            )
3860            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3861            {
3862                Plan::CreateConnection(plan) => plan,
3863                _ => unreachable!("create connection plan is the only valid response"),
3864            };
3865
3866            // Parse statement.
3867            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3868                .expect("invalid create sql persisted to catalog")
3869                .into_element()
3870                .ast
3871            {
3872                Statement::CreateConnection(stmt) => stmt,
3873                _ => unreachable!("proved type is connection"),
3874            };
3875
3876            let catalog = self.catalog().for_system_session();
3877
3878            // Resolve items in statement
3879            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3880                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3881
3882            Ok(Connection {
3883                create_sql: plan.connection.create_sql,
3884                global_id: cur_conn.global_id,
3885                details: plan.connection.details,
3886                resolved_ids: new_deps,
3887            })
3888        };
3889
3890        let conn = match inner() {
3891            Ok(conn) => conn,
3892            Err(e) => {
3893                return ctx.retire(Err(e));
3894            }
3895        };
3896
3897        if validate {
3898            let connection = conn
3899                .details
3900                .to_connection()
3901                .into_inline_connection(self.catalog().state());
3902
3903            let internal_cmd_tx = self.internal_cmd_tx.clone();
3904            let transient_revision = self.catalog().transient_revision();
3905            let conn_id = ctx.session().conn_id().clone();
3906            let otel_ctx = OpenTelemetryContext::obtain();
3907            let role_metadata = ctx.session().role_metadata().clone();
3908            let current_storage_parameters = self.controller.storage.config().clone();
3909
3910            task::spawn(
3911                || format!("validate_alter_connection:{conn_id}"),
3912                async move {
3913                    let resolved_ids = conn.resolved_ids.clone();
3914                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3915                    let result = match connection.validate(id, &current_storage_parameters).await {
3916                        Ok(()) => Ok(conn),
3917                        Err(err) => Err(err.into()),
3918                    };
3919
3920                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
3921                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3922                        AlterConnectionValidationReady {
3923                            ctx,
3924                            result,
3925                            connection_id: id,
3926                            connection_gid,
3927                            plan_validity: PlanValidity::new(
3928                                transient_revision,
3929                                dependency_ids.clone(),
3930                                None,
3931                                None,
3932                                role_metadata,
3933                            ),
3934                            otel_ctx,
3935                            resolved_ids,
3936                        },
3937                    ));
3938                    if let Err(e) = result {
3939                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3940                    }
3941                },
3942            );
3943        } else {
3944            let result = self
3945                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3946                .await;
3947            ctx.retire(result);
3948        }
3949    }
3950
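        /// Commits a previously validated connection alteration: checks that the new details are
        /// alter-compatible, updates the catalog, ensures any AWS PrivateLink VPC endpoint, and
        /// pushes the inlined connection to all dependent sources, sinks, and source-fed tables
        /// in the storage controller.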
3951    #[instrument]
3952    pub(crate) async fn sequence_alter_connection_stage_finish(
3953        &mut self,
3954        session: &mut Session,
3955        id: CatalogItemId,
3956        connection: Connection,
3957    ) -> Result<ExecuteResponse, AdapterError> {
3958        match self.catalog.get_entry(&id).item() {
3959            CatalogItem::Connection(curr_conn) => {
3960                curr_conn
3961                    .details
3962                    .to_connection()
3963                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3964                    .map_err(StorageError::from)?;
3965            }
3966            _ => unreachable!("known to be a connection"),
3967        };
3968
3969        let ops = vec![catalog::Op::UpdateItem {
3970            id,
3971            name: self.catalog.get_entry(&id).name().clone(),
3972            to_item: CatalogItem::Connection(connection.clone()),
3973        }];
3974
3975        self.catalog_transact(Some(session), ops).await?;
3976
3977        match connection.details {
3978            ConnectionDetails::AwsPrivatelink(ref privatelink) => {
3979                let spec = VpcEndpointConfig {
3980                    aws_service_name: privatelink.service_name.to_owned(),
3981                    availability_zone_ids: privatelink.availability_zones.to_owned(),
3982                };
3983                self.cloud_resource_controller
3984                    .as_ref()
3985                    .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
3986                    .ensure_vpc_endpoint(id, spec)
3987                    .await?;
3988            }
3989            _ => {}
3990        };
3991
3992        let entry = self.catalog().get_entry(&id);
3993
3994        let mut connections = VecDeque::new();
3995        connections.push_front(entry.id());
3996
3997        let mut source_connections = BTreeMap::new();
3998        let mut sink_connections = BTreeMap::new();
3999        let mut source_export_data_configs = BTreeMap::new();
4000
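            // Walk the transitive dependents of the altered connection (following intermediate
            // connections breadth-first) and collect the storage objects whose inlined connection
            // details need to be updated.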
4001        while let Some(id) = connections.pop_front() {
4002            for id in self.catalog.get_entry(&id).used_by() {
4003                let entry = self.catalog.get_entry(id);
4004                match entry.item() {
4005                    CatalogItem::Connection(_) => connections.push_back(*id),
4006                    CatalogItem::Source(source) => {
4007                        let desc = match &entry.source().expect("known to be source").data_source {
4008                            DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
4009                                .desc
4010                                .clone()
4011                                .into_inline_connection(self.catalog().state()),
4012                            _ => unreachable!("only ingestions reference connections"),
4013                        };
4014
4015                        source_connections.insert(source.global_id, desc.connection);
4016                    }
4017                    CatalogItem::Sink(sink) => {
4018                        let export = entry.sink().expect("known to be sink");
4019                        sink_connections.insert(
4020                            sink.global_id,
4021                            export
4022                                .connection
4023                                .clone()
4024                                .into_inline_connection(self.catalog().state()),
4025                        );
4026                    }
4027                    CatalogItem::Table(table) => {
4028                        // This is a source-fed table that references a schema registry
4029                        // connection as part of its encoding / data config.
4030                        if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
4031                            let data_config = export_data_config.clone();
4032                            source_export_data_configs.insert(
4033                                table.global_id_writes(),
4034                                data_config.into_inline_connection(self.catalog().state()),
4035                            );
4036                        }
4037                    }
4038                    t => unreachable!("connection dependency not expected on {:?}", t),
4039                }
4040            }
4041        }
4042
4043        if !source_connections.is_empty() {
4044            self.controller
4045                .storage
4046                .alter_ingestion_connections(source_connections)
4047                .await
4048                .unwrap_or_terminate("cannot fail to alter ingestion connection");
4049        }
4050
4051        if !sink_connections.is_empty() {
4052            self.controller
4053                .storage
4054                .alter_export_connections(sink_connections)
4055                .await
4056                .unwrap_or_terminate("altering exports after txn must succeed");
4057        }
4058
4059        if !source_export_data_configs.is_empty() {
4060            self.controller
4061                .storage
4062                .alter_ingestion_export_data_configs(source_export_data_configs)
4063                .await
4064                .unwrap_or_terminate("altering source export data configs after txn must succeed");
4065        }
4066
4067        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
4068    }
4069
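        /// Handles `ALTER SOURCE`. Adding subsource exports merges the existing text/exclude
        /// column options with the new ones, re-plans the `CREATE SOURCE` statement, and creates
        /// storage collections for the new subsources; refreshing references only records the
        /// newly discovered upstream references in the catalog.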
4070    #[instrument]
4071    pub(super) async fn sequence_alter_source(
4072        &mut self,
4073        session: &Session,
4074        plan::AlterSourcePlan {
4075            item_id,
4076            ingestion_id,
4077            action,
4078        }: plan::AlterSourcePlan,
4079    ) -> Result<ExecuteResponse, AdapterError> {
4080        let cur_entry = self.catalog().get_entry(&item_id);
4081        let cur_source = cur_entry.source().expect("known to be source");
4082
4083        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
4084            // Parse statement.
4085            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
4086                .expect("invalid create sql persisted to catalog")
4087                .into_element()
4088                .ast
4089            {
4090                Statement::CreateSource(stmt) => stmt,
4091                _ => unreachable!("proved type is source"),
4092            };
4093
4094            let catalog = coord.catalog().for_system_session();
4095
4096            // Resolve items in statement
4097            mz_sql::names::resolve(&catalog, create_source_stmt)
4098                .map_err(|e| AdapterError::internal(err_cx, e))
4099        };
4100
4101        match action {
4102            plan::AlterSourceAction::AddSubsourceExports {
4103                subsources,
4104                options,
4105            } => {
4106                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
4107
4108                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
4109                    text_columns: mut new_text_columns,
4110                    exclude_columns: mut new_exclude_columns,
4111                    ..
4112                } = options.try_into()?;
4113
4114                // Resolve items in statement
4115                let (mut create_source_stmt, resolved_ids) =
4116                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
4117
4118                // Get all currently referred-to items
4119                let catalog = self.catalog();
4120                let curr_references: BTreeSet<_> = catalog
4121                    .get_entry(&item_id)
4122                    .used_by()
4123                    .into_iter()
4124                    .filter_map(|subsource| {
4125                        catalog
4126                            .get_entry(subsource)
4127                            .subsource_details()
4128                            .map(|(_id, reference, _details)| reference)
4129                    })
4130                    .collect();
4131
4132                // We are doing a lot of unwrapping, so just make an error to reference; all of
4133                // these invariants are guaranteed to be true because of how we plan subsources.
4134                let purification_err =
4135                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
4136
4137                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
4138                // we remove support for implicitly created subsources from a `CREATE SOURCE`
4139                // statement.
4140                match &mut create_source_stmt.connection {
4141                    CreateSourceConnection::Postgres {
4142                        options: curr_options,
4143                        ..
4144                    } => {
4145                        let mz_sql::plan::PgConfigOptionExtracted {
4146                            mut text_columns, ..
4147                        } = curr_options.clone().try_into()?;
4148
4149                        // Drop text columns; we will add them back in
4150                        // as appropriate below.
4151                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
4152
4153                        // Drop all text columns that are not currently referred to.
4154                        text_columns.retain(|column_qualified_reference| {
4155                            mz_ore::soft_assert_eq_or_log!(
4156                                column_qualified_reference.0.len(),
4157                                4,
4158                                "all TEXT COLUMNS values must be column-qualified references"
4159                            );
4160                            let mut table = column_qualified_reference.clone();
4161                            table.0.truncate(3);
4162                            curr_references.contains(&table)
4163                        });
4164
4165                        // Merge the current text columns into the new text columns.
4166                        new_text_columns.extend(text_columns);
4167
4168                        // If we have text columns, add them to the options.
4169                        if !new_text_columns.is_empty() {
4170                            new_text_columns.sort();
4171                            let new_text_columns = new_text_columns
4172                                .into_iter()
4173                                .map(WithOptionValue::UnresolvedItemName)
4174                                .collect();
4175
4176                            curr_options.push(PgConfigOption {
4177                                name: PgConfigOptionName::TextColumns,
4178                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4179                            });
4180                        }
4181                    }
4182                    CreateSourceConnection::MySql {
4183                        options: curr_options,
4184                        ..
4185                    } => {
4186                        let mz_sql::plan::MySqlConfigOptionExtracted {
4187                            mut text_columns,
4188                            mut exclude_columns,
4189                            ..
4190                        } = curr_options.clone().try_into()?;
4191
4192                        // Drop both text and exclude columns; we will add them back in
4193                        // as appropriate below.
4194                        curr_options.retain(|o| {
4195                            !matches!(
4196                                o.name,
4197                                MySqlConfigOptionName::TextColumns
4198                                    | MySqlConfigOptionName::ExcludeColumns
4199                            )
4200                        });
4201
4202                        // Drop all text / exclude columns that are not currently referred to.
4203                        let column_referenced =
4204                            |column_qualified_reference: &UnresolvedItemName| {
4205                                mz_ore::soft_assert_eq_or_log!(
4206                                    column_qualified_reference.0.len(),
4207                                    3,
4208                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4209                                );
4210                                let mut table = column_qualified_reference.clone();
4211                                table.0.truncate(2);
4212                                curr_references.contains(&table)
4213                            };
4214                        text_columns.retain(column_referenced);
4215                        exclude_columns.retain(column_referenced);
4216
4217                        // Merge the current text / exclude columns into the new text / exclude columns.
4218                        new_text_columns.extend(text_columns);
4219                        new_exclude_columns.extend(exclude_columns);
4220
4221                        // If we have text columns, add them to the options.
4222                        if !new_text_columns.is_empty() {
4223                            new_text_columns.sort();
4224                            let new_text_columns = new_text_columns
4225                                .into_iter()
4226                                .map(WithOptionValue::UnresolvedItemName)
4227                                .collect();
4228
4229                            curr_options.push(MySqlConfigOption {
4230                                name: MySqlConfigOptionName::TextColumns,
4231                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4232                            });
4233                        }
4234                        // If we have exclude columns, add them to the options.
4235                        if !new_exclude_columns.is_empty() {
4236                            new_exclude_columns.sort();
4237                            let new_exclude_columns = new_exclude_columns
4238                                .into_iter()
4239                                .map(WithOptionValue::UnresolvedItemName)
4240                                .collect();
4241
4242                            curr_options.push(MySqlConfigOption {
4243                                name: MySqlConfigOptionName::ExcludeColumns,
4244                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4245                            });
4246                        }
4247                    }
4248                    CreateSourceConnection::SqlServer {
4249                        options: curr_options,
4250                        ..
4251                    } => {
4252                        let mz_sql::plan::SqlServerConfigOptionExtracted {
4253                            mut text_columns,
4254                            mut exclude_columns,
4255                            ..
4256                        } = curr_options.clone().try_into()?;
4257
4258                        // Drop both text and exclude columns; we will add them back in
4259                        // as appropriate below.
4260                        curr_options.retain(|o| {
4261                            !matches!(
4262                                o.name,
4263                                SqlServerConfigOptionName::TextColumns
4264                                    | SqlServerConfigOptionName::ExcludeColumns
4265                            )
4266                        });
4267
4268                        // Drop all text / exclude columns that are not currently referred to.
4269                        let column_referenced =
4270                            |column_qualified_reference: &UnresolvedItemName| {
4271                                mz_ore::soft_assert_eq_or_log!(
4272                                    column_qualified_reference.0.len(),
4273                                    3,
4274                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4275                                );
4276                                let mut table = column_qualified_reference.clone();
4277                                table.0.truncate(2);
4278                                curr_references.contains(&table)
4279                            };
4280                        text_columns.retain(column_referenced);
4281                        exclude_columns.retain(column_referenced);
4282
4283                        // Merge the current text / exclude columns into the new text / exclude columns.
4284                        new_text_columns.extend(text_columns);
4285                        new_exclude_columns.extend(exclude_columns);
4286
4287                        // If we have text columns, add them to the options.
4288                        if !new_text_columns.is_empty() {
4289                            new_text_columns.sort();
4290                            let new_text_columns = new_text_columns
4291                                .into_iter()
4292                                .map(WithOptionValue::UnresolvedItemName)
4293                                .collect();
4294
4295                            curr_options.push(SqlServerConfigOption {
4296                                name: SqlServerConfigOptionName::TextColumns,
4297                                value: Some(WithOptionValue::Sequence(new_text_columns)),
4298                            });
4299                        }
4300                        // If we have exclude columns, add them to the options.
4301                        if !new_exclude_columns.is_empty() {
4302                            new_exclude_columns.sort();
4303                            let new_exclude_columns = new_exclude_columns
4304                                .into_iter()
4305                                .map(WithOptionValue::UnresolvedItemName)
4306                                .collect();
4307
4308                            curr_options.push(SqlServerConfigOption {
4309                                name: SqlServerConfigOptionName::ExcludeColumns,
4310                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4311                            });
4312                        }
4313                    }
4314                    _ => return Err(purification_err()),
4315                };
4316
4317                let mut catalog = self.catalog().for_system_session();
4318                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4319
4320                // Re-define our source in terms of the amended statement
4321                let plan = match mz_sql::plan::plan(
4322                    None,
4323                    &catalog,
4324                    Statement::CreateSource(create_source_stmt),
4325                    &Params::empty(),
4326                    &resolved_ids,
4327                )
4328                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4329                {
4330                    Plan::CreateSource(plan) => plan,
4331                    _ => unreachable!("create source plan is the only valid response"),
4332                };
4333
4334                // Asserting that we've done the right thing with dependencies
4335                // here requires mocking out objects in the catalog, which is a
4336                // large task for an operation we have to cover in tests anyway.
4337                let source = Source::new(
4338                    plan,
4339                    cur_source.global_id,
4340                    resolved_ids,
4341                    cur_source.custom_logical_compaction_window,
4342                    cur_source.is_retained_metrics_object,
4343                );
4344
4345                let source_compaction_window = source.custom_logical_compaction_window;
4346
4347                // Get new ingestion description for storage.
4348                let desc = match &source.data_source {
4349                    DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
4350                        .desc
4351                        .clone()
4352                        .into_inline_connection(self.catalog().state()),
4353                    _ => unreachable!("already verified of type ingestion"),
4354                };
4355
4356                self.controller
4357                    .storage
4358                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
4359                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4360
4361                // Redefine source. This must be done before we create any new
4362                // subsources so that it has the right ingestion.
4363                let mut ops = vec![catalog::Op::UpdateItem {
4364                    id: item_id,
4365                    // Look this up again so we don't have to hold an immutable reference to the
4366                    // entry for so long.
4367                    name: self.catalog.get_entry(&item_id).name().clone(),
4368                    to_item: CatalogItem::Source(source),
4369                }];
4370
4371                let CreateSourceInner {
4372                    ops: new_ops,
4373                    sources,
4374                    if_not_exists_ids,
4375                } = self.create_source_inner(session, subsources).await?;
4376
4377                ops.extend(new_ops.into_iter());
4378
4379                assert!(
4380                    if_not_exists_ids.is_empty(),
4381                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4382                );
4383
4384                self.catalog_transact(Some(session), ops).await?;
4385
4386                let mut item_ids = BTreeSet::new();
4387                let mut collections = Vec::with_capacity(sources.len());
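                    // For each newly created subsource, build a storage collection description
                    // that points back at the parent ingestion and reports into the shared
                    // source status history collection.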
4388                for (item_id, source) in sources {
4389                    let status_id = self.catalog().resolve_builtin_storage_collection(
4390                        &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
4391                    );
4392                    let source_status_collection_id =
4393                        Some(self.catalog().get_entry(&status_id).latest_global_id());
4394
4395                    let (data_source, status_collection_id) = match source.data_source {
4396                        // Subsources use source statuses.
4397                        DataSourceDesc::IngestionExport {
4398                            ingestion_id,
4399                            external_reference: _,
4400                            details,
4401                            data_config,
4402                        } => {
4403                            // TODO(parkmycar): We should probably check the type here, but I'm not sure if
4404                            // this will always be a Source or a Table.
4405                            let ingestion_id =
4406                                self.catalog().get_entry(&ingestion_id).latest_global_id();
4407                            (
4408                                DataSource::IngestionExport {
4409                                    ingestion_id,
4410                                    details,
4411                                    data_config: data_config
4412                                        .into_inline_connection(self.catalog().state()),
4413                                },
4414                                source_status_collection_id,
4415                            )
4416                        }
4417                        o => {
4418                            unreachable!(
4419                                "ALTER SOURCE...ADD SUBSOURCE only creates SourceExport but got {:?}",
4420                                o
4421                            )
4422                        }
4423                    };
4424
4425                    collections.push((
4426                        source.global_id,
4427                        CollectionDescription {
4428                            desc: source.desc.clone(),
4429                            data_source,
4430                            since: None,
4431                            status_collection_id,
4432                            timeline: Some(source.timeline.clone()),
4433                        },
4434                    ));
4435
4436                    item_ids.insert(item_id);
4437                }
4438
4439                let storage_metadata = self.catalog.state().storage_metadata();
4440
4441                self.controller
4442                    .storage
4443                    .create_collections(storage_metadata, None, collections)
4444                    .await
4445                    .unwrap_or_terminate("cannot fail to create collections");
4446
4447                self.initialize_storage_read_policies(
4448                    item_ids,
4449                    source_compaction_window.unwrap_or(CompactionWindow::Default),
4450                )
4451                .await;
4452            }
4453            plan::AlterSourceAction::RefreshReferences { references } => {
4454                self.catalog_transact(
4455                    Some(session),
4456                    vec![catalog::Op::UpdateSourceReferences {
4457                        source_id: item_id,
4458                        references: references.into(),
4459                    }],
4460                )
4461                .await?;
4462            }
4463        }
4464
4465        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4466    }
4467
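        /// Handles `ALTER SYSTEM SET`: checks that the user may modify the variable, validates
        /// network policy changes specially, and records the new (or reset-to-default) value in
        /// the catalog.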
4468    #[instrument]
4469    pub(super) async fn sequence_alter_system_set(
4470        &mut self,
4471        session: &Session,
4472        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4473    ) -> Result<ExecuteResponse, AdapterError> {
4474        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4475        // We want to ensure that the network policy we're switching to actually exists.
4476        if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4477            self.validate_alter_system_network_policy(session, &value)?;
4478        }
4479
4480        let op = match value {
4481            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4482                name: name.clone(),
4483                value: OwnedVarInput::SqlSet(values),
4484            },
4485            plan::VariableValue::Default => {
4486                catalog::Op::ResetSystemConfiguration { name: name.clone() }
4487            }
4488        };
4489        self.catalog_transact(Some(session), vec![op]).await?;
4490
4491        session.add_notice(AdapterNotice::VarDefaultUpdated {
4492            role: None,
4493            var_name: Some(name),
4494        });
4495        Ok(ExecuteResponse::AlteredSystemConfiguration)
4496    }
4497
4498    #[instrument]
4499    pub(super) async fn sequence_alter_system_reset(
4500        &mut self,
4501        session: &Session,
4502        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4503    ) -> Result<ExecuteResponse, AdapterError> {
4504        self.is_user_allowed_to_alter_system(session, Some(&name))?;
4505        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4506        self.catalog_transact(Some(session), vec![op]).await?;
4507        session.add_notice(AdapterNotice::VarDefaultUpdated {
4508            role: None,
4509            var_name: Some(name),
4510        });
4511        Ok(ExecuteResponse::AlteredSystemConfiguration)
4512    }
4513
4514    #[instrument]
4515    pub(super) async fn sequence_alter_system_reset_all(
4516        &mut self,
4517        session: &Session,
4518        _: plan::AlterSystemResetAllPlan,
4519    ) -> Result<ExecuteResponse, AdapterError> {
4520        self.is_user_allowed_to_alter_system(session, None)?;
4521        let op = catalog::Op::ResetAllSystemConfiguration;
4522        self.catalog_transact(Some(session), vec![op]).await?;
4523        session.add_notice(AdapterNotice::VarDefaultUpdated {
4524            role: None,
4525            var_name: None,
4526        });
4527        Ok(ExecuteResponse::AlteredSystemConfiguration)
4528    }
4529
4530    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
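    /// Returns `Ok(())` if the session's user may run `ALTER SYSTEM` for the named variable,
    /// or for all variables when `var_name` is `None`.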
4531    fn is_user_allowed_to_alter_system(
4532        &self,
4533        session: &Session,
4534        var_name: Option<&str>,
4535    ) -> Result<(), AdapterError> {
4536        match (session.user().kind(), var_name) {
4537            // Only internal superusers can reset all system variables.
4538            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4539            // Whether a variable can be modified depends on whether we're an internal superuser.
4540            (UserKind::Superuser, Some(name))
4541                if session.user().is_internal()
4542                    || self.catalog().system_config().user_modifiable(name) =>
4543            {
4544                // In lieu of plumbing the user to all system config functions, just check that
4545                // the var is visible.
4546                let var = self.catalog().system_config().get(name)?;
4547                var.visible(session.user(), self.catalog().system_config())?;
4548                Ok(())
4549            }
4550            // If we're not a superuser but the variable is user modifiable, return an error
4551            // indicating that superuser privileges are required to modify it.
4552            (UserKind::Regular, Some(name))
4553                if self.catalog().system_config().user_modifiable(name) =>
4554            {
4555                Err(AdapterError::Unauthorized(
4556                    rbac::UnauthorizedError::Superuser {
4557                        action: format!("toggle the '{name}' system configuration parameter"),
4558                    },
4559                ))
4560            }
4561            _ => Err(AdapterError::Unauthorized(
4562                rbac::UnauthorizedError::MzSystem {
4563                    action: "alter system".into(),
4564                },
4565            )),
4566        }
4567    }
4568
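        /// Ensures that the proposed value for the `network_policy` system variable names an
        /// existing network policy and would not lock the current session out.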
4569    fn validate_alter_system_network_policy(
4570        &self,
4571        session: &Session,
4572        policy_value: &plan::VariableValue,
4573    ) -> Result<(), AdapterError> {
4574        let policy_name = match &policy_value {
4575            // Make sure the compiled in default still exists.
4576            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4577            plan::VariableValue::Values(values) if values.len() == 1 => {
4578                values.iter().next().cloned()
4579            }
4580            plan::VariableValue::Values(values) => {
4581                tracing::warn!(?values, "can't set multiple network policies at once");
4582                None
4583            }
4584        };
4585        let maybe_network_policy = policy_name
4586            .as_ref()
4587            .and_then(|name| self.catalog.get_network_policy_by_name(name));
4588        let Some(network_policy) = maybe_network_policy else {
4589            return Err(AdapterError::PlanError(plan::PlanError::VarError(
4590                VarError::InvalidParameterValue {
4591                    name: NETWORK_POLICY.name(),
4592                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4593                    reason: "no network policy with that name exists".to_string(),
4594                },
4595            )));
4596        };
4597        self.validate_alter_network_policy(session, &network_policy.rules)
4598    }
4599
4600    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
4601    ///
4602    /// This helps prevent users from modifying network policies in a way that would lock out their
4603    /// current connection.
4604    fn validate_alter_network_policy(
4605        &self,
4606        session: &Session,
4607        policy_rules: &Vec<NetworkPolicyRule>,
4608    ) -> Result<(), AdapterError> {
4609        // If the user is not an internal user, attempt to protect them from
4610        // locking themselves out.
4611        if session.user().is_internal() {
4612            return Ok(());
4613        }
4614        if let Some(ip) = session.meta().client_ip() {
4615            validate_ip_with_policy_rules(ip, policy_rules)
4616                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4617        } else {
4618            // Sessions without IPs are only constructed temporarily for default values;
4619            // they should not be permitted here.
4620            return Err(AdapterError::NetworkPolicyDenied(
4621                NetworkPolicyError::MissingIp,
4622            ));
4623        }
4624        Ok(())
4625    }
4626
4627    // Returns the name of the portal to execute.
4628    #[instrument]
4629    pub(super) fn sequence_execute(
4630        &mut self,
4631        session: &mut Session,
4632        plan: plan::ExecutePlan,
4633    ) -> Result<String, AdapterError> {
4634        // Verify the stmt is still valid.
4635        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4636        let ps = session
4637            .get_prepared_statement_unverified(&plan.name)
4638            .expect("known to exist");
4639        let stmt = ps.stmt().cloned();
4640        let desc = ps.desc().clone();
4641        let state_revision = ps.state_revision;
4642        let logging = Arc::clone(ps.logging());
4643        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4644    }
4645
4646    #[instrument]
4647    pub(super) async fn sequence_grant_privileges(
4648        &mut self,
4649        session: &Session,
4650        plan::GrantPrivilegesPlan {
4651            update_privileges,
4652            grantees,
4653        }: plan::GrantPrivilegesPlan,
4654    ) -> Result<ExecuteResponse, AdapterError> {
4655        self.sequence_update_privileges(
4656            session,
4657            update_privileges,
4658            grantees,
4659            UpdatePrivilegeVariant::Grant,
4660        )
4661        .await
4662    }
4663
4664    #[instrument]
4665    pub(super) async fn sequence_revoke_privileges(
4666        &mut self,
4667        session: &Session,
4668        plan::RevokePrivilegesPlan {
4669            update_privileges,
4670            revokees,
4671        }: plan::RevokePrivilegesPlan,
4672    ) -> Result<ExecuteResponse, AdapterError> {
4673        self.sequence_update_privileges(
4674            session,
4675            update_privileges,
4676            revokees,
4677            UpdatePrivilegeVariant::Revoke,
4678        )
4679        .await
4680    }
4681
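        /// Shared implementation of GRANT/REVOKE PRIVILEGES: warns about privileges that do not
        /// apply to a relation's actual type, skips no-op changes, and applies the rest in a
        /// single catalog transaction.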
4682    #[instrument]
4683    async fn sequence_update_privileges(
4684        &mut self,
4685        session: &Session,
4686        update_privileges: Vec<UpdatePrivilege>,
4687        grantees: Vec<RoleId>,
4688        variant: UpdatePrivilegeVariant,
4689    ) -> Result<ExecuteResponse, AdapterError> {
4690        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4691        let mut warnings = Vec::new();
4692        let catalog = self.catalog().for_session(session);
4693
4694        for UpdatePrivilege {
4695            acl_mode,
4696            target_id,
4697            grantor,
4698        } in update_privileges
4699        {
4700            let actual_object_type = catalog.get_system_object_type(&target_id);
4701            // For all relations we allow all applicable table privileges, but send a warning if the
4702            // privilege isn't actually applicable to the object type.
4703            if actual_object_type.is_relation() {
4704                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4705                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4706                if !non_applicable_privileges.is_empty() {
4707                    let object_description =
4708                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4709                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4710                        non_applicable_privileges,
4711                        object_description,
4712                    })
4713                }
4714            }
4715
4716            if let SystemObjectId::Object(object_id) = &target_id {
4717                self.catalog()
4718                    .ensure_not_reserved_object(object_id, session.conn_id())?;
4719            }
4720
4721            let privileges = self
4722                .catalog()
4723                .get_privileges(&target_id, session.conn_id())
4724                // Should be unreachable since the parser will refuse to parse grant/revoke
4725                // statements on objects without privileges.
4726                .ok_or(AdapterError::Unsupported(
4727                    "GRANTs/REVOKEs on an object type with no privileges",
4728                ))?;
4729
4730            for grantee in &grantees {
4731                self.catalog().ensure_not_system_role(grantee)?;
4732                self.catalog().ensure_not_predefined_role(grantee)?;
4733                let existing_privilege = privileges
4734                    .get_acl_item(grantee, &grantor)
4735                    .map(Cow::Borrowed)
4736                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4737
4738                match variant {
4739                    UpdatePrivilegeVariant::Grant
4740                        if !existing_privilege.acl_mode.contains(acl_mode) =>
4741                    {
4742                        ops.push(catalog::Op::UpdatePrivilege {
4743                            target_id: target_id.clone(),
4744                            privilege: MzAclItem {
4745                                grantee: *grantee,
4746                                grantor,
4747                                acl_mode,
4748                            },
4749                            variant,
4750                        });
4751                    }
4752                    UpdatePrivilegeVariant::Revoke
4753                        if !existing_privilege
4754                            .acl_mode
4755                            .intersection(acl_mode)
4756                            .is_empty() =>
4757                    {
4758                        ops.push(catalog::Op::UpdatePrivilege {
4759                            target_id: target_id.clone(),
4760                            privilege: MzAclItem {
4761                                grantee: *grantee,
4762                                grantor,
4763                                acl_mode,
4764                            },
4765                            variant,
4766                        });
4767                    }
4768                    // no-op
4769                    _ => {}
4770                }
4771            }
4772        }
4773
4774        if ops.is_empty() {
4775            session.add_notices(warnings);
4776            return Ok(variant.into());
4777        }
4778
4779        let res = self
4780            .catalog_transact(Some(session), ops)
4781            .await
4782            .map(|_| match variant {
4783                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4784                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4785            });
4786        if res.is_ok() {
4787            session.add_notices(warnings);
4788        }
4789        res
4790    }
4791
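        /// Handles `ALTER DEFAULT PRIVILEGES`, adding or removing a default privilege entry for
        /// every (object, ACL item) pair after checking that no reserved roles or objects are
        /// involved.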
4792    #[instrument]
4793    pub(super) async fn sequence_alter_default_privileges(
4794        &mut self,
4795        session: &Session,
4796        plan::AlterDefaultPrivilegesPlan {
4797            privilege_objects,
4798            privilege_acl_items,
4799            is_grant,
4800        }: plan::AlterDefaultPrivilegesPlan,
4801    ) -> Result<ExecuteResponse, AdapterError> {
4802        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4803        let variant = if is_grant {
4804            UpdatePrivilegeVariant::Grant
4805        } else {
4806            UpdatePrivilegeVariant::Revoke
4807        };
4808        for privilege_object in &privilege_objects {
4809            self.catalog()
4810                .ensure_not_system_role(&privilege_object.role_id)?;
4811            self.catalog()
4812                .ensure_not_predefined_role(&privilege_object.role_id)?;
4813            if let Some(database_id) = privilege_object.database_id {
4814                self.catalog()
4815                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4816            }
4817            if let Some(schema_id) = privilege_object.schema_id {
4818                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4819                let schema_spec: SchemaSpecifier = schema_id.into();
4820
4821                self.catalog().ensure_not_reserved_object(
4822                    &(database_spec, schema_spec).into(),
4823                    session.conn_id(),
4824                )?;
4825            }
4826            for privilege_acl_item in &privilege_acl_items {
4827                self.catalog()
4828                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
4829                self.catalog()
4830                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4831                ops.push(catalog::Op::UpdateDefaultPrivilege {
4832                    privilege_object: privilege_object.clone(),
4833                    privilege_acl_item: privilege_acl_item.clone(),
4834                    variant,
4835                })
4836            }
4837        }
4838
4839        self.catalog_transact(Some(session), ops).await?;
4840        Ok(ExecuteResponse::AlteredDefaultPrivileges)
4841    }
4842
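        /// Handles `GRANT <role>`: memberships that already exist produce a notice instead of an
        /// op, and the remaining grants are applied in one catalog transaction.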
4843    #[instrument]
4844    pub(super) async fn sequence_grant_role(
4845        &mut self,
4846        session: &Session,
4847        plan::GrantRolePlan {
4848            role_ids,
4849            member_ids,
4850            grantor_id,
4851        }: plan::GrantRolePlan,
4852    ) -> Result<ExecuteResponse, AdapterError> {
4853        let catalog = self.catalog();
4854        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4855        for role_id in role_ids {
4856            for member_id in &member_ids {
4857                let member_membership: BTreeSet<_> =
4858                    catalog.get_role(member_id).membership().keys().collect();
4859                if member_membership.contains(&role_id) {
4860                    let role_name = catalog.get_role(&role_id).name().to_string();
4861                    let member_name = catalog.get_role(member_id).name().to_string();
4862                    // We need this check so we don't accidentally return a success on a reserved role.
4863                    catalog.ensure_not_reserved_role(member_id)?;
4864                    catalog.ensure_grantable_role(&role_id)?;
4865                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4866                        role_name,
4867                        member_name,
4868                    });
4869                } else {
4870                    ops.push(catalog::Op::GrantRole {
4871                        role_id,
4872                        member_id: *member_id,
4873                        grantor_id,
4874                    });
4875                }
4876            }
4877        }
4878
4879        if ops.is_empty() {
4880            return Ok(ExecuteResponse::GrantedRole);
4881        }
4882
4883        self.catalog_transact(Some(session), ops)
4884            .await
4885            .map(|_| ExecuteResponse::GrantedRole)
4886    }
4887
4888    #[instrument]
4889    pub(super) async fn sequence_revoke_role(
4890        &mut self,
4891        session: &Session,
4892        plan::RevokeRolePlan {
4893            role_ids,
4894            member_ids,
4895            grantor_id,
4896        }: plan::RevokeRolePlan,
4897    ) -> Result<ExecuteResponse, AdapterError> {
4898        let catalog = self.catalog();
4899        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4900        for role_id in role_ids {
4901            for member_id in &member_ids {
4902                let member_membership: BTreeSet<_> =
4903                    catalog.get_role(member_id).membership().keys().collect();
4904                if !member_membership.contains(&role_id) {
4905                    let role_name = catalog.get_role(&role_id).name().to_string();
4906                    let member_name = catalog.get_role(member_id).name().to_string();
4907                    // We need this check so we don't accidentally return a success on a reserved role.
4908                    catalog.ensure_not_reserved_role(member_id)?;
4909                    catalog.ensure_grantable_role(&role_id)?;
4910                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4911                        role_name,
4912                        member_name,
4913                    });
4914                } else {
4915                    ops.push(catalog::Op::RevokeRole {
4916                        role_id,
4917                        member_id: *member_id,
4918                        grantor_id,
4919                    });
4920                }
4921            }
4922        }
4923
4924        if ops.is_empty() {
4925            return Ok(ExecuteResponse::RevokedRole);
4926        }
4927
4928        self.catalog_transact(Some(session), ops)
4929            .await
4930            .map(|_| ExecuteResponse::RevokedRole)
4931    }
4932
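        /// Handles `ALTER ... OWNER TO`. Ownership changes cascade to dependent indexes and
        /// progress collections for items, and to replicas for clusters; indexes themselves
        /// cannot be altered directly and only produce a notice.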
4933    #[instrument]
4934    pub(super) async fn sequence_alter_owner(
4935        &mut self,
4936        session: &Session,
4937        plan::AlterOwnerPlan {
4938            id,
4939            object_type,
4940            new_owner,
4941        }: plan::AlterOwnerPlan,
4942    ) -> Result<ExecuteResponse, AdapterError> {
4943        let mut ops = vec![catalog::Op::UpdateOwner {
4944            id: id.clone(),
4945            new_owner,
4946        }];
4947
4948        match &id {
4949            ObjectId::Item(global_id) => {
4950                let entry = self.catalog().get_entry(global_id);
4951
4952                // Cannot directly change the owner of an index.
4953                if entry.is_index() {
4954                    let name = self
4955                        .catalog()
4956                        .resolve_full_name(entry.name(), Some(session.conn_id()))
4957                        .to_string();
4958                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
4959                    return Ok(ExecuteResponse::AlteredObject(object_type));
4960                }
4961
4962                // Alter owner cascades down to dependent indexes.
4963                let dependent_index_ops = entry
4964                    .used_by()
4965                    .into_iter()
4966                    .filter(|id| self.catalog().get_entry(id).is_index())
4967                    .map(|id| catalog::Op::UpdateOwner {
4968                        id: ObjectId::Item(*id),
4969                        new_owner,
4970                    });
4971                ops.extend(dependent_index_ops);
4972
4973                // Alter owner cascades down to progress collections.
4974                let dependent_subsources =
4975                    entry
4976                        .progress_id()
4977                        .into_iter()
4978                        .map(|item_id| catalog::Op::UpdateOwner {
4979                            id: ObjectId::Item(item_id),
4980                            new_owner,
4981                        });
4982                ops.extend(dependent_subsources);
4983            }
4984            ObjectId::Cluster(cluster_id) => {
4985                let cluster = self.catalog().get_cluster(*cluster_id);
4986                // Alter owner cascades down to cluster replicas.
4987                let managed_cluster_replica_ops =
4988                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4989                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4990                        new_owner,
4991                    });
4992                ops.extend(managed_cluster_replica_ops);
4993            }
4994            _ => {}
4995        }
4996
4997        self.catalog_transact(Some(session), ops)
4998            .await
4999            .map(|_| ExecuteResponse::AlteredObject(object_type))
5000    }
5001
5002    #[instrument]
5003    pub(super) async fn sequence_reassign_owned(
5004        &mut self,
5005        session: &Session,
5006        plan::ReassignOwnedPlan {
5007            old_roles,
5008            new_role,
5009            reassign_ids,
5010        }: plan::ReassignOwnedPlan,
5011    ) -> Result<ExecuteResponse, AdapterError> {
5012        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
5013            self.catalog().ensure_not_reserved_role(role_id)?;
5014        }
5015
5016        let ops = reassign_ids
5017            .into_iter()
5018            .map(|id| catalog::Op::UpdateOwner {
5019                id,
5020                new_owner: new_role,
5021            })
5022            .collect();
5023
5024        self.catalog_transact(Some(session), ops)
5025            .await
5026            .map(|_| ExecuteResponse::ReassignOwned)
5027    }
5028
5029    #[instrument]
5030    pub(crate) async fn handle_deferred_statement(&mut self) {
        // A `Message::DeferredStatementReady` may have been sent and then a session
        // cancellation processed, which removes the pending element from
        // `serialized_ddl`, so it is expected that the queue is sometimes empty.
        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
            return;
        };
        match ps {
            crate::coord::PlanStatement::Statement { stmt, params } => {
                self.handle_execute_inner(stmt, params, ctx).await;
            }
            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
                self.sequence_plan(ctx, plan, resolved_ids).await;
            }
        }
    }

    #[instrument]
    // TODO(parkmycar): Remove this once we have an actual implementation.
    #[allow(clippy::unused_async)]
    pub(super) async fn sequence_alter_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterTablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::AlterTablePlan {
            relation_id,
            column_name,
            column_type,
            raw_sql_type,
        } = plan;

        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
        let id_ts = self.get_catalog_write_ts().await;
        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
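        // Note: `allocate_user_id` also hands back a fresh `CatalogItemId`, which is
        // discarded here. Only the new `GlobalId` is needed: it will identify the new
        // storage collection backing the altered table, while the table itself keeps
        // its existing `CatalogItemId` (`relation_id`).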
        let ops = vec![catalog::Op::AlterAddColumn {
            id: relation_id,
            new_global_id,
            name: column_name,
            typ: column_type,
            sql: raw_sql_type,
        }];

        let entry = self.catalog().get_entry(&relation_id);
        let CatalogItem::Table(table) = &entry.item else {
            let err = format!("expected table, found {:?}", entry.item);
            return Err(AdapterError::Internal(err));
        };
        // Expected schema version, before altering the table.
        let expected_version = table.desc.latest_version();
        let existing_global_id = table.global_id_writes();

        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
            Box::pin(async move {
                let entry = coord.catalog().get_entry(&relation_id);
                let CatalogItem::Table(table) = &entry.item else {
                    panic!("programming error: expected table, found {:?}", entry.item);
                };
                let table = table.clone();

                // Acquire a read hold on the original table so that its since
                // cannot advance while the ALTER is running.
                let existing_table = crate::CollectionIdBundle {
                    storage_ids: btreeset![existing_global_id],
                    compute_ids: BTreeMap::new(),
                };
                let existing_table_read_hold = coord.acquire_read_holds(&existing_table);

                let new_version = table.desc.latest_version();
                let new_desc = table
                    .desc
                    .at_version(RelationVersionSelector::Specific(new_version));
                let register_ts = coord.get_local_write_ts().await.timestamp;

                // Alter the table description, creating a "new" collection.
                coord
                    .controller
                    .storage
                    .alter_table_desc(
                        existing_global_id,
                        new_global_id,
                        new_desc,
                        expected_version,
                        register_ts,
                    )
                    .await
                    .expect("failed to alter desc of table");

                // Initialize the ReadPolicy which ensures we have the correct read holds.
                let compaction_window = table
                    .custom_logical_compaction_window
                    .unwrap_or(CompactionWindow::Default);
                coord
                    .initialize_read_policies(
                        &crate::CollectionIdBundle {
                            storage_ids: btreeset![new_global_id],
                            compute_ids: BTreeMap::new(),
                        },
                        compaction_window,
                    )
                    .await;
                coord.apply_local_write(register_ts).await;

                // Alter is complete! We can drop our read hold.
                drop(existing_table_read_hold);
            })
        })
        .await?;

        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
    }
}

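// Illustrative sketch only (not used by the coordinator): the explicit
// `drop(existing_table_read_hold)` in `sequence_alter_table` above relies on the usual
// RAII pattern, where a guard keeps a resource alive (here: a read hold pinning the
// original table's since) until it is dropped. The types and the test below are
// hypothetical stand-ins for the real read-hold machinery.
#[cfg(test)]
mod read_hold_raii_sketch {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Hypothetical guard: while it exists, the held frontier may not advance.
    struct HoldGuard(Arc<AtomicUsize>);

    impl HoldGuard {
        fn acquire(active: &Arc<AtomicUsize>) -> Self {
            active.fetch_add(1, Ordering::SeqCst);
            HoldGuard(Arc::clone(active))
        }
    }

    impl Drop for HoldGuard {
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::SeqCst);
        }
    }

    #[test] // hypothetical test, for illustration only
    fn hold_released_only_on_drop() {
        let active = Arc::new(AtomicUsize::new(0));
        let hold = HoldGuard::acquire(&active);
        // The ALTER work would happen here, with the hold still active.
        assert_eq!(active.load(Ordering::SeqCst), 1);
        drop(hold);
        assert_eq!(active.load(Ordering::SeqCst), 0);
    }
}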
#[derive(Debug)]
struct CachedStatisticsOracle {
    cache: BTreeMap<GlobalId, usize>,
}

impl CachedStatisticsOracle {
    pub async fn new<T: TimelyTimestamp>(
        ids: &BTreeSet<GlobalId>,
        as_of: &Antichain<T>,
        storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
    ) -> Result<Self, StorageError<T>> {
        let mut cache = BTreeMap::new();

        for id in ids {
            let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;

            match stats {
                Ok(stats) => {
                    cache.insert(*id, stats.num_updates);
                }
                Err(StorageError::IdentifierMissing(id)) => {
                    ::tracing::debug!("no statistics for {id}")
                }
                Err(e) => return Err(e),
            }
        }

        Ok(Self { cache })
    }
}

impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
    fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
        self.cache.get(&id).map(|estimate| *estimate)
    }

    fn as_map(&self) -> BTreeMap<GlobalId, usize> {
        self.cache.clone()
    }
}
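// A minimal sketch, assuming `GlobalId::User` is constructible here: it exercises the
// lookup semantics of `CachedStatisticsOracle` (hits return the cached count, misses
// return `None`) by building the cache directly, since `CachedStatisticsOracle::new`
// needs storage collections. Illustrative only.
#[cfg(test)]
mod cached_statistics_oracle_sketch {
    use std::collections::BTreeMap;

    use mz_repr::GlobalId;
    use mz_transform::StatisticsOracle;

    use super::CachedStatisticsOracle;

    #[test] // hypothetical test, for illustration only
    fn lookup_semantics() {
        let mut cache = BTreeMap::new();
        cache.insert(GlobalId::User(1), 42_usize);
        let oracle = CachedStatisticsOracle { cache };

        assert_eq!(oracle.cardinality_estimate(GlobalId::User(1)), Some(42));
        assert_eq!(oracle.cardinality_estimate(GlobalId::User(2)), None);
        assert_eq!(oracle.as_map().len(), 1);
    }
}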

impl Coordinator {
    pub(super) async fn statistics_oracle(
        &self,
        session: &Session,
        source_ids: &BTreeSet<GlobalId>,
        query_as_of: &Antichain<Timestamp>,
        is_oneshot: bool,
    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
        if !session.vars().enable_session_cardinality_estimates() {
            return Ok(Box::new(EmptyStatisticsOracle));
        }

        let timeout = if is_oneshot {
            // TODO(mgree): ideally, we would shorten the timeout even more if we think the query could take the fast path
            self.catalog()
                .system_config()
                .optimizer_oneshot_stats_timeout()
        } else {
            self.catalog().system_config().optimizer_stats_timeout()
        };

        let cached_stats = mz_ore::future::timeout(
            timeout,
            CachedStatisticsOracle::new(
                source_ids,
                query_as_of,
                self.controller.storage_collections.as_ref(),
            ),
        )
        .await;

        match cached_stats {
            Ok(stats) => Ok(Box::new(stats)),
            Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
                warn!(
                    is_oneshot = is_oneshot,
                    "optimizer statistics collection timed out after {}ms",
                    timeout.as_millis()
                );

                Ok(Box::new(EmptyStatisticsOracle))
            }
            Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
        }
    }
}
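// A hedged sketch (not called anywhere): the same "collect stats, but never let stats
// collection block the query" pattern used by `statistics_oracle` above, factored into
// a generic helper. `with_stats_deadline` is a hypothetical name; the helper simply
// mirrors the `mz_ore::future::timeout` calls used in the method and substitutes a
// fallback value when the deadline elapses.
#[allow(dead_code)]
async fn with_stats_deadline<T, E, F>(deadline: Duration, fut: F, fallback: T) -> Result<T, E>
where
    F: Future<Output = Result<T, E>>,
{
    match mz_ore::future::timeout(deadline, fut).await {
        // Stats arrived in time: use them.
        Ok(value) => Ok(value),
        // Deadline elapsed: degrade gracefully instead of failing the query.
        Err(mz_ore::future::TimeoutError::DeadlineElapsed) => Ok(fallback),
        // The underlying stats collection itself failed: surface that error.
        Err(mz_ore::future::TimeoutError::Inner(e)) => Err(e),
    }
}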

/// Checks whether we should emit diagnostic
/// information associated with reading per-replica sources.
///
/// If an unrecoverable error is found (today: an untargeted read on a
/// cluster whose replica count is not exactly one), return that. Otherwise,
/// return a list of associated notices (today: we emit exactly one notice
/// if there are any per-replica log dependencies and
/// `emit_introspection_query_notice` is set, and none otherwise).
pub(super) fn check_log_reads(
    catalog: &Catalog,
    cluster: &Cluster,
    source_ids: &BTreeSet<GlobalId>,
    target_replica: &mut Option<ReplicaId>,
    vars: &SessionVars,
) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError> {
    let log_names = source_ids
        .iter()
        .map(|gid| catalog.resolve_item_id(gid))
        .flat_map(|item_id| catalog.introspection_dependencies(item_id))
        .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
        .collect::<Vec<_>>();

    if log_names.is_empty() {
        return Ok(None);
    }

    // Reading from log sources on replicated clusters is only allowed if a
    // target replica is selected. Otherwise, we have no way of knowing which
    // replica we read the introspection data from.
    let num_replicas = cluster.replicas().count();
    if target_replica.is_none() {
        if num_replicas == 1 {
            *target_replica = cluster.replicas().map(|r| r.replica_id).next();
        } else {
            return Err(AdapterError::UntargetedLogRead { log_names });
        }
    }

    // Ensure that logging is initialized for the target replica, lest
    // we try to read from a non-existing arrangement.
    let replica_id = target_replica.expect("set to `Some` above");
    let replica = &cluster.replica(replica_id).expect("Replica must exist");
    if !replica.config.compute.logging.enabled() {
        return Err(AdapterError::IntrospectionDisabled { log_names });
    }

    Ok(vars
        .emit_introspection_query_notice()
        .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
}
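// A standalone sketch of the replica-targeting rule enforced by `check_log_reads`,
// using plain types so it can be read (or run) in isolation. `resolve_log_read_target`
// and its inputs are hypothetical; the real logic lives in the function above.
#[allow(dead_code)]
fn resolve_log_read_target(
    target_replica: Option<u64>,
    replica_ids: &[u64],
) -> Result<u64, String> {
    match target_replica {
        // An explicitly targeted replica is always acceptable.
        Some(replica_id) => Ok(replica_id),
        // Without a target we can only proceed if the choice is unambiguous,
        // i.e. the cluster has exactly one replica.
        None => match replica_ids {
            [only] => Ok(*only),
            _ => Err("per-replica log reads require a target replica".to_string()),
        },
    }
}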

impl Coordinator {
    /// Forward notices that we got from the optimizer.
    fn emit_optimizer_notices(&self, session: &Session, notices: &Vec<RawOptimizerNotice>) {
        // `for_session` below is expensive, so return early if there's nothing to do.
        if notices.is_empty() {
            return;
        }
        let humanizer = self.catalog.for_session(session);
        let system_vars = self.catalog.system_config();
        for notice in notices {
            let kind = OptimizerNoticeKind::from(notice);
            let notice_enabled = match kind {
                OptimizerNoticeKind::IndexAlreadyExists => {
                    system_vars.enable_notices_for_index_already_exists()
                }
                OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
                    system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
                }
                OptimizerNoticeKind::IndexKeyEmpty => {
                    system_vars.enable_notices_for_index_empty_key()
                }
            };
            if notice_enabled {
                // We don't need to redact the notice parts because
                // `emit_optimizer_notices` is only called by the corresponding
                // `sequence_*` method for the statement that produces that notice.
                session.add_notice(AdapterNotice::OptimizerNotice {
                    notice: notice.message(&humanizer, false).to_string(),
                    hint: notice.hint(&humanizer, false).to_string(),
                });
            }
            self.metrics
                .optimization_notices
                .with_label_values(&[kind.metric_label()])
                .inc_by(1);
        }
    }

    /// Process the metainfo from a newly created non-transient dataflow.
    async fn process_dataflow_metainfo(
        &mut self,
        df_meta: DataflowMetainfo,
        export_id: GlobalId,
        ctx: Option<&mut ExecuteContext>,
        notice_ids: Vec<GlobalId>,
    ) -> Option<BuiltinTableAppendNotify> {
        // Emit raw notices to the user.
        if let Some(ctx) = ctx {
            self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
        }

        // Create a metainfo with rendered notices.
        let df_meta = self
            .catalog()
            .render_notices(df_meta, notice_ids, Some(export_id));

        // Attend to optimization notice builtin tables and save the metainfo in the catalog's
        // in-memory state.
        if self.catalog().state().system_config().enable_mz_notices()
            && !df_meta.optimizer_notices.is_empty()
        {
            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
            self.catalog().state().pack_optimizer_notices(
                &mut builtin_table_updates,
                df_meta.optimizer_notices.iter(),
                Diff::ONE,
            );

            // Save the metainfo.
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            Some(
                self.builtin_table_update()
                    .execute(builtin_table_updates)
                    .await
                    .0,
            )
        } else {
            // Save the metainfo.
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            None
        }
    }
}
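
// A self-contained sketch of the per-kind gating performed by `emit_optimizer_notices`
// above: each notice kind is checked against its own toggle before being surfaced, and
// every notice is counted whether or not it is shown. The names here (`NoticeKind`,
// `gate_notices`) are hypothetical and std-only; they are not part of the coordinator.
#[allow(dead_code)]
mod optimizer_notice_gating_sketch {
    use std::collections::BTreeMap;

    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
    enum NoticeKind {
        IndexAlreadyExists,
        IndexTooWideForLiteralConstraints,
        IndexKeyEmpty,
    }

    /// Returns the notices that should be surfaced, plus a per-kind emission count.
    fn gate_notices(
        notices: &[NoticeKind],
        enabled: &BTreeMap<NoticeKind, bool>,
    ) -> (Vec<NoticeKind>, BTreeMap<NoticeKind, usize>) {
        let mut surfaced = Vec::new();
        let mut counts = BTreeMap::new();
        for notice in notices {
            // Counted regardless of whether the notice is shown, mirroring the
            // metrics increment in `emit_optimizer_notices`.
            *counts.entry(*notice).or_insert(0) += 1;
            if enabled.get(notice).copied().unwrap_or(false) {
                surfaced.push(*notice);
            }
        }
        (surfaced, counts)
    }
}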