mz_adapter/coord/sequencer/
inner.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::FuturesOrdered;
use futures::{Future, future};
use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_SELF_MANAGED_AUTH};
use mz_catalog::memory::objects::{
    CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_controller_types::ReplicaId;
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::vec::VecExt;
use mz_ore::{assert_none, instrument};
use mz_persist_client::stats::SnapshotPartStats;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
    RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, SqlServerConfigOption, SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributes, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::GenericSourceConnection;
// Import only select elements of the `plan` module to avoid merge conflicts on `use` statements.
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
    Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
    PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS, SessionVars,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_storage_types::stats::RelationPartStats;
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
use smallvec::SmallVec;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::{
    AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
    CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
    Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
    PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
    validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;

/// Attempts to evaluate an expression. If an error is returned, the error is
/// sent to the client and the enclosing function returns early.
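///
/// A minimal usage sketch (`compute_value` is a hypothetical fallible helper;
/// `ctx` is any type with a `retire` method, as used throughout this module):
///
/// ```ignore
/// // On `Err`, retires `ctx` with the error and returns from the caller.
/// let value = return_if_err!(compute_value(), ctx);
/// ```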
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;

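/// Catalog ops for a `DROP`, plus bookkeeping about what was dropped so that
/// callers can emit the appropriate session notices afterwards.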
struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}

/// A bundle of values returned from [`Coordinator::create_source_inner`].
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
    /// Sequences the next stage of a [Staged] plan. This is designed for use with plans that
    /// execute both on and off of the coordinator thread. Stages can either produce another stage
    /// to execute or a final response. An explicit [Span] is passed to allow for convenient
    /// tracing.
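    ///
    /// A hedged sketch of the driving loop (`FirstStage` is illustrative, not
    /// a real type):
    ///
    /// ```ignore
    /// // Each call to `stage(..)` yields a `StageResult`:
    /// // - `Immediate(next)` continues the loop on the coordinator thread,
    /// // - `Handle(handle)` spawns the work off-thread and re-enqueues the
    /// //   next stage via `internal_cmd_tx` when it completes,
    /// // - `Response(resp)` and `HandleRetire(handle)` retire the context.
    /// coord.sequence_staged(ctx, Span::current(), FirstStage::from(plan)).await;
    /// ```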
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Channel to await cancellation. Insert a new channel, but check if the previous one
                    // was already canceled.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // If cancellation is not allowed, remove any previous channel so that
                    // handle_spawn doesn't observe a stale value from an earlier stage on
                    // which cancel_enabled was true.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }

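    /// Spawns a task that drives `handle` to completion and passes its
    /// successful result to `f`. When `cancel_enabled` is set and the session
    /// has a cancellation channel registered in `staged_cancellation`, the
    /// task races the handle against cancellation and retires `ctx` with
    /// [`AdapterError::Canceled`] if the query is canceled first.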
    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // Wait for true or dropped sender.
                let _ = rx.wait_for(|v| *v).await;
            })
        } else {
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = match res {
                        Ok(next) => return_if_err!(next, ctx),
                        Err(err) => {
                            tracing::error!("sequence_staged join error {err}");
                            ctx.retire(Err(AdapterError::Internal(
                                "sequence_staged join error".into(),
                            )));
                            return;
                        }
                    };
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }

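    /// Converts a batch of `CREATE SOURCE` plan bundles into catalog ops,
    /// enforcing cluster replica restrictions and reducing webhook `CHECK`
    /// expressions along the way. Also collects the names of any `IF NOT
    /// EXISTS` sources so callers can turn "already exists" errors into
    /// notices.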
    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref ingestion) => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match ingestion.desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        if cluster.replica_ids().len() > 1 {
                            return Err(AdapterError::Unsupported(
                                "webhook sources in clusters with >1 replicas",
                            ));
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

            // Attempt to reduce the `CHECK` expression; we time out if this takes too long.
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            // If this source contained a set of available source references, update the
            // source references catalog table.
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            // These operations must be executed after the source is added to the catalog.
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }

    /// Subsources are planned differently from other statements because they
    /// are typically synthesized from other statements, e.g. `CREATE SOURCE`.
    /// Because of this, we have usually "missed" the opportunity to plan them
    /// through the normal statement execution life cycle (the exception being
    /// during bootstrapping).
    ///
    /// The caller needs to provide a `CatalogItemId` and `GlobalId` for the subsource.
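    ///
    /// A hedged call sketch, mirroring the callers below (the ids come from
    /// `allocate_user_id`):
    ///
    /// ```ignore
    /// let id_ts = self.get_catalog_write_ts().await;
    /// let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
    /// let bundle = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
    /// ```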
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let plan = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }

    /// Prepares an `ALTER SOURCE...ADD SUBSOURCE`.
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog_mut()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

    /// Prepares a `CREATE SOURCE` statement to create its progress subsource,
    /// the primary source, and any ingestion export subsources (e.g. PG
    /// tables).
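    ///
    /// The returned `Plan::CreateSources` bundles plans in dependency order:
    ///
    /// ```text
    /// [progress subsource, primary source, subsource 1, .., subsource N]
    /// ```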
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: CreateSubsourceStatement<Aug>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // 1. First plan the progress subsource.
        //
        // The primary source depends on this subsource because the primary
        // source needs to know its shard ID, and the easiest way of
        // guaranteeing that the shard ID is discoverable is to create this
        // collection first.
        assert_none!(progress_stmt.of_source);
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
        let progress_plan =
            self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
        let progress_full_name = self
            .catalog()
            .resolve_full_name(&progress_plan.plan.name, None);
        let progress_subsource = ResolvedItemName::Item {
            id: progress_plan.item_id,
            qualifiers: progress_plan.plan.name.qualifiers.clone(),
            full_name: progress_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        create_source_plans.push(progress_plan);

        source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        // 2. Then plan the main source.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("plan must be CreateSourcePlan but got {:?}", p),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Generate subsource statements
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // 3. Finally, plan all the subsources
        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog_mut()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        session: &mut Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let item_ids: Vec<_> = plans.iter().map(|plan| plan.item_id).collect();
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(session, plans).await?;

        let transact_result = self
            .catalog_transact_with_side_effects(Some(session), ops, |coord| async {
                // TODO(roshan): This will be unnecessary when we drop support for
                // automatically created subsources.
                //
                // To improve the performance of creating a source and many
                // subsources, we create all of the collections at once. If we
                // created each collection independently, we would reschedule
                // the primary source's execution for each subsource, causing
                // unnecessary thrashing.
                //
                // Note that creating all of the collections at once should
                // never be semantically load-bearing; we should always be able
                // to break this apart into creating the collections
                // sequentially.
                let mut collections = Vec::with_capacity(sources.len());

                for (item_id, source) in sources {
                    let source_status_item_id = coord.catalog().resolve_builtin_storage_collection(
                        &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
                    );
                    let source_status_collection_id = Some(
                        coord
                            .catalog()
                            .get_entry(&source_status_item_id)
                            .latest_global_id(),
                    );

                    let (data_source, status_collection_id) = match source.data_source {
                        DataSourceDesc::Ingestion {
                            ingestion_desc:
                                mz_sql::plan::Ingestion {
                                    desc,
                                    progress_subsource,
                                },
                            cluster_id,
                        } => {
                            let desc = desc.into_inline_connection(coord.catalog().state());
                            // TODO(parkmycar): We should probably check the type here, but I'm not
                            // sure if this will always be a Source or a Table.
                            let progress_subsource = coord
                                .catalog()
                                .get_entry(&progress_subsource)
                                .latest_global_id();

                            let ingestion = mz_storage_types::sources::IngestionDescription::new(
                                desc,
                                cluster_id,
                                progress_subsource,
                            );

                            (
                                DataSource::Ingestion(ingestion),
                                source_status_collection_id,
                            )
                        }
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: _,
                            details,
                            data_config,
                        } => {
                            // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                            // this will always be a Source or a Table.
                            let ingestion_id =
                                coord.catalog().get_entry(&ingestion_id).latest_global_id();
                            (
                                DataSource::IngestionExport {
                                    ingestion_id,
                                    details,
                                    data_config: data_config
                                        .into_inline_connection(coord.catalog().state()),
                                },
                                source_status_collection_id,
                            )
                        }
                        DataSourceDesc::Progress => (DataSource::Progress, None),
                        DataSourceDesc::Webhook { .. } => {
                            if let Some(url) = coord.catalog().state().try_get_webhook_url(&item_id)
                            {
                                session.add_notice(AdapterNotice::WebhookSourceCreated { url })
                            }

                            (DataSource::Webhook, None)
                        }
                        DataSourceDesc::Introspection(_) => {
                            unreachable!("cannot create sources with introspection data sources")
                        }
                    };

                    collections.push((
                        source.global_id,
                        CollectionDescription::<Timestamp> {
                            desc: source.desc.clone(),
                            data_source,
                            timeline: Some(source.timeline),
                            since: None,
                            status_collection_id,
                        },
                    ));
                }

                let storage_metadata = coord.catalog.state().storage_metadata();

                coord
                    .controller
                    .storage
                    .create_collections(storage_metadata, None, collections)
                    .await
                    .unwrap_or_terminate("cannot fail to create collections");

                // It is _very_ important that we only initialize read policies
                // after we have created all the sources/collections. Some of
                // the sources created in this collection might have
                // dependencies on other sources, so the controller must get a
                // chance to install read holds before we set a policy that
                // might make the since advance.
                //
                // One instance of this is the remap shard: it presents as a
                // SUBSOURCE, and all other SUBSOURCES of a SOURCE will depend
                // on it. Both subsources and sources will show up as a `Source`
                // in the above.
                //
                // Although there should only be one parent source that sequence_create_source is
                // ever called with, hedge our bets a bit and collect the compaction windows for
                // each id in the bundle (these should all be identical). This is some extra work
                // but seems safer.
                let read_policies = coord.catalog().state().source_compaction_windows(item_ids);
                for (compaction_window, storage_policies) in read_policies {
                    coord
                        .initialize_storage_read_policies(storage_policies, compaction_window)
                        .await;
                }
            })
            .await;

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                session.add_notice(AdapterNotice::ObjectAlreadyExists {
                    name: if_not_exists_ids[&id].item.clone(),
                    ty: "source",
                });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog_mut().allocate_user_id(id_ts).await
        {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    ctx.session_mut(),
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        session: &mut Session,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *session.current_role_id(),
        }];

        let transact_result = self
            .catalog_transact_with_side_effects(Some(session), ops, |coord| async {
                match plan.connection.details {
                    ConnectionDetails::AwsPrivatelink(ref privatelink) => {
                        let spec = VpcEndpointConfig {
                            aws_service_name: privatelink.service_name.to_owned(),
                            availability_zone_ids: privatelink.availability_zones.to_owned(),
                        };
                        let cloud_resource_controller =
                            match coord.cloud_resource_controller.as_ref().cloned() {
                                Some(controller) => controller,
                                None => {
                                    tracing::warn!("AWS PrivateLink connections unsupported");
                                    return;
                                }
                            };
                        if let Err(err) = cloud_resource_controller
                            .ensure_vpc_endpoint(connection_id, spec)
                            .await
                        {
                            tracing::warn!(?err, "failed to ensure vpc endpoint!");
                        }
                    }
                    _ => {}
                }
            })
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::ObjectAlreadyExists {
                    name: plan.name.item,
                    ty: "connection",
                });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &mut Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &mut Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

    /// Validates the role attributes for a `CREATE ROLE` statement.
    fn validate_role_attributes(&self, attributes: &RoleAttributes) -> Result<(), AdapterError> {
        if !ENABLE_SELF_MANAGED_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some()
                || attributes.password.is_some()
                || attributes.login.is_some()
            {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes)?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_conn(conn_id, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO(network_policy): Consider role-based network policies here.
        let current_network_policy_name = self
            .owned_catalog()
            .system_config()
            .default_network_policy_name();
        // Check whether the way we're altering the policy is still valid for the current connection.
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let id_ts = self.get_catalog_write_ts().await;
        let (table_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                o => {
                    unreachable!("CREATE TABLE data source got {:?}", o)
                }
            },
        };
        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let catalog_result = self
            .catalog_transact_with_side_effects(Some(ctx.session()), ops, |coord| async {
                // The table data_source determines whether this table will be written to
                // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
                // (e.g. a source-fed table).
                let (collections, register_ts, read_policies) = match table.data_source {
                    TableDataSource::TableWrites { defaults: _ } => {
                        // Determine the initial validity for the table.
                        let register_ts = coord.get_local_write_ts().await.timestamp;

                        // After acquiring `register_ts` but before using it, we need to
                        // be sure we're still the leader. Otherwise a new generation
                        // may also be trying to use `register_ts` for a different
                        // purpose.
                        //
                        // See database-issues#8273.
                        coord
                            .catalog
                            .confirm_leadership()
                            .await
                            .unwrap_or_terminate("unable to confirm leadership");

                        if let Some(id) = ctx.extra().contents() {
                            coord.set_statement_execution_timestamp(id, register_ts);
                        }

                        // When initially creating a table it should only have a single version.
                        let relation_version = RelationVersion::root();
                        assert_eq!(table.desc.latest_version(), relation_version);
                        let relation_desc = table
                            .desc
                            .at_version(RelationVersionSelector::Specific(relation_version));
                        // We assert above we have a single version, and thus we are the primary.
                        let collection_desc = CollectionDescription::for_table(relation_desc, None);
                        let collections = vec![(global_id, collection_desc)];

                        let compaction_window = table
                            .custom_logical_compaction_window
                            .unwrap_or(CompactionWindow::Default);
                        let read_policies =
                            BTreeMap::from([(compaction_window, btreeset! { table_id })]);

                        (collections, Some(register_ts), read_policies)
                    }
                    TableDataSource::DataSource {
                        desc: data_source,
                        timeline,
                    } => {
                        match data_source {
                            DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference: _,
                                details,
                                data_config,
                            } => {
                                // TODO: It's a little weird that a table will be present in this
                                // source status collection; we might want to split tables out
                                // into a separate status collection.
                                let source_status_item_id =
                                    coord.catalog().resolve_builtin_storage_collection(
                                        &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
                                    );
                                let status_collection_id = Some(
                                    coord
                                        .catalog()
                                        .get_entry(&source_status_item_id)
                                        .latest_global_id(),
                                );
                                // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                                // this will always be a Source or a Table.
                                let ingestion_id =
                                    coord.catalog().get_entry(&ingestion_id).latest_global_id();
                                // Create the underlying collection with the latest schema from the Table.
                                let collection_desc = CollectionDescription::<Timestamp> {
                                    desc: table.desc.at_version(RelationVersionSelector::Latest),
                                    data_source: DataSource::IngestionExport {
                                        ingestion_id,
                                        details,
                                        data_config: data_config
                                            .into_inline_connection(coord.catalog.state()),
                                    },
                                    since: None,
                                    status_collection_id,
                                    timeline: Some(timeline.clone()),
                                };

                                let collections = vec![(global_id, collection_desc)];
                                let read_policies = coord
                                    .catalog()
                                    .state()
                                    .source_compaction_windows(vec![table_id]);

                                (collections, None, read_policies)
                            }
                            DataSourceDesc::Webhook { .. } => {
                                if let Some(url) =
                                    coord.catalog().state().try_get_webhook_url(&table_id)
                                {
                                    ctx.session()
                                        .add_notice(AdapterNotice::WebhookSourceCreated { url })
                                }

                                // Create the underlying collection with the latest schema from the Table.
                                assert_eq!(
                                    table.desc.latest_version(),
                                    RelationVersion::root(),
                                    "found webhook with more than 1 relation version, {:?}",
                                    table.desc
                                );
                                let desc = table.desc.latest();

                                let collection_desc = CollectionDescription {
                                    desc,
                                    data_source: DataSource::Webhook,
                                    since: None,
                                    status_collection_id: None,
                                    timeline: Some(timeline.clone()),
                                };
                                let collections = vec![(global_id, collection_desc)];
                                let read_policies = coord
                                    .catalog()
                                    .state()
                                    .source_compaction_windows(vec![table_id]);

                                (collections, None, read_policies)
                            }
                            _ => unreachable!("CREATE TABLE data source got {:?}", data_source),
                        }
                    }
                };

                // Create the collections.
                let storage_metadata = coord.catalog.state().storage_metadata();
                coord
                    .controller
                    .storage
                    .create_collections(storage_metadata, register_ts, collections)
                    .await
                    .unwrap_or_terminate("cannot fail to create collections");

                // Mark the register timestamp as completed.
                if let Some(register_ts) = register_ts {
                    coord.apply_local_write(register_ts).await;
                }

                // Initialize the Read Policies.
                for (compaction_window, storage_policies) in read_policies {
                    coord
                        .initialize_storage_read_policies(storage_policies, compaction_window)
                        .await;
                }
            })
            .await;

        match catalog_result {
            Ok(()) => Ok(ExecuteResponse::CreatedTable),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session_mut()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "table",
                    });
                Ok(ExecuteResponse::CreatedTable)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;
1328
1329        // First try to allocate an item ID and a global ID. If either fails, we're done.
1330        let id_ts = self.get_catalog_write_ts().await;
1331        let (item_id, global_id) =
1332            return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
1333
1334        let catalog_sink = Sink {
1335            create_sql: sink.create_sql,
1336            global_id,
1337            from: sink.from,
1338            connection: sink.connection,
1339            envelope: sink.envelope,
1340            version: sink.version,
1341            with_snapshot,
1342            resolved_ids,
1343            cluster_id: in_cluster,
1344        };
1345
1346        let ops = vec![catalog::Op::CreateItem {
1347            id: item_id,
1348            name: name.clone(),
1349            item: CatalogItem::Sink(catalog_sink.clone()),
1350            owner_id: *ctx.session().current_role_id(),
1351        }];
1352
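        // Before committing the catalog transaction, verify that the object we
        // are exporting from has a storage collection; objects without one
        // (e.g. views) cannot be exported as a sink.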
1353        let from = self.catalog().get_entry_by_global_id(&catalog_sink.from);
1354        if let Err(e) = self
1355            .controller
1356            .storage
1357            .check_exists(sink.from)
1358            .map_err(|e| match e {
1359                StorageError::IdentifierMissing(_) => AdapterError::Unstructured(anyhow!(
1360                    "{} is a {}, which cannot be exported as a sink",
1361                    from.name().item.clone(),
1362                    from.item().typ().to_string(),
1363                )),
1364                e => AdapterError::Storage(e),
1365            })
1366        {
1367            ctx.retire(Err(e));
1368            return;
1369        }
1370
1371        let result = self.catalog_transact(Some(ctx.session()), ops).await;
1372
1373        match result {
1374            Ok(()) => {}
1375            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1376                kind:
1377                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1378            })) if if_not_exists => {
1379                ctx.session()
1380                    .add_notice(AdapterNotice::ObjectAlreadyExists {
1381                        name: name.item,
1382                        ty: "sink",
1383                    });
1384                ctx.retire(Ok(ExecuteResponse::CreatedSink));
1385                return;
1386            }
1387            Err(e) => {
1388                ctx.retire(Err(e));
1389                return;
1390            }
1391        };
1392
1393        self.create_storage_export(global_id, &catalog_sink)
1394            .await
1395            .unwrap_or_terminate("cannot fail to create exports");
1396
1397        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1398            .await;
1399
1400        ctx.retire(Ok(ExecuteResponse::CreatedSink))
1401    }
1402
1403    /// Validates that a view definition does not contain any expressions that may lead to
1404    /// ambiguous column references to system tables, such as `NATURAL JOIN` or `SELECT *`.
1405    ///
1406    /// We prevent these expressions so that we can add columns to system tables without
1407    /// changing the definition of the view.
1408    ///
1409    /// Here is a bit of a hand-wavy proof as to why we only need to check the
1410    /// immediate view definition for system objects and ambiguous column
1411    /// references, and not the entire dependency tree:
1412    ///
1413    ///   - A view with no object references cannot have any ambiguous column
1414    ///   references to a system object, because it references no objects at all.
1415    ///   - A view with a direct reference to a system object and a * or
1416    ///   NATURAL JOIN will be rejected due to ambiguous column references.
1417    ///   - A view with system objects but no * or NATURAL JOINs cannot have
1418    ///   any ambiguous column references to a system object, because all column
1419    ///   references are explicitly named.
1420    ///   - A view with * or NATURAL JOINs, that doesn't directly reference a
1421    ///   system object cannot have any ambiguous column references to a system
1422    ///   object, because there are no system objects in the top level view and
1423    ///   all sub-views are guaranteed to have no ambiguous column references to
1424    ///   system objects.
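    ///
    /// For example (illustrative only), `CREATE VIEW v AS SELECT * FROM
    /// mz_views` would be rejected with an ambiguous-column error, while
    /// `CREATE VIEW v AS SELECT id FROM mz_views` names its columns explicitly
    /// and is accepted.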
1425    pub(super) fn validate_system_column_references(
1426        &self,
1427        uses_ambiguous_columns: bool,
1428        depends_on: &BTreeSet<GlobalId>,
1429    ) -> Result<(), AdapterError> {
1430        if uses_ambiguous_columns
1431            && depends_on
1432                .iter()
1433                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1434        {
1435            Err(AdapterError::AmbiguousSystemColumnReference)
1436        } else {
1437            Ok(())
1438        }
1439    }
1440
1441    #[instrument]
1442    pub(super) async fn sequence_create_type(
1443        &mut self,
1444        session: &Session,
1445        plan: plan::CreateTypePlan,
1446        resolved_ids: ResolvedIds,
1447    ) -> Result<ExecuteResponse, AdapterError> {
1448        let id_ts = self.get_catalog_write_ts().await;
1449        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
1450        let typ = Type {
1451            create_sql: Some(plan.typ.create_sql),
1452            global_id,
1453            desc: plan.typ.inner.desc(&self.catalog().for_session(session))?,
1454            details: CatalogTypeDetails {
1455                array_id: None,
1456                typ: plan.typ.inner,
1457                pg_metadata: None,
1458            },
1459            resolved_ids,
1460        };
1461        let op = catalog::Op::CreateItem {
1462            id: item_id,
1463            name: plan.name,
1464            item: CatalogItem::Type(typ),
1465            owner_id: *session.current_role_id(),
1466        };
1467        self.catalog_transact(Some(session), vec![op]).await?;
1468        Ok(ExecuteResponse::CreatedType)
1471    }
1472
1473    #[instrument]
1474    pub(super) async fn sequence_comment_on(
1475        &mut self,
1476        session: &Session,
1477        plan: plan::CommentPlan,
1478    ) -> Result<ExecuteResponse, AdapterError> {
1479        let op = catalog::Op::Comment {
1480            object_id: plan.object_id,
1481            sub_component: plan.sub_component,
1482            comment: plan.comment,
1483        };
1484        self.catalog_transact(Some(session), vec![op]).await?;
1485        Ok(ExecuteResponse::Comment)
1486    }
1487
1488    #[instrument]
1489    pub(super) async fn sequence_drop_objects(
1490        &mut self,
1491        session: &Session,
1492        plan::DropObjectsPlan {
1493            drop_ids,
1494            object_type,
1495            referenced_ids,
1496        }: plan::DropObjectsPlan,
1497    ) -> Result<ExecuteResponse, AdapterError> {
1498        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1499        let mut objects = Vec::new();
1500        for obj_id in &drop_ids {
1501            if !referenced_ids_hashset.contains(obj_id) {
1502                let object_info = ErrorMessageObjectDescription::from_id(
1503                    obj_id,
1504                    &self.catalog().for_session(session),
1505                )
1506                .to_string();
1507                objects.push(object_info);
1508            }
1509        }
1510
1511        if !objects.is_empty() {
1512            session.add_notice(AdapterNotice::CascadeDroppedObject { objects });
1513        }
1514
1515        let DropOps {
1516            ops,
1517            dropped_active_db,
1518            dropped_active_cluster,
1519            dropped_in_use_indexes,
1520        } = self.sequence_drop_common(session, drop_ids)?;
1521
1522        self.catalog_transact(Some(session), ops).await?;
1523
1524        fail::fail_point!("after_sequencer_drop_replica");
1525
1526        if dropped_active_db {
1527            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1528                name: session.vars().database().to_string(),
1529            });
1530        }
1531        if dropped_active_cluster {
1532            session.add_notice(AdapterNotice::DroppedActiveCluster {
1533                name: session.vars().cluster().to_string(),
1534            });
1535        }
1536        for dropped_in_use_index in dropped_in_use_indexes {
1537            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1538            self.metrics
1539                .optimization_notices
1540                .with_label_values(&["DroppedInUseIndex"])
1541                .inc_by(1);
1542        }
1543        Ok(ExecuteResponse::DroppedObject(object_type))
1544    }
1545
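    /// Validates that none of the `dropped_roles` still own objects or appear
    /// as the grantor or grantee of any privilege. If any do, returns an
    /// [`AdapterError::DependentObject`] error listing the dependent objects
    /// for each role.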
1546    fn validate_dropped_role_ownership(
1547        &self,
1548        session: &Session,
1549        dropped_roles: &BTreeMap<RoleId, &str>,
1550    ) -> Result<(), AdapterError> {
1551        fn privilege_check(
1552            privileges: &PrivilegeMap,
1553            dropped_roles: &BTreeMap<RoleId, &str>,
1554            dependent_objects: &mut BTreeMap<String, Vec<String>>,
1555            object_id: &SystemObjectId,
1556            catalog: &ConnCatalog,
1557        ) {
1558            for privilege in privileges.all_values() {
1559                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1560                    let grantor_name = catalog.get_role(&privilege.grantor).name();
1561                    let object_description =
1562                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1563                    dependent_objects
1564                        .entry(role_name.to_string())
1565                        .or_default()
1566                        .push(format!(
1567                            "privileges on {object_description} granted by {grantor_name}",
1568                        ));
1569                }
1570                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1571                    let grantee_name = catalog.get_role(&privilege.grantee).name();
1572                    let object_description =
1573                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1574                    dependent_objects
1575                        .entry(role_name.to_string())
1576                        .or_default()
1577                        .push(format!(
1578                            "privileges granted on {object_description} to {grantee_name}"
1579                        ));
1580                }
1581            }
1582        }
1583
1584        let catalog = self.catalog().for_session(session);
1585        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1586        for entry in self.catalog.entries() {
1587            let id = SystemObjectId::Object(entry.id().into());
1588            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1589                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1590                dependent_objects
1591                    .entry(role_name.to_string())
1592                    .or_default()
1593                    .push(format!("owner of {object_description}"));
1594            }
1595            privilege_check(
1596                entry.privileges(),
1597                dropped_roles,
1598                &mut dependent_objects,
1599                &id,
1600                &catalog,
1601            );
1602        }
1603        for database in self.catalog.databases() {
1604            let database_id = SystemObjectId::Object(database.id().into());
1605            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1606                let object_description =
1607                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1608                dependent_objects
1609                    .entry(role_name.to_string())
1610                    .or_default()
1611                    .push(format!("owner of {object_description}"));
1612            }
1613            privilege_check(
1614                &database.privileges,
1615                dropped_roles,
1616                &mut dependent_objects,
1617                &database_id,
1618                &catalog,
1619            );
1620            for schema in database.schemas_by_id.values() {
1621                let schema_id = SystemObjectId::Object(
1622                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1623                );
1624                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1625                    let object_description =
1626                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1627                    dependent_objects
1628                        .entry(role_name.to_string())
1629                        .or_default()
1630                        .push(format!("owner of {object_description}"));
1631                }
1632                privilege_check(
1633                    &schema.privileges,
1634                    dropped_roles,
1635                    &mut dependent_objects,
1636                    &schema_id,
1637                    &catalog,
1638                );
1639            }
1640        }
1641        for cluster in self.catalog.clusters() {
1642            let cluster_id = SystemObjectId::Object(cluster.id().into());
1643            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1644                let object_description =
1645                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1646                dependent_objects
1647                    .entry(role_name.to_string())
1648                    .or_default()
1649                    .push(format!("owner of {object_description}"));
1650            }
1651            privilege_check(
1652                &cluster.privileges,
1653                dropped_roles,
1654                &mut dependent_objects,
1655                &cluster_id,
1656                &catalog,
1657            );
1658            for replica in cluster.replicas() {
1659                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1660                    let replica_id =
1661                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1662                    let object_description =
1663                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1664                    dependent_objects
1665                        .entry(role_name.to_string())
1666                        .or_default()
1667                        .push(format!("owner of {object_description}"));
1668                }
1669            }
1670        }
1671        privilege_check(
1672            self.catalog().system_privileges(),
1673            dropped_roles,
1674            &mut dependent_objects,
1675            &SystemObjectId::System,
1676            &catalog,
1677        );
1678        for (default_privilege_object, default_privilege_acl_items) in
1679            self.catalog.default_privileges()
1680        {
1681            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1682                dependent_objects
1683                    .entry(role_name.to_string())
1684                    .or_default()
1685                    .push(format!(
1686                        "default privileges on {}S created by {}",
1687                        default_privilege_object.object_type, role_name
1688                    ));
1689            }
1690            for default_privilege_acl_item in default_privilege_acl_items {
1691                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1692                    dependent_objects
1693                        .entry(role_name.to_string())
1694                        .or_default()
1695                        .push(format!(
1696                            "default privileges on {}S granted to {}",
1697                            default_privilege_object.object_type, role_name
1698                        ));
1699                }
1700            }
1701        }
1702
1703        if !dependent_objects.is_empty() {
1704            Err(AdapterError::DependentObject(dependent_objects))
1705        } else {
1706            Ok(())
1707        }
1708    }
1709
1710    #[instrument]
1711    pub(super) async fn sequence_drop_owned(
1712        &mut self,
1713        session: &Session,
1714        plan: plan::DropOwnedPlan,
1715    ) -> Result<ExecuteResponse, AdapterError> {
1716        for role_id in &plan.role_ids {
1717            self.catalog().ensure_not_reserved_role(role_id)?;
1718        }
1719
1720        let mut privilege_revokes = plan.privilege_revokes;
1721
1722        // Make sure this stays in sync with the beginning of `rbac::check_plan`.
1723        let session_catalog = self.catalog().for_session(session);
1724        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1725            && !session.is_superuser()
1726        {
1727            // Obtain all roles that the current session is a member of.
1728            let role_membership =
1729                session_catalog.collect_role_membership(session.current_role_id());
1730            let invalid_revokes: BTreeSet<_> = privilege_revokes
1731                .drain_filter_swapping(|(_, privilege)| {
1732                    !role_membership.contains(&privilege.grantor)
1733                })
1734                .map(|(object_id, _)| object_id)
1735                .collect();
1736            for invalid_revoke in invalid_revokes {
1737                let object_description =
1738                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1739                session.add_notice(AdapterNotice::CannotRevoke { object_description });
1740            }
1741        }
1742
1743        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1744            catalog::Op::UpdatePrivilege {
1745                target_id: object_id,
1746                privilege,
1747                variant: UpdatePrivilegeVariant::Revoke,
1748            }
1749        });
1750        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1751            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1752                privilege_object,
1753                privilege_acl_item,
1754                variant: UpdatePrivilegeVariant::Revoke,
1755            },
1756        );
1757        let DropOps {
1758            ops: drop_ops,
1759            dropped_active_db,
1760            dropped_active_cluster,
1761            dropped_in_use_indexes,
1762        } = self.sequence_drop_common(session, plan.drop_ids)?;
1763
1764        let ops = privilege_revoke_ops
1765            .chain(default_privilege_revoke_ops)
1766            .chain(drop_ops.into_iter())
1767            .collect();
1768
1769        self.catalog_transact(Some(session), ops).await?;
1770
1771        if dropped_active_db {
1772            session.add_notice(AdapterNotice::DroppedActiveDatabase {
1773                name: session.vars().database().to_string(),
1774            });
1775        }
1776        if dropped_active_cluster {
1777            session.add_notice(AdapterNotice::DroppedActiveCluster {
1778                name: session.vars().cluster().to_string(),
1779            });
1780        }
1781        for dropped_in_use_index in dropped_in_use_indexes {
1782            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1783        }
1784        Ok(ExecuteResponse::DroppedOwned)
1785    }
1786
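    /// Builds the catalog ops shared by `DROP ...` and `DROP OWNED` for
    /// dropping `ids`: role membership revokes, default privilege revokes, and
    /// the object drops themselves. Also reports whether the session's active
    /// database or cluster is among the dropped objects and which in-use
    /// indexes are being dropped, so callers can emit the appropriate notices.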
1787    fn sequence_drop_common(
1788        &self,
1789        session: &Session,
1790        ids: Vec<ObjectId>,
1791    ) -> Result<DropOps, AdapterError> {
1792        let mut dropped_active_db = false;
1793        let mut dropped_active_cluster = false;
1794        let mut dropped_in_use_indexes = Vec::new();
1795        let mut dropped_roles = BTreeMap::new();
1796        let mut dropped_databases = BTreeSet::new();
1797        let mut dropped_schemas = BTreeSet::new();
1798        // Dropping either the group role or the member role of a role membership will trigger a
1799        // role revoke. We use a set for the revokes to avoid attempting to revoke the same role
1800        // membership twice. Each entry is a `(group_role, member, grantor)` triple.
1801        let mut role_revokes = BTreeSet::new();
1802        // Dropping a database or a schema will revoke all default privileges associated with that
1803        // database or schema.
1804        let mut default_privilege_revokes = BTreeSet::new();
1805
1806        // Clusters we're dropping; their replicas are exempt from the managed-cluster replica check below.
1807        let mut clusters_to_drop = BTreeSet::new();
1808
1809        let ids_set = ids.iter().collect::<BTreeSet<_>>();
1810        for id in &ids {
1811            match id {
1812                ObjectId::Database(id) => {
1813                    let name = self.catalog().get_database(id).name();
1814                    if name == session.vars().database() {
1815                        dropped_active_db = true;
1816                    }
1817                    dropped_databases.insert(id);
1818                }
1819                ObjectId::Schema((_, spec)) => {
1820                    if let SchemaSpecifier::Id(id) = spec {
1821                        dropped_schemas.insert(id);
1822                    }
1823                }
1824                ObjectId::Cluster(id) => {
1825                    clusters_to_drop.insert(*id);
1826                    if let Some(active_id) = self
1827                        .catalog()
1828                        .active_cluster(session)
1829                        .ok()
1830                        .map(|cluster| cluster.id())
1831                    {
1832                        if id == &active_id {
1833                            dropped_active_cluster = true;
1834                        }
1835                    }
1836                }
1837                ObjectId::Role(id) => {
1838                    let role = self.catalog().get_role(id);
1839                    let name = role.name();
1840                    dropped_roles.insert(*id, name);
1841                    // We must revoke all role memberships that the dropped role belongs to.
1842                    for (group_id, grantor_id) in &role.membership.map {
1843                        role_revokes.insert((*group_id, *id, *grantor_id));
1844                    }
1845                }
1846                ObjectId::Item(id) => {
1847                    if let Some(index) = self.catalog().get_entry(id).index() {
1848                        let humanizer = self.catalog().for_session(session);
1849                        let dependants = self
1850                            .controller
1851                            .compute
1852                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
1853                            .ok()
1854                            .into_iter()
1855                            .flatten()
1856                            .filter(|dependant_id| {
1857                                // Transient IDs belong to peeks. For now, we are not interested
1858                                // in peeks that depend on a dropped index.
1859                                // TODO: show a different notice in this case. Something like
1860                                // "There is an in-progress ad hoc SELECT that uses the dropped
1861                                // index. The resources used by the index will be freed when all
1862                                // such SELECTs complete."
1863                                if dependant_id.is_transient() {
1864                                    return false;
1865                                }
1866                                // The item should exist, but don't panic if it doesn't.
1867                                let Some(dependent_id) = humanizer
1868                                    .try_get_item_by_global_id(dependant_id)
1869                                    .map(|item| item.id())
1870                                else {
1871                                    return false;
1872                                };
1873                                // If the dependent object is also being dropped, then there is no
1874                                // problem, so we don't want a notice.
1875                                !ids_set.contains(&ObjectId::Item(dependent_id))
1876                            })
1877                            .flat_map(|dependant_id| {
1878                                // If we are not able to find a name for this ID, it probably means
1879                                // we have already dropped the compute collection, in which case we
1880                                // can ignore it.
1881                                humanizer.humanize_id(dependant_id)
1882                            })
1883                            .collect_vec();
1884                        if !dependants.is_empty() {
1885                            dropped_in_use_indexes.push(DroppedInUseIndex {
1886                                index_name: humanizer
1887                                    .humanize_id(index.global_id())
1888                                    .unwrap_or_else(|| id.to_string()),
1889                                dependant_objects: dependants,
1890                            });
1891                        }
1892                    }
1893                }
1894                _ => {}
1895            }
1896        }
1897
1898        for id in &ids {
1899            match id {
1900                // Validate that `ClusterReplica` drops do not drop replicas of managed clusters,
1901                // unless they are internal replicas, which exist outside the scope
1902                // of managed clusters.
1903                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1904                    if !clusters_to_drop.contains(cluster_id) {
1905                        let cluster = self.catalog.get_cluster(*cluster_id);
1906                        if cluster.is_managed() {
1907                            let replica =
1908                                cluster.replica(*replica_id).expect("Catalog out of sync");
1909                            if !replica.config.location.internal() {
1910                                coord_bail!("cannot drop replica of managed cluster");
1911                            }
1912                        }
1913                    }
1914                }
1915                _ => {}
1916            }
1917        }
1918
1919        for role_id in dropped_roles.keys() {
1920            self.catalog().ensure_not_reserved_role(role_id)?;
1921        }
1922        self.validate_dropped_role_ownership(session, &dropped_roles)?;
1923        // If any role is a member of a dropped role, then we must revoke that membership.
1924        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1925        for role in self.catalog().user_roles() {
1926            for dropped_role_id in
1927                dropped_role_ids.intersection(&role.membership.map.keys().collect())
1928            {
1929                role_revokes.insert((
1930                    **dropped_role_id,
1931                    role.id(),
1932                    *role
1933                        .membership
1934                        .map
1935                        .get(*dropped_role_id)
1936                        .expect("included in keys above"),
1937                ));
1938            }
1939        }
1940
1941        for (default_privilege_object, default_privilege_acls) in
1942            self.catalog().default_privileges()
1943        {
1944            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
1945                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
1946            {
1947                for default_privilege_acl in default_privilege_acls {
1948                    default_privilege_revokes.insert((
1949                        default_privilege_object.clone(),
1950                        default_privilege_acl.clone(),
1951                    ));
1952                }
1953            }
1954        }
1955
1956        let ops = role_revokes
1957            .into_iter()
1958            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1959                role_id,
1960                member_id,
1961                grantor_id,
1962            })
1963            .chain(default_privilege_revokes.into_iter().map(
1964                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1965                    privilege_object,
1966                    privilege_acl_item,
1967                    variant: UpdatePrivilegeVariant::Revoke,
1968                },
1969            ))
1970            .chain(iter::once(catalog::Op::DropObjects(
1971                ids.into_iter()
1972                    .map(DropObjectInfo::manual_drop_from_object_id)
1973                    .collect(),
1974            )))
1975            .collect();
1976
1977        Ok(DropOps {
1978            ops,
1979            dropped_active_db,
1980            dropped_active_cluster,
1981            dropped_in_use_indexes,
1982        })
1983    }
1984
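    /// Handles `EXPLAIN ... SCHEMA` for a sink by parsing the JSON schema from
    /// the plan and returning it as a single row of pretty-printed JSON.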
1985    pub(super) fn sequence_explain_schema(
1986        &self,
1987        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1988    ) -> Result<ExecuteResponse, AdapterError> {
1989        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1990            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1991        })?;
1992
1993        let json_string = json_string(&json_value);
1994        let row = Row::pack_slice(&[Datum::String(&json_string)]);
1995        Ok(Self::send_immediate_rows(row))
1996    }
1997
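    /// Handles `SHOW ALL`, returning one `(name, setting, description)` row
    /// for each variable viewable by the session, sorted case-insensitively
    /// by name.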
1998    pub(super) fn sequence_show_all_variables(
1999        &self,
2000        session: &Session,
2001    ) -> Result<ExecuteResponse, AdapterError> {
2002        let mut rows = viewable_variables(self.catalog().state(), session)
2003            .map(|v| (v.name(), v.value(), v.description()))
2004            .collect::<Vec<_>>();
2005        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
2006
2007        // TODO(parkmycar): Pack all of these into a single RowCollection.
2008        let rows: Vec<_> = rows
2009            .into_iter()
2010            .map(|(name, val, desc)| {
2011                Row::pack_slice(&[
2012                    Datum::String(name),
2013                    Datum::String(&val),
2014                    Datum::String(desc),
2015                ])
2016            })
2017            .collect();
2018        Ok(Self::send_immediate_rows(rows))
2019    }
2020
2021    pub(super) fn sequence_show_variable(
2022        &self,
2023        session: &Session,
2024        plan: plan::ShowVariablePlan,
2025    ) -> Result<ExecuteResponse, AdapterError> {
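        // `SHOW schema` is special-cased: it reports the first schema on the
        // search path that resolves, or NULL (with a notice, if enabled) if
        // none does.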
2026        if &plan.name == SCHEMA_ALIAS {
2027            let schemas = self.catalog.resolve_search_path(session);
2028            let schema = schemas.first();
2029            return match schema {
2030                Some((database_spec, schema_spec)) => {
2031                    let schema_name = &self
2032                        .catalog
2033                        .get_schema(database_spec, schema_spec, session.conn_id())
2034                        .name()
2035                        .schema;
2036                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
2037                    Ok(Self::send_immediate_rows(row))
2038                }
2039                None => {
2040                    if session.vars().current_object_missing_warnings() {
2041                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
2042                            search_path: session
2043                                .vars()
2044                                .search_path()
2045                                .into_iter()
2046                                .map(|schema| schema.to_string())
2047                                .collect(),
2048                        });
2049                    }
2050                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
2051                }
2052            };
2053        }
2054
2055        let variable = session
2056            .vars()
2057            .get(self.catalog().system_config(), &plan.name)
2058            .or_else(|_| self.catalog().system_config().get(&plan.name))?;
2059
2060        // In lieu of plumbing the user to all system config functions, just check that the var is
2061        // visible.
2062        variable.visible(session.user(), self.catalog().system_config())?;
2063
2064        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
2065        if variable.name() == vars::DATABASE.name()
2066            && matches!(
2067                self.catalog().resolve_database(&variable.value()),
2068                Err(CatalogError::UnknownDatabase(_))
2069            )
2070            && session.vars().current_object_missing_warnings()
2071        {
2072            let name = variable.value();
2073            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2074        } else if variable.name() == vars::CLUSTER.name()
2075            && matches!(
2076                self.catalog().resolve_cluster(&variable.value()),
2077                Err(CatalogError::UnknownCluster(_))
2078            )
2079            && session.vars().current_object_missing_warnings()
2080        {
2081            let name = variable.value();
2082            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2083        }
2084        Ok(Self::send_immediate_rows(row))
2085    }
2086
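    /// Handles the internal-only `INSPECT SHARD` statement by fetching the
    /// persist state of the given collection and returning it as a single
    /// JSON row.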
2087    #[instrument]
2088    pub(super) async fn sequence_inspect_shard(
2089        &self,
2090        session: &Session,
2091        plan: plan::InspectShardPlan,
2092    ) -> Result<ExecuteResponse, AdapterError> {
2093        // TODO: Not thrilled about this rbac special case here, but probably
2094        // sufficient for now.
2095        if !session.user().is_internal() {
2096            return Err(AdapterError::Unauthorized(
2097                rbac::UnauthorizedError::MzSystem {
2098                    action: "inspect".into(),
2099                },
2100            ));
2101        }
2102        let state = self
2103            .controller
2104            .storage
2105            .inspect_persist_state(plan.id)
2106            .await?;
2107        let jsonb = Jsonb::from_serde_json(state)?;
2108        Ok(Self::send_immediate_rows(jsonb.into_row()))
2109    }
2110
2111    #[instrument]
2112    pub(super) fn sequence_set_variable(
2113        &self,
2114        session: &mut Session,
2115        plan: plan::SetVariablePlan,
2116    ) -> Result<ExecuteResponse, AdapterError> {
2117        let (name, local) = (plan.name, plan.local);
2118        if &name == TRANSACTION_ISOLATION_VAR_NAME {
2119            self.validate_set_isolation_level(session)?;
2120        }
2121        if &name == vars::CLUSTER.name() {
2122            self.validate_set_cluster(session)?;
2123        }
2124
2125        let vars = session.vars_mut();
2126        let values = match plan.value {
2127            plan::VariableValue::Default => None,
2128            plan::VariableValue::Values(values) => Some(values),
2129        };
2130
2131        match values {
2132            Some(values) => {
2133                vars.set(
2134                    self.catalog().system_config(),
2135                    &name,
2136                    VarInput::SqlSet(&values),
2137                    local,
2138                )?;
2139
2140                let vars = session.vars();
2141
2142                // Emit a warning when deprecated variables are used.
2143                // TODO(database-issues#8069) remove this after sufficient time has passed
2144                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
2145                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
2146                } else if name == vars::CLUSTER.name()
2147                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
2148                {
2149                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
2150                }
2151
2152                // Emit a notice if the new database or cluster value does not correspond to a catalog item.
2153                if name.as_str() == vars::DATABASE.name()
2154                    && matches!(
2155                        self.catalog().resolve_database(vars.database()),
2156                        Err(CatalogError::UnknownDatabase(_))
2157                    )
2158                    && session.vars().current_object_missing_warnings()
2159                {
2160                    let name = vars.database().to_string();
2161                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2162                } else if name.as_str() == vars::CLUSTER.name()
2163                    && matches!(
2164                        self.catalog().resolve_cluster(vars.cluster()),
2165                        Err(CatalogError::UnknownCluster(_))
2166                    )
2167                    && session.vars().current_object_missing_warnings()
2168                {
2169                    let name = vars.cluster().to_string();
2170                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2171                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
2172                    let v = values.into_first().to_lowercase();
2173                    if v == IsolationLevel::ReadUncommitted.as_str()
2174                        || v == IsolationLevel::ReadCommitted.as_str()
2175                        || v == IsolationLevel::RepeatableRead.as_str()
2176                    {
2177                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
2178                            isolation_level: v,
2179                        });
2180                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
2181                        session.add_notice(AdapterNotice::StrongSessionSerializable);
2182                    }
2183                }
2184            }
2185            None => vars.reset(self.catalog().system_config(), &name, local)?,
2186        }
2187
2188        Ok(ExecuteResponse::SetVariable { name, reset: false })
2189    }
2190
2191    pub(super) fn sequence_reset_variable(
2192        &self,
2193        session: &mut Session,
2194        plan: plan::ResetVariablePlan,
2195    ) -> Result<ExecuteResponse, AdapterError> {
2196        let name = plan.name;
2197        if &name == TRANSACTION_ISOLATION_VAR_NAME {
2198            self.validate_set_isolation_level(session)?;
2199        }
2200        if &name == vars::CLUSTER.name() {
2201            self.validate_set_cluster(session)?;
2202        }
2203        session
2204            .vars_mut()
2205            .reset(self.catalog().system_config(), &name, false)?;
2206        Ok(ExecuteResponse::SetVariable { name, reset: true })
2207    }
2208
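    /// Handles `SET TRANSACTION`, e.g.
    /// `SET TRANSACTION ISOLATION LEVEL SERIALIZABLE`. Only isolation level
    /// modes are supported; access modes are rejected as unsupported.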
2209    pub(super) fn sequence_set_transaction(
2210        &self,
2211        session: &mut Session,
2212        plan: plan::SetTransactionPlan,
2213    ) -> Result<ExecuteResponse, AdapterError> {
2214        // TODO(jkosh44) Only supports isolation levels for now.
2215        for mode in plan.modes {
2216            match mode {
2217                TransactionMode::AccessMode(_) => {
2218                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
2219                }
2220                TransactionMode::IsolationLevel(isolation_level) => {
2221                    self.validate_set_isolation_level(session)?;
2222
2223                    session.vars_mut().set(
2224                        self.catalog().system_config(),
2225                        TRANSACTION_ISOLATION_VAR_NAME,
2226                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
2227                        plan.local,
2228                    )?
2229                }
2230            }
2231        }
2232        Ok(ExecuteResponse::SetVariable {
2233            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2234            reset: false,
2235        })
2236    }
2237
2238    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2239        if session.transaction().contains_ops() {
2240            Err(AdapterError::InvalidSetIsolationLevel)
2241        } else {
2242            Ok(())
2243        }
2244    }
2245
2246    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2247        if session.transaction().contains_ops() {
2248            Err(AdapterError::InvalidSetCluster)
2249        } else {
2250            Ok(())
2251        }
2252    }
2253
2254    #[instrument]
2255    pub(super) async fn sequence_end_transaction(
2256        &mut self,
2257        mut ctx: ExecuteContext,
2258        mut action: EndTransactionAction,
2259    ) {
2260        // If the transaction has failed, we can only rollback.
2261        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2262            (&action, ctx.session().transaction())
2263        {
2264            action = EndTransactionAction::Rollback;
2265        }
2266        let response = match action {
2267            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2268                params: BTreeMap::new(),
2269            }),
2270            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2271                params: BTreeMap::new(),
2272            }),
2273        };
2274
2275        let result = self
2276            .sequence_end_transaction_inner(ctx.session_mut(), action)
2277            .await;
2278
2279        let (response, action) = match result {
2280            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2281                (response, action)
2282            }
2283            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2284                // Make sure we have the correct set of write locks for this transaction.
2285                // We aggressively drop partial sets of locks to prevent deadlocking
2286                // separate transactions.
2287                let validated_locks = match write_lock_guards {
2288                    None => None,
2289                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2290                        Ok(locks) => Some(locks),
2291                        Err(missing) => {
2292                            tracing::error!(?missing, "programming error, missing write locks");
2293                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2294                        }
2295                    },
2296                };
2297
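                // Group the writes by collection so that multiple writes to
                // the same table within this transaction are submitted
                // together.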
2298                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2299                for WriteOp { id, rows } in writes {
2300                    let total_rows = collected_writes.entry(id).or_default();
2301                    total_rows.push(rows);
2302                }
2303
2304                self.submit_write(PendingWriteTxn::User {
2305                    span: Span::current(),
2306                    writes: collected_writes,
2307                    write_locks: validated_locks,
2308                    pending_txn: PendingTxn {
2309                        ctx,
2310                        response,
2311                        action,
2312                    },
2313                });
2314                return;
2315            }
2316            Ok((
2317                Some(TransactionOps::Peeks {
2318                    determination,
2319                    requires_linearization: RequireLinearization::Required,
2320                    ..
2321                }),
2322                _,
2323            )) if ctx.session().vars().transaction_isolation()
2324                == &IsolationLevel::StrictSerializable =>
2325            {
2326                let conn_id = ctx.session().conn_id().clone();
2327                let pending_read_txn = PendingReadTxn {
2328                    txn: PendingRead::Read {
2329                        txn: PendingTxn {
2330                            ctx,
2331                            response,
2332                            action,
2333                        },
2334                    },
2335                    timestamp_context: determination.timestamp_context,
2336                    created: Instant::now(),
2337                    num_requeues: 0,
2338                    otel_ctx: OpenTelemetryContext::obtain(),
2339                };
2340                self.strict_serializable_reads_tx
2341                    .send((conn_id, pending_read_txn))
2342                    .expect("sending to strict_serializable_reads_tx cannot fail");
2343                return;
2344            }
2345            Ok((
2346                Some(TransactionOps::Peeks {
2347                    determination,
2348                    requires_linearization: RequireLinearization::Required,
2349                    ..
2350                }),
2351                _,
2352            )) if ctx.session().vars().transaction_isolation()
2353                == &IsolationLevel::StrongSessionSerializable =>
2354            {
2355                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2356                    ctx.session_mut()
2357                        .ensure_timestamp_oracle(timeline.clone())
2358                        .apply_write(*ts);
2359                }
2360                (response, action)
2361            }
2362            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2363                self.internal_cmd_tx
2364                    .send(Message::ExecuteSingleStatementTransaction {
2365                        ctx,
2366                        otel_ctx: OpenTelemetryContext::obtain(),
2367                        stmt,
2368                        params,
2369                    })
2370                    .expect("must send");
2371                return;
2372            }
2373            Ok((_, _)) => (response, action),
2374            Err(err) => (Err(err), EndTransactionAction::Rollback),
2375        };
2376        let changed = ctx.session_mut().vars_mut().end_transaction(action);
2377        // Append any parameters that changed to the response.
2378        let response = response.map(|mut r| {
2379            r.extend_params(changed);
2380            ExecuteResponse::from(r)
2381        });
2382
2383        ctx.retire(response);
2384    }
2385
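    /// Clears the session's transaction and, on commit, re-validates the
    /// transaction's write targets and applies any queued DDL ops. Returns the
    /// transaction's ops together with any write locks it held so the caller
    /// can finish committing them.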
2386    #[instrument]
2387    async fn sequence_end_transaction_inner(
2388        &mut self,
2389        session: &mut Session,
2390        action: EndTransactionAction,
2391    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2392        let txn = self.clear_transaction(session).await;
2393
2394        if let EndTransactionAction::Commit = action {
2395            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2396                match &mut ops {
2397                    TransactionOps::Writes(writes) => {
2398                        for WriteOp { id, .. } in &mut writes.iter() {
2399                            // Re-verify this id exists.
2400                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2401                                AdapterError::Catalog(mz_catalog::memory::error::Error {
2402                                    kind: mz_catalog::memory::error::ErrorKind::Sql(
2403                                        CatalogError::UnknownItem(id.to_string()),
2404                                    ),
2405                                })
2406                            })?;
2407                        }
2408
2409                        // `rows` can be empty if, say, a DELETE's WHERE clause had 0 results.
2410                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2411                    }
2412                    TransactionOps::DDL {
2413                        ops,
2414                        state: _,
2415                        revision,
2416                    } => {
2417                        // Make sure our catalog hasn't changed.
2418                        if *revision != self.catalog().transient_revision() {
2419                            return Err(AdapterError::DDLTransactionRace);
2420                        }
2421                        // Commit all of our queued ops.
2422                        self.catalog_transact(Some(session), std::mem::take(ops))
2423                            .await?;
2424                    }
2425                    _ => (),
2426                }
2427                return Ok((Some(ops), write_lock_guards));
2428            }
2429        }
2430
2431        Ok((None, None))
2432    }
2433
2434    pub(super) async fn sequence_side_effecting_func(
2435        &mut self,
2436        ctx: ExecuteContext,
2437        plan: SideEffectingFunc,
2438    ) {
2439        match plan {
2440            SideEffectingFunc::PgCancelBackend { connection_id } => {
2441                if ctx.session().conn_id().unhandled() == connection_id {
2442                    // As a special case, if we're canceling ourselves, we send
2443                    // back a canceled response to the client issuing the query,
2444                    // and no further processing of the cancel is needed.
2445                    ctx.retire(Err(AdapterError::Canceled));
2446                    return;
2447                }
2448
2449                let res = if let Some((id_handle, _conn_meta)) =
2450                    self.active_conns.get_key_value(&connection_id)
2451                {
2452                    // check_plan already verified role membership.
2453                    self.handle_privileged_cancel(id_handle.clone()).await;
2454                    Datum::True
2455                } else {
2456                    Datum::False
2457                };
2458                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2459            }
2460        }
2461    }
2462
2463    /// Checks whether the session needs a real-time recency timestamp and, if so, returns
2464    /// a future that will resolve to that timestamp.
2465    pub(super) async fn determine_real_time_recent_timestamp(
2466        &self,
2467        session: &Session,
2468        source_ids: impl Iterator<Item = CatalogItemId>,
2469    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2470    {
2471        let vars = session.vars();
2472
2473        // Ideally this logic belongs inside
2474        // `mz-adapter::coord::timestamp_selection::determine_timestamp`. However, including the
2475        // logic there would make it extremely difficult and inconvenient to pull the waiting
2476        // off of the main coord thread.
2477        let r = if vars.real_time_recency()
2478            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2479            && !session.contains_read_timestamp()
2480        {
2481            // Find all dependencies transitively because we need to ensure that
2482            // RTR queries determine the timestamp from the sources' (i.e.
2483            // storage objects that ingest data from external systems) remap
2484            // data. We "cheat" a little bit and filter out any IDs that aren't
2485            // user objects because we know they are not an RTR source.
2486            let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
2487            // If none of the sources are user objects, we don't need to provide
2488            // an RTR timestamp.
2489            if to_visit.is_empty() {
2490                return Ok(None);
2491            }
2492
2493            let mut timestamp_objects = BTreeSet::new();
2494
2495            while let Some(id) = to_visit.pop_front() {
2496                timestamp_objects.insert(id);
2497                to_visit.extend(
2498                    self.catalog()
2499                        .get_entry(&id)
2500                        .uses()
2501                        .into_iter()
2502                        .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2503                );
2504            }
2505            let timestamp_objects = timestamp_objects
2506                .into_iter()
2507                .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2508                .collect();
2509
2510            let r = self
2511                .controller
2512                .determine_real_time_recent_timestamp(
2513                    timestamp_objects,
2514                    *vars.real_time_recency_timeout(),
2515                )
2516                .await?;
2517
2518            Some(r)
2519        } else {
2520            None
2521        };
2522
2523        Ok(r)
2524    }
2525
2526    #[instrument]
2527    pub(super) async fn sequence_explain_plan(
2528        &mut self,
2529        ctx: ExecuteContext,
2530        plan: plan::ExplainPlanPlan,
2531        target_cluster: TargetCluster,
2532    ) {
2533        match &plan.explainee {
2534            plan::Explainee::Statement(stmt) => match stmt {
2535                plan::ExplaineeStatement::CreateView { .. } => {
2536                    self.explain_create_view(ctx, plan).await;
2537                }
2538                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2539                    self.explain_create_materialized_view(ctx, plan).await;
2540                }
2541                plan::ExplaineeStatement::CreateIndex { .. } => {
2542                    self.explain_create_index(ctx, plan).await;
2543                }
2544                plan::ExplaineeStatement::Select { .. } => {
2545                    self.explain_peek(ctx, plan, target_cluster).await;
2546                }
2547            },
2548            plan::Explainee::View(_) => {
2549                let result = self.explain_view(&ctx, plan);
2550                ctx.retire(result);
2551            }
2552            plan::Explainee::MaterializedView(_) => {
2553                let result = self.explain_materialized_view(&ctx, plan);
2554                ctx.retire(result);
2555            }
2556            plan::Explainee::Index(_) => {
2557                let result = self.explain_index(&ctx, plan);
2558                ctx.retire(result);
2559            }
2560            plan::Explainee::ReplanView(_) => {
2561                self.explain_replan_view(ctx, plan).await;
2562            }
2563            plan::Explainee::ReplanMaterializedView(_) => {
2564                self.explain_replan_materialized_view(ctx, plan).await;
2565            }
2566            plan::Explainee::ReplanIndex(_) => {
2567                self.explain_replan_index(ctx, plan).await;
2568            }
2569        };
2570    }
2571
2572    pub(super) async fn sequence_explain_pushdown(
2573        &mut self,
2574        ctx: ExecuteContext,
2575        plan: plan::ExplainPushdownPlan,
2576        target_cluster: TargetCluster,
2577    ) {
2578        match plan.explainee {
2579            Explainee::Statement(ExplaineeStatement::Select {
2580                broken: false,
2581                plan,
2582                desc: _,
2583            }) => {
2584                let stage = return_if_err!(
2585                    self.peek_validate(
2586                        ctx.session(),
2587                        plan,
2588                        target_cluster,
2589                        None,
2590                        ExplainContext::Pushdown,
2591                        Some(ctx.session().vars().max_query_result_size()),
2592                    ),
2593                    ctx
2594                );
2595                self.sequence_staged(ctx, Span::current(), stage).await;
2596            }
2597            Explainee::MaterializedView(item_id) => {
2598                self.explain_pushdown_materialized_view(ctx, item_id).await;
2599            }
2600            _ => {
2601                ctx.retire(Err(AdapterError::Unsupported(
2602                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2603                )));
2604            }
2605        };
2606    }
2607
2608    async fn render_explain_pushdown(
2609        &self,
2610        ctx: ExecuteContext,
2611        as_of: Antichain<Timestamp>,
2612        mz_now: ResultSpec<'static>,
2613        read_holds: Option<ReadHolds<Timestamp>>,
2614        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2615    ) {
2616        let fut = self
2617            .render_explain_pushdown_prepare(ctx.session(), as_of, mz_now, imports)
2618            .await;
2619        task::spawn(|| "render explain pushdown", async move {
2620            // Transfer the necessary read holds over to the background task
2621            let _read_holds = read_holds;
2622            let res = fut.await;
2623            ctx.retire(res);
2624        });
2625    }
2626
2627    async fn render_explain_pushdown_prepare<
2628        I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
2629    >(
2630        &self,
2631        session: &Session,
2632        as_of: Antichain<Timestamp>,
2633        mz_now: ResultSpec<'static>,
2634        imports: I,
2635    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2636        let explain_timeout = *session.vars().statement_timeout();
2637        let mut futures = FuturesOrdered::new();
2638        for (id, mfp) in imports {
2639            let catalog_entry = self.catalog.get_entry_by_global_id(&id);
2640            let full_name = self
2641                .catalog
2642                .for_session(session)
2643                .resolve_full_name(&catalog_entry.name);
2644            let name = format!("{}", full_name);
2645            let relation_desc = catalog_entry
2646                .desc_opt()
2647                .expect("source should have a proper desc")
2648                .into_owned();
2649            let stats_future = self
2650                .controller
2651                .storage_collections
2652                .snapshot_parts_stats(id, as_of.clone())
2653                .await;
2654
2655            let mz_now = mz_now.clone();
2656            // These futures may block if the source is not yet readable at the as-of;
2657            // stash them in `futures` and only block on them in a separate task.
2658            futures.push_back(async move {
2659                let snapshot_stats = stats_future.await?;
2663                let mut total_bytes = 0;
2664                let mut total_parts = 0;
2665                let mut selected_bytes = 0;
2666                let mut selected_parts = 0;
2667                for SnapshotPartStats {
2668                    encoded_size_bytes: bytes,
2669                    stats,
2670                } in &snapshot_stats.parts
2671                {
2672                    let bytes = u64::cast_from(*bytes);
2673                    total_bytes += bytes;
2674                    total_parts += 1u64;
2675                    let selected = match stats {
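                        // Parts that carry no stats cannot be pruned, so they
                        // are conservatively counted as selected.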
2676                        None => true,
2677                        Some(stats) => {
2678                            let stats = stats.decode();
2679                            let stats = RelationPartStats::new(
2680                                name.as_str(),
2681                                &snapshot_stats.metrics.pushdown.part_stats,
2682                                &relation_desc,
2683                                &stats,
2684                            );
2685                            stats.may_match_mfp(mz_now.clone(), &mfp)
2686                        }
2687                    };
2688
2689                    if selected {
2690                        selected_bytes += bytes;
2691                        selected_parts += 1u64;
2692                    }
2693                }
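                // One result row per import:
                // (name, total_bytes, selected_bytes, total_parts, selected_parts).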
2694                Ok(Row::pack_slice(&[
2695                    name.as_str().into(),
2696                    total_bytes.into(),
2697                    selected_bytes.into(),
2698                    total_parts.into(),
2699                    selected_parts.into(),
2700                ]))
2701            });
2702        }
2703
2704        async move {
2705            match tokio::time::timeout(
2706                explain_timeout,
2707                futures::TryStreamExt::try_collect::<Vec<_>>(futures),
2708            )
2709            .await
2710            {
2711                Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
2712                    rows: Box::new(rows.into_row_iter()),
2713                }),
2714                Ok(Err(err)) => Err(err.into()),
2715                Err(_) => Err(AdapterError::StatementTimeout),
2716            }
2717        };
2718        }
2720
2721    #[instrument]
2722    pub(super) async fn sequence_insert(
2723        &mut self,
2724        mut ctx: ExecuteContext,
2725        plan: plan::InsertPlan,
2726    ) {
2727        // Normally, this would get checked when trying to add "write ops" to
2728        // the transaction but we go down diverging paths below, based on
2729        // whether the INSERT is only constant values or not.
2730        //
2731        // For the non-constant case we sequence an implicit read-then-write,
2732        // which messes with the transaction ops and would allow an implicit
2733        // read-then-write to sneak into a read-only transaction.
2734        if !ctx.session_mut().transaction().allows_writes() {
2735            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2736            return;
2737        }
2738
2739        // The structure of this code originates from a time when
2740        // `ReadThenWritePlan` was carrying an `MirRelationExpr` instead of an
2741        // optimized `MirRelationExpr`.
2742        //
2743        // Ideally, we would like to make the `selection.as_const().is_some()`
2744        // check on `plan.values` instead. However, `VALUES (1), (3)` statements
2745        // are planned as a Wrap($n, $vals) call, so until we can reduce
2746        // HirRelationExpr this will always return false.
2747        //
2748        // Unfortunately, hitting the default path of the match below also
2749        // causes a lot of tests to fail, so we opted to go with the extra
2750        // `plan.values.clone()` statements when producing the `optimized_mir`
2751        // and re-optimize the values in the `sequence_read_then_write` call.
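        //
        // Illustrative examples (assumed SQL, for orientation only):
        //
        //   INSERT INTO t VALUES (1), (2);      -- constant: fast path below
        //   INSERT INTO t SELECT a FROM other;  -- non-constant: sequenced as
        //                                       -- a read-then-write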
2752        let optimized_mir = if let Some(..) = &plan.values.as_const() {
2753            // We don't perform any optimizations on an expression that is already
2754            // a constant for writes, as we want to maximize bulk-insert throughput.
2755            let expr = return_if_err!(
2756                plan.values
2757                    .clone()
2758                    .lower(self.catalog().system_config(), None),
2759                ctx
2760            );
2761            OptimizedMirRelationExpr(expr)
2762        } else {
2763            // Collect optimizer parameters.
2764            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2765
2766            // (`optimize::view::Optimizer` has a special case for constant queries.)
2767            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2768
2769            // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
2770            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2771        };
2772
2773        match optimized_mir.into_inner() {
2774            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2775                let catalog = self.owned_catalog();
2776                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2777                    let result =
2778                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2779                    ctx.retire(result);
2780                });
2781            }
2782            // All non-constant values must be planned as read-then-writes.
2783            _ => {
2784                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2785                    Some(table) => {
2786                        let fullname = self
2787                            .catalog()
2788                            .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2789                        // Inserts always occur at the latest version of the table.
2790                        table
2791                            .desc_latest(&fullname)
2792                            .expect("desc called on table")
2793                            .arity()
2794                    }
2795                    None => {
2796                        ctx.retire(Err(AdapterError::Catalog(
2797                            mz_catalog::memory::error::Error {
2798                                kind: mz_catalog::memory::error::ErrorKind::Sql(
2799                                    CatalogError::UnknownItem(plan.id.to_string()),
2800                                ),
2801                            },
2802                        )));
2803                        return;
2804                    }
2805                };
2806
2807                let finishing = RowSetFinishing {
2808                    order_by: vec![],
2809                    limit: None,
2810                    offset: 0,
2811                    project: (0..desc_arity).collect(),
2812                };
2813
2814                let read_then_write_plan = plan::ReadThenWritePlan {
2815                    id: plan.id,
2816                    selection: plan.values,
2817                    finishing,
2818                    assignments: BTreeMap::new(),
2819                    kind: MutationKind::Insert,
2820                    returning: plan.returning,
2821                };
2822
2823                self.sequence_read_then_write(ctx, read_then_write_plan)
2824                    .await;
2825            }
2826        }
2827    }
2828
2829    /// ReadThenWrite is a plan whose writes depend on the results of a
2830    /// read. This works by doing a Peek, then queuing a SendDiffs. No writes
2831    /// or read-then-writes can occur between the Peek and the SendDiffs;
2832    /// otherwise a serializability violation could occur.
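    ///
    /// For example, a hypothetical `UPDATE t SET x = x + 1` proceeds as:
    /// 1. a Peek reads the current rows of `t`;
    /// 2. the assignments are evaluated against each row, producing diffs of
    ///    the form `[(new_row, +1), (old_row, -1)]`;
    /// 3. a SendDiffs appends those diffs to `t` at a write timestamp greater
    ///    than or equal to the peek timestamp.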
2833    #[instrument]
2834    pub(super) async fn sequence_read_then_write(
2835        &mut self,
2836        mut ctx: ExecuteContext,
2837        plan: plan::ReadThenWritePlan,
2838    ) {
2839        let mut source_ids: BTreeSet<_> = plan
2840            .selection
2841            .depends_on()
2842            .into_iter()
2843            .map(|gid| self.catalog().resolve_item_id(&gid))
2844            .collect();
2845        source_ids.insert(plan.id);
2846
2847        // If the transaction doesn't already have write locks, acquire them.
2848        if ctx.session().transaction().write_locks().is_none() {
2849            // Pre-define all of the locks we need.
2850            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2851
2852            // Try acquiring all of our locks.
2853            for id in &source_ids {
2854                if let Some(lock) = self.try_grant_object_write_lock(*id) {
2855                    write_locks.insert_lock(*id, lock);
2856                }
2857            }
2858
2859            // See if we acquired all of the necessary locks.
2860            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2861                Ok(locks) => locks,
2862                Err(missing) => {
2863                    // Defer our write if we couldn't acquire all of the locks.
2864                    let role_metadata = ctx.session().role_metadata().clone();
2865                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2866                    let plan = DeferredPlan {
2867                        ctx,
2868                        plan: Plan::ReadThenWrite(plan),
2869                        validity: PlanValidity::new(
2870                            self.catalog.transient_revision(),
2871                            source_ids.clone(),
2872                            None,
2873                            None,
2874                            role_metadata,
2875                        ),
2876                        requires_locks: source_ids,
2877                    };
2878                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2879                }
2880            };
2881
2882            ctx.session_mut()
2883                .try_grant_write_locks(write_locks)
2884                .expect("session has already been granted write locks");
2885        }
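        // Note: acquiring the locks all-or-nothing, and deferring the plan
        // (rather than blocking while holding a partial set) when any lock is
        // unavailable, avoids two sessions deadlocking by taking overlapping
        // lock sets in different orders.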
2886
2887        let plan::ReadThenWritePlan {
2888            id,
2889            kind,
2890            selection,
2891            mut assignments,
2892            finishing,
2893            returning,
2894        } = plan;
2895
2896        // Read-then-writes can be queued, so re-verify that the id exists.
2897        let desc = match self.catalog().try_get_entry(&id) {
2898            Some(table) => {
2899                let full_name = self
2900                    .catalog()
2901                    .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2902                // Inserts always occur at the latest version of the table.
2903                table
2904                    .desc_latest(&full_name)
2905                    .expect("desc called on table")
2906                    .into_owned()
2907            }
2908            None => {
2909                ctx.retire(Err(AdapterError::Catalog(
2910                    mz_catalog::memory::error::Error {
2911                        kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2912                            id.to_string(),
2913                        )),
2914                    },
2915                )));
2916                return;
2917            }
2918        };
2919
2920        // Disallow mz_now in any position because read time and write time differ.
2921        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2922            || assignments.values().any(|e| e.contains_temporal())
2923            || returning.iter().any(|e| e.contains_temporal());
2924        if contains_temporal {
2925            ctx.retire(Err(AdapterError::Unsupported(
2926                "calls to mz_now in write statements",
2927            )));
2928            return;
2929        }
2930
2931        // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations:
2932        //
2933        // - They do not refer to any objects whose notion of time moves differently than that of
2934        //   user tables. This limitation is meant to ensure no writes occur between this read and
2935        //   the subsequent write.
2936        // - They do not use mz_now(), whose time produced during read will differ from the write
2937        //   timestamp.
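        //
        // For example, a hypothetical `UPDATE t SET x = 1 WHERE id IN (SELECT
        // id FROM src)` is rejected when `src` is a source, because a source's
        // frontier advances independently of table writes.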
2938        fn validate_read_dependencies(
2939            catalog: &Catalog,
2940            id: &CatalogItemId,
2941        ) -> Result<(), AdapterError> {
2942            use CatalogItemType::*;
2943            use mz_catalog::memory::objects;
2944            let mut ids_to_check = Vec::new();
2945            let valid = match catalog.try_get_entry(id) {
2946                Some(entry) => {
2947                    if let CatalogItem::View(objects::View { optimized_expr, .. })
2948                    | CatalogItem::MaterializedView(objects::MaterializedView {
2949                        optimized_expr,
2950                        ..
2951                    }) = entry.item()
2952                    {
2953                        if optimized_expr.contains_temporal() {
2954                            return Err(AdapterError::Unsupported(
2955                                "calls to mz_now in write statements",
2956                            ));
2957                        }
2958                    }
2959                    match entry.item().typ() {
2960                        typ @ (Func | View | MaterializedView | ContinualTask) => {
2961                            ids_to_check.extend(entry.uses());
2962                            id.is_user() || matches!(typ, Func)
2964                        }
2965                        Source | Secret | Connection => false,
2966                        // Cannot select from sinks or indexes.
2967                        Sink | Index => unreachable!(),
2968                        Table => {
2969                            if !id.is_user() {
2970                                // We can't read from non-user tables
2971                                false
2972                            } else {
2973                                // We can't read from tables that are source-exports
2974                                entry.source_export_details().is_none()
2975                            }
2976                        }
2977                        Type => true,
2978                    }
2979                }
2980                None => false,
2981            };
2982            if !valid {
2983                return Err(AdapterError::InvalidTableMutationSelection);
2984            }
2985            for id in ids_to_check {
2986                validate_read_dependencies(catalog, &id)?;
2987            }
2988            Ok(())
2989        }
2990
2991        for gid in selection.depends_on() {
2992            let item_id = self.catalog().resolve_item_id(&gid);
2993            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2994                ctx.retire(Err(err));
2995                return;
2996            }
2997        }
2998
2999        let (peek_tx, peek_rx) = oneshot::channel();
3000        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
3001        let (tx, _, session, extra) = ctx.into_parts();
3002        // We construct a new execute context for the peek, with a trivial (`Default::default()`)
3003        // execution context, because this peek does not directly correspond to an execute,
3004        // and so we don't need to take any action on its retirement.
3005        // TODO[btv]: we might consider extending statement logging to log the inner
3006        // statement separately, here. That would require us to plumb through the SQL of the inner statement,
3007        // and mint a new "real" execution context here. We'd also have to add some logic to
3008        // make sure such "sub-statements" are always sampled when the top-level statement is.
3009        //
3010        // It's debatable whether this makes sense conceptually,
3011        // because the inner fragment here is not actually a
3012        // "statement" in its own right.
3013        let peek_ctx = ExecuteContext::from_parts(
3014            peek_client_tx,
3015            self.internal_cmd_tx.clone(),
3016            session,
3017            Default::default(),
3018        );
3019        self.sequence_peek(
3020            peek_ctx,
3021            plan::SelectPlan {
3022                select: None,
3023                source: selection,
3024                when: QueryWhen::FreshestTableWrite,
3025                finishing,
3026                copy_to: None,
3027            },
3028            TargetCluster::Active,
3029            None,
3030        )
3031        .await;
3032
3033        let internal_cmd_tx = self.internal_cmd_tx.clone();
3034        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
3035        let catalog = self.owned_catalog();
3036        let max_result_size = self.catalog().system_config().max_result_size();
3037        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
3038            let (peek_response, session) = match peek_rx.await {
3039                Ok(Response {
3040                    result: Ok(resp),
3041                    session,
3042                    otel_ctx,
3043                }) => {
3044                    otel_ctx.attach_as_parent();
3045                    (resp, session)
3046                }
3047                Ok(Response {
3048                    result: Err(e),
3049                    session,
3050                    otel_ctx,
3051                }) => {
3052                    let ctx =
3053                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3054                    otel_ctx.attach_as_parent();
3055                    ctx.retire(Err(e));
3056                    return;
3057                }
3058                // It is not an error for these results to be ready after `peek_client_tx` has been dropped.
3059                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
3060            };
3061            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3062            let mut timeout_dur = *ctx.session().vars().statement_timeout();
3063
3064            // A timeout of 0 is equivalent to "off", meaning we wait forever.
3065            if timeout_dur == Duration::ZERO {
3066                timeout_dur = Duration::MAX;
3067            }
3068
3069            let style = ExprPrepStyle::OneShot {
3070                logical_time: EvalTime::NotAvailable,
3071                session: ctx.session(),
3072                catalog_state: catalog.state(),
3073            };
3074            for expr in assignments.values_mut() {
3075                return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
3076            }
3077
3078            let make_diffs =
3079                move |mut rows: Box<dyn RowIterator>| -> Result<Vec<(Row, Diff)>, AdapterError> {
3080                    let arena = RowArena::new();
3081                    let mut diffs = Vec::new();
3082                    let mut datum_vec = mz_repr::DatumVec::new();
3083
3084                    while let Some(row) = rows.next() {
3085                        if !assignments.is_empty() {
3086                            assert!(
3087                                matches!(kind, MutationKind::Update),
3088                                "only updates support assignments"
3089                            );
3090                            let mut datums = datum_vec.borrow_with(row);
3091                            let mut updates = vec![];
3092                            for (idx, expr) in &assignments {
3093                                let updated = match expr.eval(&datums, &arena) {
3094                                    Ok(updated) => updated,
3095                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
3096                                };
3097                                updates.push((*idx, updated));
3098                            }
3099                            for (idx, new_value) in updates {
3100                                datums[idx] = new_value;
3101                            }
3102                            let updated = Row::pack_slice(&datums);
3103                            diffs.push((updated, Diff::ONE));
3104                        }
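                        // E.g., an UPDATE of row `r` to `r'` contributes
                        // [(r', +1), (r, -1)], while a DELETE of `r`
                        // contributes [(r, -1)].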
3105                        match kind {
3106                            // Updates and deletes always remove the
3107                            // current row. Updates will also add an
3108                            // updated value.
3109                            MutationKind::Update | MutationKind::Delete => {
3110                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
3111                            }
3112                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
3113                        }
3114                    }
3115                    for (row, diff) in &diffs {
3116                        if diff.is_positive() {
3117                            for (idx, datum) in row.iter().enumerate() {
3118                                desc.constraints_met(idx, &datum)?;
3119                            }
3120                        }
3121                    }
3122                    Ok(diffs)
3123                };
3124            let diffs = match peek_response {
3125                ExecuteResponse::SendingRows { future: batch, .. } => {
3126                    // TODO(jkosh44): This timeout should be removed;
3127                    // we should instead periodically ensure clusters are
3128                    // healthy and actively cancel any work waiting on unhealthy
3129                    // clusters.
3130                    match tokio::time::timeout(timeout_dur, batch).await {
3131                        Ok(res) => match res {
3132                            PeekResponseUnary::Rows(rows) => make_diffs(rows),
3133                            PeekResponseUnary::Canceled => Err(AdapterError::Canceled),
3134                            PeekResponseUnary::Error(e) => {
3135                                Err(AdapterError::Unstructured(anyhow!(e)))
3136                            }
3137                        },
3138                        Err(_) => {
3139                            // We timed out, so remove the pending peek. This is
3140                            // best-effort and doesn't guarantee we won't
3141                            // receive a response.
3142                            // It is not an error for this timeout to occur after `internal_cmd_rx` has been dropped.
3143                            let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
3144                                conn_id: ctx.session().conn_id().clone(),
3145                            });
3146                            if let Err(e) = result {
3147                                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3148                            }
3149                            Err(AdapterError::StatementTimeout)
3150                        }
3151                    }
3152                }
3153                ExecuteResponse::SendingRowsImmediate { rows } => make_diffs(rows),
3154                resp => Err(AdapterError::Unstructured(anyhow!(
3155                    "unexpected peek response: {resp:?}"
3156                ))),
3157            };
3158            let mut returning_rows = Vec::new();
3159            let mut diff_err: Option<AdapterError> = None;
3160            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
3161                let arena = RowArena::new();
3162                for (row, diff) in diffs {
3163                    if !diff.is_positive() {
3164                        continue;
3165                    }
3166                    let mut returning_row = Row::with_capacity(returning.len());
3167                    let mut packer = returning_row.packer();
3168                    for expr in &returning {
3169                        let datums: Vec<_> = row.iter().collect();
3170                        match expr.eval(&datums, &arena) {
3171                            Ok(datum) => {
3172                                packer.push(datum);
3173                            }
3174                            Err(err) => {
3175                                diff_err = Some(err.into());
3176                                break;
3177                            }
3178                        }
3179                    }
3180                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3181                    let diff = match NonZeroUsize::try_from(diff) {
3182                        Ok(diff) => diff,
3183                        Err(err) => {
3184                            diff_err = Some(err.into());
3185                            break;
3186                        }
3187                    };
3188                    returning_rows.push((returning_row, diff));
3189                    if diff_err.is_some() {
3190                        break;
3191                    }
3192                }
3193            }
3194            let diffs = if let Some(err) = diff_err {
3195                Err(err)
3196            } else {
3197                diffs
3198            };
3199
3200            // We need to clear out the timestamp context so the write doesn't fail due to a
3201            // read-only transaction.
3202            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3203            // No matter what isolation level the client is using, we must linearize this
3204            // read. The write will be performed right after this, as part of a single
3205            // transaction, so the write must have a timestamp greater than or equal to the
3206            // read.
3207            //
3208            // Note: It's only OK for the write to have a greater timestamp than the read
3209            // because the write lock prevents any other writes from happening in between
3210            // the read and write.
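            // E.g., if the peek read at timestamp 42, linearization ensures
            // the subsequent write is assigned some timestamp >= 42, and the
            // write locks ensure no other write lands in between.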
3211            if let Some(timestamp_context) = timestamp_context {
3212                let (tx, rx) = tokio::sync::oneshot::channel();
3213                let conn_id = ctx.session().conn_id().clone();
3214                let pending_read_txn = PendingReadTxn {
3215                    txn: PendingRead::ReadThenWrite { ctx, tx },
3216                    timestamp_context,
3217                    created: Instant::now(),
3218                    num_requeues: 0,
3219                    otel_ctx: OpenTelemetryContext::obtain(),
3220                };
3221                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3222                // It is not an error for these results to be ready after `strict_serializable_reads_rx` has been dropped.
3223                if let Err(e) = result {
3224                    warn!(
3225                        "strict_serializable_reads_tx dropped before we could send: {:?}",
3226                        e
3227                    );
3228                    return;
3229                }
3230                let result = rx.await;
3231                // It is not an error for these results to be ready after `tx` has been dropped.
3232                ctx = match result {
3233                    Ok(Some(ctx)) => ctx,
3234                    Ok(None) => {
3235                        // Coordinator took our context and will handle responding to the client.
3236                        // This usually indicates that our transaction was aborted.
3237                        return;
3238                    }
3239                    Err(e) => {
3240                        warn!(
3241                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3242                            e
3243                        );
3244                        return;
3245                    }
3246                };
3247            }
3248
3249            match diffs {
3250                Ok(diffs) => {
3251                    let result = Self::send_diffs(
3252                        ctx.session_mut(),
3253                        plan::SendDiffsPlan {
3254                            id,
3255                            updates: diffs,
3256                            kind,
3257                            returning: returning_rows,
3258                            max_result_size,
3259                        },
3260                    );
3261                    ctx.retire(result);
3262                }
3263                Err(e) => {
3264                    ctx.retire(Err(e));
3265                }
3266            }
3267        });
3268    }
3269
3270    #[instrument]
3271    pub(super) async fn sequence_alter_item_rename(
3272        &mut self,
3273        session: &mut Session,
3274        plan: plan::AlterItemRenamePlan,
3275    ) -> Result<ExecuteResponse, AdapterError> {
3276        let op = catalog::Op::RenameItem {
3277            id: plan.id,
3278            current_full_name: plan.current_full_name,
3279            to_name: plan.to_name,
3280        };
3281        match self
3282            .catalog_transact_with_ddl_transaction(session, vec![op])
3283            .await
3284        {
3285            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3286            Err(err) => Err(err),
3287        }
3288    }
3289
3290    #[instrument]
3291    pub(super) async fn sequence_alter_retain_history(
3292        &mut self,
3293        session: &mut Session,
3294        plan: plan::AlterRetainHistoryPlan,
3295    ) -> Result<ExecuteResponse, AdapterError> {
3296        let ops = vec![catalog::Op::AlterRetainHistory {
3297            id: plan.id,
3298            value: plan.value,
3299            window: plan.window,
3300        }];
3301        self.catalog_transact_with_side_effects(Some(session), ops, |coord| async {
3302            let catalog_item = coord.catalog().get_entry(&plan.id).item();
3303            let cluster = match catalog_item {
3304                CatalogItem::Table(_)
3305                | CatalogItem::MaterializedView(_)
3306                | CatalogItem::ContinualTask(_) => None,
3307                CatalogItem::Index(index) => Some(index.cluster_id),
3308                CatalogItem::Source(_) => {
3309                    let read_policies = coord.catalog().source_read_policies(plan.id);
3310                    coord.update_storage_read_policies(read_policies);
3311                    return;
3312                }
3313                CatalogItem::Log(_)
3314                | CatalogItem::View(_)
3315                | CatalogItem::Sink(_)
3316                | CatalogItem::Type(_)
3317                | CatalogItem::Func(_)
3318                | CatalogItem::Secret(_)
3319                | CatalogItem::Connection(_) => unreachable!(),
3320            };
3321            match cluster {
3322                Some(cluster) => {
3323                    coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3324                }
3325                None => {
3326                    coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3327                }
3328            }
3329        })
3330        .await?;
3331        Ok(ExecuteResponse::AlteredObject(plan.object_type))
3332    }
3333
3334    #[instrument]
3335    pub(super) async fn sequence_alter_schema_rename(
3336        &mut self,
3337        session: &mut Session,
3338        plan: plan::AlterSchemaRenamePlan,
3339    ) -> Result<ExecuteResponse, AdapterError> {
3340        let (database_spec, schema_spec) = plan.cur_schema_spec;
3341        let op = catalog::Op::RenameSchema {
3342            database_spec,
3343            schema_spec,
3344            new_name: plan.new_schema_name,
3345            check_reserved_names: true,
3346        };
3347        match self
3348            .catalog_transact_with_ddl_transaction(session, vec![op])
3349            .await
3350        {
3351            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3352            Err(err) => Err(err),
3353        }
3354    }
3355
3356    #[instrument]
3357    pub(super) async fn sequence_alter_schema_swap(
3358        &mut self,
3359        session: &mut Session,
3360        plan: plan::AlterSchemaSwapPlan,
3361    ) -> Result<ExecuteResponse, AdapterError> {
3362        let plan::AlterSchemaSwapPlan {
3363            schema_a_spec: (schema_a_db, schema_a),
3364            schema_a_name,
3365            schema_b_spec: (schema_b_db, schema_b),
3366            schema_b_name,
3367            name_temp,
3368        } = plan;
3369
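        // Swap the two schema names via a temporary: rename a to a temp name,
        // rename b to a's old name, then rename the schema formerly named a
        // (still addressed by its spec) to b's old name.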
3370        let op_a = catalog::Op::RenameSchema {
3371            database_spec: schema_a_db,
3372            schema_spec: schema_a,
3373            new_name: name_temp,
3374            check_reserved_names: false,
3375        };
3376        let op_b = catalog::Op::RenameSchema {
3377            database_spec: schema_b_db,
3378            schema_spec: schema_b,
3379            new_name: schema_a_name,
3380            check_reserved_names: false,
3381        };
3382        let op_c = catalog::Op::RenameSchema {
3383            database_spec: schema_a_db,
3384            schema_spec: schema_a,
3385            new_name: schema_b_name,
3386            check_reserved_names: false,
3387        };
3388
3389        match self
3390            .catalog_transact_with_ddl_transaction(session, vec![op_a, op_b, op_c])
3391            .await
3392        {
3393            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3394            Err(err) => Err(err),
3395        }
3396    }
3397
3398    #[instrument]
3399    pub(super) async fn sequence_alter_role(
3400        &mut self,
3401        session: &Session,
3402        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3403    ) -> Result<ExecuteResponse, AdapterError> {
3404        let catalog = self.catalog().for_session(session);
3405        let role = catalog.get_role(&id);
3406
3407        // We'll send these notices to the user if the operation is successful.
3408        let mut notices = vec![];
3409
3410        // Get the attributes and variables from the role, as they currently are.
3411        let mut attributes = role.attributes().clone();
3412        let mut vars = role.vars().clone();
3413
3414        // Apply our updates.
3415        match option {
3416            PlannedAlterRoleOption::Attributes(attrs) => {
3417                self.validate_role_attributes(&attrs.clone().into())?;
3418
3419                if let Some(inherit) = attrs.inherit {
3420                    attributes.inherit = inherit;
3421                }
3422
3423                if let Some(password) = attrs.password {
3424                    attributes.password = Some(password);
3425                }
3426
3427                if let Some(superuser) = attrs.superuser {
3428                    attributes.superuser = Some(superuser);
3429                }
3430
3431                if let Some(login) = attrs.login {
3432                    attributes.login = Some(login);
3433                }
3434
3435                if attrs.nopassword.unwrap_or(false) {
3436                    attributes.password = None;
3437                }
3438
3439                if let Some(notice) = self.should_emit_rbac_notice(session) {
3440                    notices.push(notice);
3441                }
3442            }
3443            PlannedAlterRoleOption::Variable(variable) => {
3444                // Get the variable to make sure it's valid and visible.
3445                let session_var = session.vars().inspect(variable.name())?;
3446                // Return early if it's not visible.
3447                session_var.visible(session.user(), catalog.system_vars())?;
3448
3449                // Emit a warning when deprecated variables are used.
3450                // TODO(database-issues#8069) remove this after sufficient time has passed
3451                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3452                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3453                } else if let PlannedRoleVariable::Set {
3454                    name,
3455                    value: VariableValue::Values(vals),
3456                } = &variable
3457                {
3458                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3459                        notices.push(AdapterNotice::IntrospectionClusterUsage);
3460                    }
3461                }
3462
3463                let var_name = match variable {
3464                    PlannedRoleVariable::Set { name, value } => {
3465                        // Update our persisted set.
3466                        match &value {
3467                            VariableValue::Default => {
3468                                vars.remove(&name);
3469                            }
3470                            VariableValue::Values(vals) => {
3471                                let var = match &vals[..] {
3472                                    [val] => OwnedVarInput::Flat(val.clone()),
3473                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
3474                                };
3475                                // Make sure the input is valid.
3476                                session_var.check(var.borrow())?;
3477
3478                                vars.insert(name.clone(), var);
3479                            }
3480                        };
3481                        name
3482                    }
3483                    PlannedRoleVariable::Reset { name } => {
3484                        // Remove it from our persisted values.
3485                        vars.remove(&name);
3486                        name
3487                    }
3488                };
3489
3490                // Emit a notice that they need to reconnect to see the change take effect.
3491                notices.push(AdapterNotice::VarDefaultUpdated {
3492                    role: Some(name.clone()),
3493                    var_name: Some(var_name),
3494                });
3495            }
3496        }
3497
3498        let op = catalog::Op::AlterRole {
3499            id,
3500            name,
3501            attributes,
3502            vars: RoleVars { map: vars },
3503        };
3504        let response = self
3505            .catalog_transact(Some(session), vec![op])
3506            .await
3507            .map(|_| ExecuteResponse::AlteredRole)?;
3508
3509        // Send all of our queued notices.
3510        session.add_notices(notices);
3511
3512        Ok(response)
3513    }
3514
3515    #[instrument]
3516    pub(super) async fn sequence_alter_sink_prepare(
3517        &mut self,
3518        ctx: ExecuteContext,
3519        plan: plan::AlterSinkPlan,
3520    ) {
3521        // Put a read hold on the new relation
3522        let id_bundle = crate::CollectionIdBundle {
3523            storage_ids: BTreeSet::from_iter([plan.sink.from]),
3524            compute_ids: BTreeMap::new(),
3525        };
3526        let read_hold = self.acquire_read_holds(&id_bundle);
3527
3528        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3529            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3530            return;
3531        };
3532
3533        let otel_ctx = OpenTelemetryContext::obtain();
3534        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3535
3536        let plan_validity = PlanValidity::new(
3537            self.catalog().transient_revision(),
3538            BTreeSet::from_iter([from_item_id]),
3539            Some(plan.in_cluster),
3540            None,
3541            ctx.session().role_metadata().clone(),
3542        );
3543
3544        // Re-resolve items in the altered statement.
3545        // First, parse the statement.
3546        let create_sink_stmt = match mz_sql::parse::parse(&plan.sink.create_sql)
3547            .expect("invalid create sink sql")
3548            .into_element()
3549            .ast
3550        {
3551            Statement::CreateSink(stmt) => stmt,
3552            _ => unreachable!("invalid statement kind for sink"),
3553        };
3554        let catalog = self.catalog().for_system_session();
3555        let (_, resolved_ids) = match mz_sql::names::resolve(&catalog, create_sink_stmt) {
3556            Ok(ok) => ok,
3557            Err(e) => {
3558                ctx.retire(Err(AdapterError::internal("ALTER SINK", e)));
3559                return;
3560            }
3561        };
3562
3563        info!(
3564            "preparing alter sink for {}: frontiers={:?} export={:?}",
3565            plan.global_id,
3566            self.controller
3567                .storage_collections
3568                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3569            self.controller.storage.export(plan.global_id)
3570        );
3571
3572        // Now we must wait for the sink to make enough progress such that there is overlap between
3573        // the new `from` collection's read hold and the sink's write frontier.
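        // E.g., if the new `from` collection is readable only at times >= 42,
        // we wait until the sink's write frontier has advanced past 42, so the
        // new input can supply the first timestamps the sink has yet to write.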
3574        self.install_storage_watch_set(
3575            ctx.session().conn_id().clone(),
3576            BTreeSet::from_iter([plan.global_id]),
3577            read_ts,
3578            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3579                ctx: Some(ctx),
3580                otel_ctx,
3581                plan,
3582                plan_validity,
3583                resolved_ids,
3584                read_hold,
3585            }),
3586        );
3587    }
3588
3589    #[instrument]
3590    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3591        ctx.otel_ctx.attach_as_parent();
3592        match ctx.plan_validity.check(self.catalog()) {
3593            Ok(()) => {}
3594            Err(err) => {
3595                ctx.retire(Err(err));
3596                return;
3597            }
3598        }
3599        {
3600            let plan = &ctx.plan;
3601            info!(
3602                "finishing alter sink for {}: frontiers={:?} export={:?}",
3603                plan.global_id,
3604                self.controller
3605                    .storage_collections
3606                    .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3607                self.controller.storage.export(plan.global_id)
3608            );
3609        }
3610
3611        let plan::AlterSinkPlan {
3612            item_id,
3613            global_id,
3614            sink,
3615            with_snapshot,
3616            in_cluster,
3617        } = ctx.plan.clone();
3618        // Assert that we can recover the updates that happened at the timestamps of the
3619        // write frontier, i.e., that the as-of is strictly less than the write frontier.
3620        let write_frontier = &self
3621            .controller
3622            .storage
3623            .export(global_id)
3624            .expect("sink known to exist")
3625            .write_frontier;
3626        let as_of = ctx.read_hold.least_valid_read();
3627        assert!(
3628            write_frontier.iter().all(|t| as_of.less_than(t)),
3629            "{:?} should be strictly less than {:?}",
3630            &*as_of,
3631            &**write_frontier
3632        );
3633
3634        let catalog_sink = Sink {
3635            create_sql: sink.create_sql,
3636            global_id,
3637            from: sink.from,
3638            connection: sink.connection.clone(),
3639            envelope: sink.envelope,
3640            version: sink.version,
3641            with_snapshot,
3642            resolved_ids: ctx.resolved_ids.clone(),
3643            cluster_id: in_cluster,
3644        };
3645
3646        let ops = vec![catalog::Op::UpdateItem {
3647            id: item_id,
3648            name: self.catalog.get_entry(&item_id).name().clone(),
3649            to_item: CatalogItem::Sink(catalog_sink),
3650        }];
3651
3652        match self
3653            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3654            .await
3655        {
3656            Ok(()) => {}
3657            Err(err) => {
3658                ctx.retire(Err(err));
3659                return;
3660            }
3661        }
3662
3663        let from_entry = self.catalog().get_entry_by_global_id(&sink.from);
3664        let storage_sink_desc = StorageSinkDesc {
3665            from: sink.from,
3666            from_desc: from_entry
3667                .desc_opt()
3668                .expect("sinks can only be built on items with descs")
3669                .into_owned(),
3670            connection: sink
3671                .connection
3672                .clone()
3673                .into_inline_connection(self.catalog().state()),
3674            envelope: sink.envelope,
3675            as_of,
3676            with_snapshot,
3677            version: sink.version,
3678            from_storage_metadata: (),
3679            to_storage_metadata: (),
3680        };
3681
3682        self.controller
3683            .storage
3684            .alter_export(
3685                global_id,
3686                ExportDescription {
3687                    sink: storage_sink_desc,
3688                    instance_id: in_cluster,
3689                },
3690            )
3691            .await
3692            .unwrap_or_terminate("cannot fail to alter export");
3693
3694        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3695    }
3696
3697    #[instrument]
3698    pub(super) async fn sequence_alter_connection(
3699        &mut self,
3700        ctx: ExecuteContext,
3701        AlterConnectionPlan { id, action }: AlterConnectionPlan,
3702    ) {
3703        match action {
3704            AlterConnectionAction::RotateKeys => {
3705                self.sequence_rotate_keys(ctx, id).await;
3706            }
3707            AlterConnectionAction::AlterOptions {
3708                set_options,
3709                drop_options,
3710                validate,
3711            } => {
3712                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3713                    .await
3714            }
3715        }
3716    }
3717
3718    #[instrument]
3719    async fn sequence_alter_connection_options(
3720        &mut self,
3721        mut ctx: ExecuteContext,
3722        id: CatalogItemId,
3723        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3724        drop_options: BTreeSet<ConnectionOptionName>,
3725        validate: bool,
3726    ) {
3727        let cur_entry = self.catalog().get_entry(&id);
3728        let cur_conn = cur_entry.connection().expect("known to be connection");
3729        let connection_gid = cur_conn.global_id();
3730
3731        let inner = || -> Result<Connection, AdapterError> {
3732            // Parse statement.
3733            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3734                .expect("invalid create sql persisted to catalog")
3735                .into_element()
3736                .ast
3737            {
3738                Statement::CreateConnection(stmt) => stmt,
3739                _ => unreachable!("proved type is connection"),
3740            };
3741
3742            let catalog = self.catalog().for_system_session();
3743
3744            // Resolve items in statement
3745            let (mut create_conn_stmt, resolved_ids) =
3746                mz_sql::names::resolve(&catalog, create_conn_stmt)
3747                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3748
3749            // Retain options that are neither set nor dropped.
3750            create_conn_stmt
3751                .values
3752                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3753
3754            // Set new values
3755            create_conn_stmt.values.extend(
3756                set_options
3757                    .into_iter()
3758                    .map(|(name, value)| ConnectionOption { name, value }),
3759            );
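            // E.g., starting from options (HOST = 'a', PORT = 5432), applying
            // SET (HOST = 'b') and DROP (PORT) leaves (HOST = 'b').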
3760
3761            // Open a new catalog, which we will use to re-plan our
3762            // statement with the desired config.
3763            let mut catalog = self.catalog().for_system_session();
3764            catalog.mark_id_unresolvable_for_replanning(id);
3765
3766            // Re-define our source in terms of the amended statement
3767            let plan = match mz_sql::plan::plan(
3768                None,
3769                &catalog,
3770                Statement::CreateConnection(create_conn_stmt),
3771                &Params::empty(),
3772                &resolved_ids,
3773            )
3774            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3775            {
3776                Plan::CreateConnection(plan) => plan,
3777                _ => unreachable!("create connection plan is the only valid response"),
3778            };
3779
3780            // Parse statement.
3781            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3782                .expect("invalid create sql persisted to catalog")
3783                .into_element()
3784                .ast
3785            {
3786                Statement::CreateConnection(stmt) => stmt,
3787                _ => unreachable!("proved type is connection"),
3788            };
3789
3790            let catalog = self.catalog().for_system_session();
3791
3792            // Resolve items in statement
3793            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3794                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3795
3796            Ok(Connection {
3797                create_sql: plan.connection.create_sql,
3798                global_id: cur_conn.global_id,
3799                details: plan.connection.details,
3800                resolved_ids: new_deps,
3801            })
3802        };
3803
3804        let conn = match inner() {
3805            Ok(conn) => conn,
3806            Err(e) => {
3807                return ctx.retire(Err(e));
3808            }
3809        };
3810
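        // Validation may contact external systems, so run it off the
        // coordinator's main loop: spawn a task that performs the probe and
        // reports back via `internal_cmd_tx`, ensuring a slow or unreachable
        // connection target does not block other sequencing work.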
        if validate {
            let connection = conn
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();
            let current_storage_parameters = self.controller.storage.config().clone();

            task::spawn(
                || format!("validate_alter_connection:{conn_id}"),
                async move {
                    let resolved_ids = conn.resolved_ids.clone();
                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
                    let result = match connection.validate(id, &current_storage_parameters).await {
                        Ok(()) => Ok(conn),
                        Err(err) => Err(err.into()),
                    };

                    // It is not an error for validation to complete after `internal_cmd_rx` is dropped.
                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
                        AlterConnectionValidationReady {
                            ctx,
                            result,
                            connection_id: id,
                            connection_gid,
                            plan_validity: PlanValidity::new(
                                transient_revision,
                                dependency_ids.clone(),
                                None,
                                None,
                                role_metadata,
                            ),
                            otel_ctx,
                            resolved_ids,
                        },
                    ));
                    if let Err(e) = result {
                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                    }
                },
            );
        } else {
            let result = self
                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
                .await;
            ctx.retire(result);
        }
    }

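    /// Commits an already-validated connection alteration: verifies that the
    /// new definition is `alter_compatible` with the old one, updates the
    /// catalog item, ensures any AWS PrivateLink VPC endpoint exists, and
    /// pushes the new connection details to every dependent ingestion,
    /// export, and source-fed table in the storage controller.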
    #[instrument]
    pub(crate) async fn sequence_alter_connection_stage_finish(
        &mut self,
        session: &mut Session,
        id: CatalogItemId,
        connection: Connection,
    ) -> Result<ExecuteResponse, AdapterError> {
        match self.catalog.get_entry(&id).item() {
            CatalogItem::Connection(curr_conn) => {
                curr_conn
                    .details
                    .to_connection()
                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
                    .map_err(StorageError::from)?;
            }
            _ => unreachable!("known to be a connection"),
        };

        let ops = vec![catalog::Op::UpdateItem {
            id,
            name: self.catalog.get_entry(&id).name().clone(),
            to_item: CatalogItem::Connection(connection.clone()),
        }];

        self.catalog_transact(Some(session), ops).await?;

        match connection.details {
            ConnectionDetails::AwsPrivatelink(ref privatelink) => {
                let spec = VpcEndpointConfig {
                    aws_service_name: privatelink.service_name.to_owned(),
                    availability_zone_ids: privatelink.availability_zones.to_owned(),
                };
                self.cloud_resource_controller
                    .as_ref()
                    .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
                    .ensure_vpc_endpoint(id, spec)
                    .await?;
            }
            _ => {}
        };

        let entry = self.catalog().get_entry(&id);

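        // Breadth-first walk of the connection's transitive dependents:
        // connections can reference other connections (e.g. via an SSH
        // tunnel), and every ingestion, export, and source-fed table found
        // along the way must have its running instance updated below.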
        let mut connections = VecDeque::new();
        connections.push_front(entry.id());

        let mut source_connections = BTreeMap::new();
        let mut sink_connections = BTreeMap::new();
        let mut source_export_data_configs = BTreeMap::new();

        while let Some(id) = connections.pop_front() {
            for id in self.catalog.get_entry(&id).used_by() {
                let entry = self.catalog.get_entry(id);
                match entry.item() {
                    CatalogItem::Connection(_) => connections.push_back(*id),
                    CatalogItem::Source(source) => {
                        let desc = match &entry.source().expect("known to be source").data_source {
                            DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
                                .desc
                                .clone()
                                .into_inline_connection(self.catalog().state()),
                            _ => unreachable!("only ingestions reference connections"),
                        };

                        source_connections.insert(source.global_id, desc.connection);
                    }
                    CatalogItem::Sink(sink) => {
                        let export = entry.sink().expect("known to be sink");
                        sink_connections.insert(
                            sink.global_id,
                            export
                                .connection
                                .clone()
                                .into_inline_connection(self.catalog().state()),
                        );
                    }
                    CatalogItem::Table(table) => {
                        // This is a source-fed table that references a schema registry
                        // connection as part of its encoding / data config.
                        if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
                            let data_config = export_data_config.clone();
                            source_export_data_configs.insert(
                                table.global_id_writes(),
                                data_config.into_inline_connection(self.catalog().state()),
                            );
                        }
                    }
                    t => unreachable!("connection dependency not expected on {:?}", t),
                }
            }
        }

        if !source_connections.is_empty() {
            self.controller
                .storage
                .alter_ingestion_connections(source_connections)
                .await
                .unwrap_or_terminate("cannot fail to alter ingestion connection");
        }

        if !sink_connections.is_empty() {
            self.controller
                .storage
                .alter_export_connections(sink_connections)
                .await
                .unwrap_or_terminate("altering exports after txn must succeed");
        }

        if !source_export_data_configs.is_empty() {
            self.controller
                .storage
                .alter_ingestion_export_data_configs(source_export_data_configs)
                .await
                .unwrap_or_terminate("altering source export data configs after txn must succeed");
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
    }

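    /// Sequences `ALTER SOURCE`. Two actions are handled here: adding
    /// subsource exports to an existing ingestion (which re-plans the
    /// source's `CREATE SOURCE` statement with merged `TEXT COLUMNS` /
    /// `EXCLUDE COLUMNS` options) and refreshing the set of upstream
    /// references recorded for the source.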
    #[instrument]
    pub(super) async fn sequence_alter_source(
        &mut self,
        session: &Session,
        plan::AlterSourcePlan {
            item_id,
            ingestion_id,
            action,
        }: plan::AlterSourcePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let cur_entry = self.catalog().get_entry(&item_id);
        let cur_source = cur_entry.source().expect("known to be source");

        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
            // Parse statement.
            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateSource(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = coord.catalog().for_system_session();

            // Resolve items in statement
            mz_sql::names::resolve(&catalog, create_source_stmt)
                .map_err(|e| AdapterError::internal(err_cx, e))
        };

        match action {
            plan::AlterSourceAction::AddSubsourceExports {
                subsources,
                options,
            } => {
                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                    text_columns: mut new_text_columns,
                    exclude_columns: mut new_exclude_columns,
                    ..
                } = options.try_into()?;

                // Resolve items in statement
                let (mut create_source_stmt, resolved_ids) =
                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

                // Get all currently referred-to items
                let catalog = self.catalog();
                let curr_references: BTreeSet<_> = catalog
                    .get_entry(&item_id)
                    .used_by()
                    .into_iter()
                    .filter_map(|subsource| {
                        catalog
                            .get_entry(subsource)
                            .subsource_details()
                            .map(|(_id, reference, _details)| reference)
                    })
                    .collect();

                // We are doing a lot of unwrapping, so just make an error to reference; all of
                // these invariants are guaranteed to be true because of how we plan subsources.
                let purification_err =
                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

                // TODO(roshan): Remove all the text-column/ignore-column option merging here once
                // we remove support for implicitly created subsources from a `CREATE SOURCE`
                // statement.
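                // A worked example of the merging below (names hypothetical):
                // a Postgres `TEXT COLUMNS` entry such as `pg.public.t1.c1` is
                // column-qualified; truncating it to its first three
                // components yields the table reference `pg.public.t1`, the
                // shape stored in `curr_references`. Entries whose table is no
                // longer referenced by any subsource are dropped, and the rest
                // are merged with the newly requested columns. MySQL and SQL
                // Server references have one fewer component
                // (`schema.table.column`), hence the different truncation
                // lengths.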
                match &mut create_source_stmt.connection {
                    CreateSourceConnection::Postgres {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::PgConfigOptionExtracted {
                            mut text_columns, ..
                        } = curr_options.clone().try_into()?;

                        // Drop text columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));

                        // Drop all text columns that are not currently referred to.
                        text_columns.retain(|column_qualified_reference| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                4,
                                "all TEXT COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(3);
                            curr_references.contains(&table)
                        });

                        // Merge the current text columns into the new text columns.
                        new_text_columns.extend(text_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(PgConfigOption {
                                name: PgConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::MySql {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::MySqlConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both text and exclude columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                MySqlConfigOptionName::TextColumns
                                    | MySqlConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::SqlServer {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::SqlServerConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop both text and exclude columns; we will add them back in
                        // as appropriate below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                SqlServerConfigOptionName::TextColumns
                                    | SqlServerConfigOptionName::ExcludeColumns
                            )
                        });

                        // Drop all text / exclude columns that are not currently referred to.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        // Merge the current text / exclude columns into the new text / exclude columns.
                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        // If we have text columns, add them to the options.
                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        // If we have exclude columns, add them to the options.
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    _ => return Err(purification_err()),
                };

                let mut catalog = self.catalog().for_system_session();
                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

                // Re-define our source in terms of the amended statement
                let plan = match mz_sql::plan::plan(
                    None,
                    &catalog,
                    Statement::CreateSource(create_source_stmt),
                    &Params::empty(),
                    &resolved_ids,
                )
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
                {
                    Plan::CreateSource(plan) => plan,
                    _ => unreachable!("create source plan is only valid response"),
                };

                // Asserting that we've done the right thing with dependencies
                // here requires mocking out objects in the catalog, which is a
                // large task for an operation we have to cover in tests anyway.
                let source = Source::new(
                    plan,
                    cur_source.global_id,
                    resolved_ids,
                    cur_source.custom_logical_compaction_window,
                    cur_source.is_retained_metrics_object,
                );

                let source_compaction_window = source.custom_logical_compaction_window;

                // Get new ingestion description for storage.
                let desc = match &source.data_source {
                    DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
                        .desc
                        .clone()
                        .into_inline_connection(self.catalog().state()),
                    _ => unreachable!("already verified of type ingestion"),
                };

                self.controller
                    .storage
                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

                // Redefine source. This must be done before we create any new
                // subsources so that it has the right ingestion.
                let mut ops = vec![catalog::Op::UpdateItem {
                    id: item_id,
                    // Look this up again so we don't have to hold an immutable reference to the
                    // entry for so long.
                    name: self.catalog.get_entry(&item_id).name().clone(),
                    to_item: CatalogItem::Source(source),
                }];

                let CreateSourceInner {
                    ops: new_ops,
                    sources,
                    if_not_exists_ids,
                } = self.create_source_inner(session, subsources).await?;

                ops.extend(new_ops.into_iter());

                assert!(
                    if_not_exists_ids.is_empty(),
                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
                );

                self.catalog_transact(Some(session), ops).await?;

                self.controller
                    .storage
                    .alter_ingestion_source_desc(ingestion_id, desc)
                    .await
                    .unwrap_or_terminate("cannot fail to alter source desc");

                let mut item_ids = BTreeSet::new();
                let mut collections = Vec::with_capacity(sources.len());
                for (item_id, source) in sources {
                    let status_id = self.catalog().resolve_builtin_storage_collection(
                        &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
                    );
                    let source_status_collection_id =
                        Some(self.catalog().get_entry(&status_id).latest_global_id());

                    let (data_source, status_collection_id) = match source.data_source {
                        // Subsources use source statuses.
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: _,
                            details,
                            data_config,
                        } => {
                            // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                            // this will always be a Source or a Table.
                            let ingestion_id =
                                self.catalog().get_entry(&ingestion_id).latest_global_id();
                            (
                                DataSource::IngestionExport {
                                    ingestion_id,
                                    details,
                                    data_config: data_config
                                        .into_inline_connection(self.catalog().state()),
                                },
                                source_status_collection_id,
                            )
                        }
                        o => {
                            unreachable!(
                                "ALTER SOURCE...ADD SUBSOURCE only creates SourceExport but got {:?}",
                                o
                            )
                        }
                    };

                    collections.push((
                        source.global_id,
                        CollectionDescription {
                            desc: source.desc.clone(),
                            data_source,
                            since: None,
                            status_collection_id,
                            timeline: Some(source.timeline.clone()),
                        },
                    ));

                    item_ids.insert(item_id);
                }

                let storage_metadata = self.catalog.state().storage_metadata();

                self.controller
                    .storage
                    .create_collections(storage_metadata, None, collections)
                    .await
                    .unwrap_or_terminate("cannot fail to create collections");

                self.initialize_storage_read_policies(
                    item_ids,
                    source_compaction_window.unwrap_or(CompactionWindow::Default),
                )
                .await;
            }
            plan::AlterSourceAction::RefreshReferences { references } => {
                self.catalog_transact(
                    Some(session),
                    vec![catalog::Op::UpdateSourceReferences {
                        source_id: item_id,
                        references: references.into(),
                    }],
                )
                .await?;
            }
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
    }

    #[instrument]
    pub(super) async fn sequence_alter_system_set(
        &mut self,
        session: &Session,
        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.is_user_allowed_to_alter_system(session, Some(&name))?;
        // We want to ensure that the network policy we're switching to actually exists.
        if NETWORK_POLICY.name.to_string().to_lowercase() == name.to_lowercase() {
            self.validate_alter_system_network_policy(session, &value)?;
        }

        let op = match value {
            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
                name: name.clone(),
                value: OwnedVarInput::SqlSet(values),
            },
            plan::VariableValue::Default => {
                catalog::Op::ResetSystemConfiguration { name: name.clone() }
            }
        };
        self.catalog_transact(Some(session), vec![op]).await?;

        session.add_notice(AdapterNotice::VarDefaultUpdated {
            role: None,
            var_name: Some(name),
        });
        Ok(ExecuteResponse::AlteredSystemConfiguration)
    }

    #[instrument]
    pub(super) async fn sequence_alter_system_reset(
        &mut self,
        session: &Session,
        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.is_user_allowed_to_alter_system(session, Some(&name))?;
        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
        self.catalog_transact(Some(session), vec![op]).await?;
        session.add_notice(AdapterNotice::VarDefaultUpdated {
            role: None,
            var_name: Some(name),
        });
        Ok(ExecuteResponse::AlteredSystemConfiguration)
    }

    #[instrument]
    pub(super) async fn sequence_alter_system_reset_all(
        &mut self,
        session: &Session,
        _: plan::AlterSystemResetAllPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.is_user_allowed_to_alter_system(session, None)?;
        let op = catalog::Op::ResetAllSystemConfiguration;
        self.catalog_transact(Some(session), vec![op]).await?;
        session.add_notice(AdapterNotice::VarDefaultUpdated {
            role: None,
            var_name: None,
        });
        Ok(ExecuteResponse::AlteredSystemConfiguration)
    }

    // TODO(jkosh44) Move this into rbac.rs once RBAC is always on.
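    /// Access control for `ALTER SYSTEM`, as implemented by the match below:
    ///
    /// * `RESET ALL` is reserved for internal superusers;
    /// * a single variable may be altered by an internal superuser, or by any
    ///   superuser if the variable is user-modifiable (subject to a visibility
    ///   check);
    /// * regular users are always refused.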
    fn is_user_allowed_to_alter_system(
        &self,
        session: &Session,
        var_name: Option<&str>,
    ) -> Result<(), AdapterError> {
        match (session.user().kind(), var_name) {
            // Only internal superusers can reset all system variables.
            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
            // Whether a variable can be modified depends on whether we're an internal superuser.
            (UserKind::Superuser, Some(name))
                if session.user().is_internal()
                    || self.catalog().system_config().user_modifiable(name) =>
            {
                // In lieu of plumbing the user to all system config functions, just check that
                // the var is visible.
                let var = self.catalog().system_config().get(name)?;
                var.visible(session.user(), self.catalog().system_config())?;
                Ok(())
            }
            // If we're not a superuser, but the variable is user modifiable, indicate they can use
            // session variables.
            (UserKind::Regular, Some(name))
                if self.catalog().system_config().user_modifiable(name) =>
            {
                Err(AdapterError::Unauthorized(
                    rbac::UnauthorizedError::Superuser {
                        action: format!("toggle the '{name}' system configuration parameter"),
                    },
                ))
            }
            _ => Err(AdapterError::Unauthorized(
                rbac::UnauthorizedError::MzSystem {
                    action: "alter system".into(),
                },
            )),
        }
    }

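    /// Resolves the network policy named by an `ALTER SYSTEM` value for
    /// `network_policy` (or the compiled-in default, on reset) and checks
    /// that switching to it would not lock the current session out.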
    fn validate_alter_system_network_policy(
        &self,
        session: &Session,
        policy_value: &plan::VariableValue,
    ) -> Result<(), AdapterError> {
        let policy_name = match &policy_value {
            // Make sure the compiled-in default still exists.
            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
            plan::VariableValue::Values(values) if values.len() == 1 => {
                values.iter().next().cloned()
            }
            plan::VariableValue::Values(values) => {
                tracing::warn!(?values, "can't set multiple network policies at once");
                None
            }
        };
        let maybe_network_policy = policy_name
            .as_ref()
            .and_then(|name| self.catalog.get_network_policy_by_name(name));
        let Some(network_policy) = maybe_network_policy else {
            return Err(AdapterError::PlanError(plan::PlanError::VarError(
                VarError::InvalidParameterValue {
                    name: NETWORK_POLICY.name(),
                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
                    reason: "no network policy with that name exists".to_string(),
                },
            )));
        };
        self.validate_alter_network_policy(session, &network_policy.rules)
    }

    /// Validates that a set of [`NetworkPolicyRule`]s is valid for the current [`Session`].
    ///
    /// This helps prevent users from modifying network policies in a way that would lock out their
    /// current connection.
    fn validate_alter_network_policy(
        &self,
        session: &Session,
        policy_rules: &Vec<NetworkPolicyRule>,
    ) -> Result<(), AdapterError> {
        // If the user is not an internal user, attempt to protect them from
        // locking themselves out.
        if session.user().is_internal() {
            return Ok(());
        }
        if let Some(ip) = session.meta().client_ip() {
            validate_ip_with_policy_rules(ip, policy_rules)
                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
        } else {
            // Sessions without IPs are only temporarily constructed for default values;
            // they should not be permitted here.
            return Err(AdapterError::NetworkPolicyDenied(
                NetworkPolicyError::MissingIp,
            ));
        }
        Ok(())
    }

    // Returns the name of the portal to execute.
    #[instrument]
    pub(super) fn sequence_execute(
        &mut self,
        session: &mut Session,
        plan: plan::ExecutePlan,
    ) -> Result<String, AdapterError> {
        // Verify the stmt is still valid.
        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
        let ps = session
            .get_prepared_statement_unverified(&plan.name)
            .expect("known to exist");
        let stmt = ps.stmt().cloned();
        let desc = ps.desc().clone();
        let revision = ps.catalog_revision;
        let logging = Arc::clone(ps.logging());
        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), revision)
    }

    #[instrument]
    pub(super) async fn sequence_grant_privileges(
        &mut self,
        session: &Session,
        plan::GrantPrivilegesPlan {
            update_privileges,
            grantees,
        }: plan::GrantPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            grantees,
            UpdatePrivilegeVariant::Grant,
        )
        .await
    }

    #[instrument]
    pub(super) async fn sequence_revoke_privileges(
        &mut self,
        session: &Session,
        plan::RevokePrivilegesPlan {
            update_privileges,
            revokees,
        }: plan::RevokePrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            revokees,
            UpdatePrivilegeVariant::Revoke,
        )
        .await
    }

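    /// Shared implementation of GRANT and REVOKE on privileges: builds the
    /// cross product of `update_privileges` and `grantees`, skipping updates
    /// that would be no-ops, so that re-granting an already-held privilege
    /// (or revoking an absent one) succeeds without emitting catalog ops.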
    #[instrument]
    async fn sequence_update_privileges(
        &mut self,
        session: &Session,
        update_privileges: Vec<UpdatePrivilege>,
        grantees: Vec<RoleId>,
        variant: UpdatePrivilegeVariant,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
        let mut warnings = Vec::new();
        let catalog = self.catalog().for_session(session);

        for UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        } in update_privileges
        {
            let actual_object_type = catalog.get_system_object_type(&target_id);
            // For all relations we allow all applicable table privileges, but send a warning if the
            // privilege isn't actually applicable to the object type.
            if actual_object_type.is_relation() {
                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
                if !non_applicable_privileges.is_empty() {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
                        non_applicable_privileges,
                        object_description,
                    })
                }
            }

            if let SystemObjectId::Object(object_id) = &target_id {
                self.catalog()
                    .ensure_not_reserved_object(object_id, session.conn_id())?;
            }

            let privileges = self
                .catalog()
                .get_privileges(&target_id, session.conn_id())
                // Should be unreachable since the parser will refuse to parse grant/revoke
                // statements on objects without privileges.
                .ok_or(AdapterError::Unsupported(
                    "GRANTs/REVOKEs on an object type with no privileges",
                ))?;

            for grantee in &grantees {
                self.catalog().ensure_not_system_role(grantee)?;
                self.catalog().ensure_not_predefined_role(grantee)?;
                let existing_privilege = privileges
                    .get_acl_item(grantee, &grantor)
                    .map(Cow::Borrowed)
                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));

                match variant {
                    UpdatePrivilegeVariant::Grant
                        if !existing_privilege.acl_mode.contains(acl_mode) =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    UpdatePrivilegeVariant::Revoke
                        if !existing_privilege
                            .acl_mode
                            .intersection(acl_mode)
                            .is_empty() =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    // no-op
                    _ => {}
                }
            }
        }

        if ops.is_empty() {
            session.add_notices(warnings);
            return Ok(variant.into());
        }

        let res = self
            .catalog_transact(Some(session), ops)
            .await
            .map(|_| match variant {
                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
            });
        if res.is_ok() {
            session.add_notices(warnings);
        }
        res
    }

    #[instrument]
    pub(super) async fn sequence_alter_default_privileges(
        &mut self,
        session: &Session,
        plan::AlterDefaultPrivilegesPlan {
            privilege_objects,
            privilege_acl_items,
            is_grant,
        }: plan::AlterDefaultPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
        let variant = if is_grant {
            UpdatePrivilegeVariant::Grant
        } else {
            UpdatePrivilegeVariant::Revoke
        };
        for privilege_object in &privilege_objects {
            self.catalog()
                .ensure_not_system_role(&privilege_object.role_id)?;
            self.catalog()
                .ensure_not_predefined_role(&privilege_object.role_id)?;
            if let Some(database_id) = privilege_object.database_id {
                self.catalog()
                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
            }
            if let Some(schema_id) = privilege_object.schema_id {
                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
                let schema_spec: SchemaSpecifier = schema_id.into();

                self.catalog().ensure_not_reserved_object(
                    &(database_spec, schema_spec).into(),
                    session.conn_id(),
                )?;
            }
            for privilege_acl_item in &privilege_acl_items {
                self.catalog()
                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
                self.catalog()
                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
                ops.push(catalog::Op::UpdateDefaultPrivilege {
                    privilege_object: privilege_object.clone(),
                    privilege_acl_item: privilege_acl_item.clone(),
                    variant,
                })
            }
        }

        self.catalog_transact(Some(session), ops).await?;
        Ok(ExecuteResponse::AlteredDefaultPrivileges)
    }

    #[instrument]
    pub(super) async fn sequence_grant_role(
        &mut self,
        session: &Session,
        plan::GrantRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::GrantRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // We need this check so we don't accidentally return a success on a reserved role.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::GrantRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        if ops.is_empty() {
            return Ok(ExecuteResponse::GrantedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::GrantedRole)
    }

    #[instrument]
    pub(super) async fn sequence_revoke_role(
        &mut self,
        session: &Session,
        plan::RevokeRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::RevokeRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if !member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // We need this check so we don't accidentally return a success on a reserved role.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::RevokeRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        if ops.is_empty() {
            return Ok(ExecuteResponse::RevokedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::RevokedRole)
    }

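    /// Sequences `ALTER ... OWNER TO`. Ownership changes cascade to objects
    /// whose owner is not independently settable: a relation's dependent
    /// indexes and progress collections, and a managed cluster's replicas.
    /// Indexes themselves cannot be re-owned directly; attempting it emits a
    /// notice and is otherwise a no-op.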
    #[instrument]
    pub(super) async fn sequence_alter_owner(
        &mut self,
        session: &Session,
        plan::AlterOwnerPlan {
            id,
            object_type,
            new_owner,
        }: plan::AlterOwnerPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = vec![catalog::Op::UpdateOwner {
            id: id.clone(),
            new_owner,
        }];

        match &id {
            ObjectId::Item(global_id) => {
                let entry = self.catalog().get_entry(global_id);

                // Cannot directly change the owner of an index.
                if entry.is_index() {
                    let name = self
                        .catalog()
                        .resolve_full_name(entry.name(), Some(session.conn_id()))
                        .to_string();
                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
                    return Ok(ExecuteResponse::AlteredObject(object_type));
                }

                // Alter owner cascades down to dependent indexes.
                let dependent_index_ops = entry
                    .used_by()
                    .into_iter()
                    .filter(|id| self.catalog().get_entry(id).is_index())
                    .map(|id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(*id),
                        new_owner,
                    });
                ops.extend(dependent_index_ops);

                // Alter owner cascades down to progress collections.
                let dependent_subsources =
                    entry
                        .progress_id()
                        .into_iter()
                        .map(|item_id| catalog::Op::UpdateOwner {
                            id: ObjectId::Item(item_id),
                            new_owner,
                        });
                ops.extend(dependent_subsources);
            }
            ObjectId::Cluster(cluster_id) => {
                let cluster = self.catalog().get_cluster(*cluster_id);
                // Alter owner cascades down to cluster replicas.
                let managed_cluster_replica_ops =
                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
                        new_owner,
                    });
                ops.extend(managed_cluster_replica_ops);
            }
            _ => {}
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::AlteredObject(object_type))
    }

    #[instrument]
    pub(super) async fn sequence_reassign_owned(
        &mut self,
        session: &Session,
        plan::ReassignOwnedPlan {
            old_roles,
            new_role,
            reassign_ids,
        }: plan::ReassignOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let ops = reassign_ids
            .into_iter()
            .map(|id| catalog::Op::UpdateOwner {
                id,
                new_owner: new_role,
            })
            .collect();

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::ReassignOwned)
    }

    #[instrument]
    pub(crate) async fn handle_deferred_statement(&mut self) {
        // It is possible that Message::DeferredStatementReady was sent but then a session
        // cancellation was processed, removing the single element from `serialized_ddl`, so
        // it is expected that this is sometimes empty.
        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
            return;
        };
        match ps {
            crate::coord::PlanStatement::Statement { stmt, params } => {
                self.handle_execute_inner(stmt, params, ctx).await;
            }
            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
                self.sequence_plan(ctx, plan, resolved_ids).await;
            }
        }
    }

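    /// Sequences `ALTER TABLE ... ADD COLUMN`. At a high level: allocate a new
    /// `GlobalId` for the evolved collection, record the new column in the
    /// catalog, and then, while holding a read hold on the existing collection
    /// so its since cannot advance mid-alter, register the new table
    /// description with the storage controller and install read policies for
    /// the new collection.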
4967    #[instrument]
4968    // TODO(parkmycar): Remove this once we have an actual implementation.
4969    #[allow(clippy::unused_async)]
4970    pub(super) async fn sequence_alter_table(
4971        &mut self,
4972        session: &Session,
4973        plan: plan::AlterTablePlan,
4974    ) -> Result<ExecuteResponse, AdapterError> {
4975        let plan::AlterTablePlan {
4976            relation_id,
4977            column_name,
4978            column_type,
4979            raw_sql_type,
4980        } = plan;
4981
4982        // TODO(alter_table): Support allocating GlobalIds without a CatalogItemId.
4983        let id_ts = self.get_catalog_write_ts().await;
4984        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4985        let ops = vec![catalog::Op::AlterAddColumn {
4986            id: relation_id,
4987            new_global_id,
4988            name: column_name,
4989            typ: column_type,
4990            sql: raw_sql_type,
4991        }];
4992
4993        let entry = self.catalog().get_entry(&relation_id);
4994        let CatalogItem::Table(table) = &entry.item else {
4995            let err = format!("expected table, found {:?}", entry.item);
4996            return Err(AdapterError::Internal(err));
4997        };
4998        // Expected schema version, before altering the table.
4999        let expected_version = table.desc.latest_version();
5000        let existing_global_id = table.global_id_writes();
5001
5002        self.catalog_transact_with_side_effects(Some(session), ops, |coord| async {
5003            let entry = coord.catalog().get_entry(&relation_id);
5004            let CatalogItem::Table(table) = &entry.item else {
5005                panic!("programming error, expected table found {:?}", entry.item);
5006            };
5007            let table = table.clone();
5008
5009            // Acquire a read hold on the original table for the duration of
5010            // the alter to prevent the since of the original table from
5011            // getting advanced, while the ALTER is running.
5012            let existing_table = crate::CollectionIdBundle {
5013                storage_ids: btreeset![existing_global_id],
5014                compute_ids: BTreeMap::new(),
5015            };
5016            let existing_table_read_hold = coord.acquire_read_holds(&existing_table);
5017
            let new_version = table.desc.latest_version();
            let new_desc = table
                .desc
                .at_version(RelationVersionSelector::Specific(new_version));
            let register_ts = coord.get_local_write_ts().await.timestamp;

            // Alter the table description, creating a "new" collection.
            coord
                .controller
                .storage
                .alter_table_desc(
                    existing_global_id,
                    new_global_id,
                    new_desc,
                    expected_version,
                    register_ts,
                )
                .await
                .expect("failed to alter desc of table");

            // Initialize the ReadPolicy, which ensures we have the correct
            // read holds.
            let compaction_window = table
                .custom_logical_compaction_window
                .unwrap_or(CompactionWindow::Default);
            coord
                .initialize_read_policies(
                    &crate::CollectionIdBundle {
                        storage_ids: btreeset![new_global_id],
                        compute_ids: BTreeMap::new(),
                    },
                    compaction_window,
                )
                .await;
            coord.apply_local_write(register_ts).await;

            // The ALTER is complete! We can now drop our read hold.
            drop(existing_table_read_hold);
        })
        .await?;

        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
    }
}

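/// A simple statistics oracle backed by a cache of per-collection update
/// counts, fetched once from the storage layer at a given `as_of` via
/// `snapshot_stats`.
///
/// A minimal usage sketch (hypothetical `ids`, `as_of`, `storage_collections`,
/// and `some_id` values; not a compiled doctest):
/// ```ignore
/// let oracle = CachedStatisticsOracle::new(&ids, &as_of, storage_collections).await?;
/// // Collections without statistics simply report `None`.
/// let estimate: Option<usize> = oracle.cardinality_estimate(some_id);
/// ```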
#[derive(Debug)]
struct CachedStatisticsOracle {
    cache: BTreeMap<GlobalId, usize>,
}

impl CachedStatisticsOracle {
    pub async fn new<T: TimelyTimestamp>(
        ids: &BTreeSet<GlobalId>,
        as_of: &Antichain<T>,
        storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<
            Timestamp = T,
        >,
    ) -> Result<Self, StorageError<T>> {
        let mut cache = BTreeMap::new();

        for id in ids {
            let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;

            match stats {
                Ok(stats) => {
                    cache.insert(*id, stats.num_updates);
                }
                Err(StorageError::IdentifierMissing(id)) => {
                    ::tracing::debug!("no statistics for {id}")
                }
                Err(e) => return Err(e),
            }
        }

        Ok(Self { cache })
    }
}

impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
    fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
        self.cache.get(&id).copied()
    }

    fn as_map(&self) -> BTreeMap<GlobalId, usize> {
        self.cache.clone()
    }
}

impl Coordinator {
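    /// Returns a statistics oracle for the given source collections at
    /// `query_as_of`, for use during optimization.
    ///
    /// Statistics collection is bounded by a timeout (a tighter one for
    /// one-shot queries); if the timeout elapses, or the session has
    /// cardinality estimates disabled, an `EmptyStatisticsOracle` is
    /// returned instead.
    ///
    /// A minimal usage sketch (hypothetical `coord`, `source_ids`, and
    /// `query_as_of` values; not a compiled doctest):
    /// ```ignore
    /// let oracle = coord
    ///     .statistics_oracle(&session, &source_ids, &query_as_of, /* is_oneshot */ true)
    ///     .await?;
    /// let estimate = oracle.cardinality_estimate(some_id);
    /// ```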
    pub(super) async fn statistics_oracle(
        &self,
        session: &Session,
        source_ids: &BTreeSet<GlobalId>,
        query_as_of: &Antichain<Timestamp>,
        is_oneshot: bool,
    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
        if !session.vars().enable_session_cardinality_estimates() {
            return Ok(Box::new(EmptyStatisticsOracle));
        }

        let timeout = if is_oneshot {
            // TODO(mgree): ideally, we would shorten the timeout even more if
            // we think the query could take the fast path.
            self.catalog()
                .system_config()
                .optimizer_oneshot_stats_timeout()
        } else {
            self.catalog().system_config().optimizer_stats_timeout()
        };

        let cached_stats = mz_ore::future::timeout(
            timeout,
            CachedStatisticsOracle::new(
                source_ids,
                query_as_of,
                self.controller.storage_collections.as_ref(),
            ),
        )
        .await;

        match cached_stats {
            Ok(stats) => Ok(Box::new(stats)),
            Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
                warn!(
                    is_oneshot = is_oneshot,
                    "optimizer statistics collection timed out after {}ms",
                    timeout.as_millis()
                );

                Ok(Box::new(EmptyStatisticsOracle))
            }
            Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
        }
    }
}

/// Checks whether we should emit diagnostic information associated with
/// reading per-replica sources.
///
/// If an unrecoverable error is found (today: an untargeted read on a
/// cluster that does not have exactly one replica), returns that error.
/// Otherwise, returns a list of associated notices (today: exactly one
/// notice if there are any per-replica log dependencies and
/// `emit_introspection_query_notice` is set, and none otherwise).
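///
/// A minimal usage sketch (hypothetical `catalog`, `cluster`, and
/// `source_ids` values; not a compiled doctest):
/// ```ignore
/// let mut target_replica = None;
/// let notices = check_log_reads(
///     &catalog,
///     &cluster,
///     &source_ids,
///     &mut target_replica,
///     session.vars(),
/// )?;
/// for notice in notices {
///     session.add_notice(notice);
/// }
/// ```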
pub(super) fn check_log_reads(
    catalog: &Catalog,
    cluster: &Cluster,
    source_ids: &BTreeSet<GlobalId>,
    target_replica: &mut Option<ReplicaId>,
    vars: &SessionVars,
) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError> {
    let log_names = source_ids
        .iter()
        .map(|gid| catalog.resolve_item_id(gid))
        .flat_map(|item_id| catalog.introspection_dependencies(item_id))
        .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
        .collect::<Vec<_>>();

    if log_names.is_empty() {
        return Ok(None);
    }

    // Reading from log sources on replicated clusters is only allowed if a
    // target replica is selected. Otherwise, we would have no way of knowing
    // which replica we read the introspection data from.
    let num_replicas = cluster.replicas().count();
    if target_replica.is_none() {
        if num_replicas == 1 {
            *target_replica = cluster.replicas().map(|r| r.replica_id).next();
        } else {
            return Err(AdapterError::UntargetedLogRead { log_names });
        }
    }

    // Ensure that logging is initialized for the target replica, lest we try
    // to read from a nonexistent arrangement.
    let replica_id = target_replica.expect("set to `Some` above");
    let replica = cluster.replica(replica_id).expect("replica must exist");
    if !replica.config.compute.logging.enabled() {
        return Err(AdapterError::IntrospectionDisabled { log_names });
    }

    Ok(vars
        .emit_introspection_query_notice()
        .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
}

impl Coordinator {
    /// Forwards notices that we got from the optimizer to the session.
    fn emit_optimizer_notices(&self, session: &Session, notices: &[RawOptimizerNotice]) {
        // `for_session` below is expensive, so return early if there's
        // nothing to do.
        if notices.is_empty() {
            return;
        }
        let humanizer = self.catalog.for_session(session);
        let system_vars = self.catalog.system_config();
        for notice in notices {
            let kind = OptimizerNoticeKind::from(notice);
            let notice_enabled = match kind {
                OptimizerNoticeKind::IndexAlreadyExists => {
                    system_vars.enable_notices_for_index_already_exists()
                }
                OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
                    system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
                }
                OptimizerNoticeKind::IndexKeyEmpty => {
                    system_vars.enable_notices_for_index_empty_key()
                }
            };
            if notice_enabled {
                // We don't need to redact the notice parts because
                // `emit_optimizer_notices` is only called by the `sequence_~`
                // method for the statement that produces that notice.
                session.add_notice(AdapterNotice::OptimizerNotice {
                    notice: notice.message(&humanizer, false).to_string(),
                    hint: notice.hint(&humanizer, false).to_string(),
                });
            }
            self.metrics
                .optimization_notices
                .with_label_values(&[kind.metric_label()])
                .inc_by(1);
        }
    }

    /// Processes the metainfo from a newly created non-transient dataflow.
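    ///
    /// If `enable_mz_notices` is set and the dataflow produced any optimizer
    /// notices, the rendered notices are also written to the notice builtin
    /// tables, and the returned append notify can be used to wait for those
    /// builtin table writes.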
    async fn process_dataflow_metainfo(
        &mut self,
        df_meta: DataflowMetainfo,
        export_id: GlobalId,
        session: &Session,
        notice_ids: Vec<GlobalId>,
    ) -> Option<BuiltinTableAppendNotify> {
        // Emit raw notices to the user.
        self.emit_optimizer_notices(session, &df_meta.optimizer_notices);

        // Create a metainfo with rendered notices.
        let df_meta = self
            .catalog()
            .render_notices(df_meta, notice_ids, Some(export_id));

        // Write optimization notices to their builtin tables and save the
        // metainfo in the catalog's in-memory state.
        if self.catalog().state().system_config().enable_mz_notices()
            && !df_meta.optimizer_notices.is_empty()
        {
            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
            self.catalog().state().pack_optimizer_notices(
                &mut builtin_table_updates,
                df_meta.optimizer_notices.iter(),
                Diff::ONE,
            );

            // Save the metainfo.
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            Some(
                self.builtin_table_update()
                    .execute(builtin_table_updates)
                    .await
                    .0,
            )
        } else {
            // Save the metainfo.
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            None
        }
    }
}