use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::objects::{
    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
    RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
    SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, AlterMaterializedViewApplyReplacementPlan,
    ConnectionDetails, CreateSourcePlanBundle, ExplainSinkSchemaPlan, Explainee,
    ExplaineeStatement, MutationKind, NetworkPolicyRule, Params, Plan, PlannedAlterRoleOption,
    PlannedRoleVariable, QueryWhen, SideEffectingFunc, StatementContext, UpdatePrivilege,
    VariableValue,
};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::ExportDescription;
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::GenericSourceConnection;
use mz_transform::dataflow::DataflowMetainfo;
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::sequencer::emit_optimizer_notices;
use crate::coord::{
    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
    WatchSetResponse, validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;
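
/// Unwraps a `Result`, or retires the execute context `$ctx` with the error
/// and returns from the enclosing function.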
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;
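
/// Catalog operations for a drop, along with the metadata needed to emit
/// follow-up session notices.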
struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
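
/// Intermediate state produced while sequencing a batch of `CREATE SOURCE` plans.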
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
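    /// Checks the plan validity of `stage`, then drives it stage by stage:
    /// asynchronous stages are spawned onto tasks, and connection cancellation
    /// is honored for stages that allow it.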
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false;
            }
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }
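
    /// Awaits `handle` on a spawned task, invoking `f` on success and retiring
    /// `ctx` on error or, when `cancel_enabled`, on connection cancellation.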
    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                let _ = rx.wait_for(|v| *v).await;
            })
        } else {
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = return_if_err!(res, ctx);
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }
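
    /// Assembles the catalog ops and in-memory `Source`s for a batch of
    /// `CREATE SOURCE` plans, enforcing replica-count constraints and webhook
    /// validation expressions along the way.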
    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref desc)
                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                            .get(self.catalog().system_config().dyncfgs());

                        if !enable_multi_replica_sources && cluster.replica_ids().len() > 1 {
                            return Err(AdapterError::Unsupported(
                                "webhook sources in clusters with >1 replicas",
                            ));
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }
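
    /// Plans a `CREATE SUBSOURCE` statement against the given IDs and returns
    /// the resulting `CreateSourcePlanBundle`.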
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let plan = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }
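
    /// Plans the purified form of `ALTER SOURCE ... ADD SUBSOURCE`, planning
    /// each new subsource and wrapping them in an `AlterSourcePlan`.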
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip_eq(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }
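
    /// Plans the purified form of `ALTER SOURCE ... REFRESH REFERENCES`.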
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }
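
    /// Plans the purified form of `CREATE SOURCE`: the optional progress
    /// subsource, the primary source, and any subsources, allocating catalog
    /// IDs for each.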
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        if let Some(progress_stmt) = progress_stmt {
            assert_none!(progress_stmt.of_source);
            let id_ts = self.get_catalog_write_ts().await;
            let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("plan must be CreateSource but got {:?}", p),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        for (item_id, source) in &sources {
            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
                    ctx.session()
                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
                }
            }
        }

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

        let conn_id = ctx.session().conn_id().clone();
        let transact_result = self
            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some()
                || attributes.password.is_some()
                || attributes.login.is_some()
            {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes)?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_with_context(conn_id, None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
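        // Only validate the rules when altering the default network policy, so
        // that a change cannot lock the current session out.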
        let current_network_policy_name =
            self.catalog().system_config().default_network_policy_name();
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let id_ts = self.get_catalog_write_ts().await;
        let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                o => {
                    unreachable!("CREATE TABLE data source got {:?}", o)
                }
            },
        };

        let is_webhook = matches!(
            &data_source,
            TableDataSource::DataSource {
                desc: DataSourceDesc::Webhook { .. },
                ..
            }
        );

        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let catalog_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        if is_webhook {
            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
                ctx.session()
                    .add_notice(AdapterNotice::WebhookSourceCreated { url });
            }
        }

        match catalog_result {
            Ok(()) => Ok(ExecuteResponse::CreatedTable),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session_mut()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "table",
                    });
                Ok(ExecuteResponse::CreatedTable)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) =
            return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
            commit_interval: sink.commit_interval,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }
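
    /// Rejects statements that combine ambiguous column references (e.g.
    /// `SELECT *`) with a dependency on a system relation, whose columns are
    /// subject to change.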
    pub(super) fn validate_system_column_references(
        &self,
        uses_ambiguous_columns: bool,
        depends_on: &BTreeSet<GlobalId>,
    ) -> Result<(), AdapterError> {
        if uses_ambiguous_columns
            && depends_on
                .iter()
                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
        {
            Err(AdapterError::AmbiguousSystemColumnReference)
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_type(
        &mut self,
        session: &Session,
        plan: plan::CreateTypePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        plan.typ
            .inner
            .desc(&self.catalog().for_session(session))
            .map_err(AdapterError::from)?;
        let typ = Type {
            create_sql: Some(plan.typ.create_sql),
            global_id,
            details: CatalogTypeDetails {
                array_id: None,
                typ: plan.typ.inner,
                pg_metadata: None,
            },
            resolved_ids,
        };
        let op = catalog::Op::CreateItem {
            id: item_id,
            name: plan.name,
            item: CatalogItem::Type(typ),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::CreatedType),
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_comment_on(
        &mut self,
        session: &Session,
        plan: plan::CommentPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::Comment {
            object_id: plan.object_id,
            sub_component: plan.sub_component,
            comment: plan.comment,
        };
        self.catalog_transact(Some(session), vec![op]).await?;
        Ok(ExecuteResponse::Comment)
    }

    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        fail::fail_point!("after_sequencer_drop_replica");

        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }

    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
            }
        }
        privilege_check(
            self.catalog().system_privileges(),
            dropped_roles,
            &mut dependent_objects,
            &SystemObjectId::System,
            &catalog,
        );
        for (default_privilege_object, default_privilege_acl_items) in
            self.catalog.default_privileges()
        {
            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!(
                        "default privileges on {}S created by {}",
                        default_privilege_object.object_type, role_name
                    ));
            }
            for default_privilege_acl_item in default_privilege_acl_items {
                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "default privileges on {}S granted to {}",
                            default_privilege_object.object_type, role_name
                        ));
                }
            }
        }

        if !dependent_objects.is_empty() {
            Err(AdapterError::DependentObject(dependent_objects))
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_drop_owned(
        &mut self,
        session: &Session,
        plan: plan::DropOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for role_id in &plan.role_ids {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let mut privilege_revokes = plan.privilege_revokes;

        let session_catalog = self.catalog().for_session(session);
        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
            && !session.is_superuser()
        {
            let role_membership =
                session_catalog.collect_role_membership(session.current_role_id());
            let invalid_revokes: BTreeSet<_> = privilege_revokes
                .extract_if(.., |(_, privilege)| {
                    !role_membership.contains(&privilege.grantor)
                })
                .map(|(object_id, _)| object_id)
                .collect();
            for invalid_revoke in invalid_revokes {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
                session.add_notice(AdapterNotice::CannotRevoke { object_description });
            }
        }

        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
            catalog::Op::UpdatePrivilege {
                target_id: object_id,
                privilege,
                variant: UpdatePrivilegeVariant::Revoke,
            }
        });
        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant: UpdatePrivilegeVariant::Revoke,
            },
        );
        let DropOps {
            ops: drop_ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(session, plan.drop_ids)?;

        let ops = privilege_revoke_ops
            .chain(default_privilege_revoke_ops)
            .chain(drop_ops.into_iter())
            .collect();

        self.catalog_transact(Some(session), ops).await?;

        if dropped_active_db {
            session.add_notice(AdapterNotice::DroppedActiveDatabase {
                name: session.vars().database().to_string(),
            });
        }
        if dropped_active_cluster {
            session.add_notice(AdapterNotice::DroppedActiveCluster {
                name: session.vars().cluster().to_string(),
            });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
        }
        Ok(ExecuteResponse::DroppedOwned)
    }

    fn sequence_drop_common(
        &self,
        session: &Session,
        ids: Vec<ObjectId>,
    ) -> Result<DropOps, AdapterError> {
        let mut dropped_active_db = false;
        let mut dropped_active_cluster = false;
        let mut dropped_in_use_indexes = Vec::new();
        let mut dropped_roles = BTreeMap::new();
        let mut dropped_databases = BTreeSet::new();
        let mut dropped_schemas = BTreeSet::new();
        let mut role_revokes = BTreeSet::new();
        let mut default_privilege_revokes = BTreeSet::new();

        let mut clusters_to_drop = BTreeSet::new();

        let ids_set = ids.iter().collect::<BTreeSet<_>>();
        for id in &ids {
            match id {
                ObjectId::Database(id) => {
                    let name = self.catalog().get_database(id).name();
                    if name == session.vars().database() {
                        dropped_active_db = true;
                    }
                    dropped_databases.insert(id);
                }
                ObjectId::Schema((_, spec)) => {
                    if let SchemaSpecifier::Id(id) = spec {
                        dropped_schemas.insert(id);
                    }
                }
                ObjectId::Cluster(id) => {
                    clusters_to_drop.insert(*id);
                    if let Some(active_id) = self
                        .catalog()
                        .active_cluster(session)
                        .ok()
                        .map(|cluster| cluster.id())
                    {
                        if id == &active_id {
                            dropped_active_cluster = true;
                        }
                    }
                }
                ObjectId::Role(id) => {
                    let role = self.catalog().get_role(id);
                    let name = role.name();
                    dropped_roles.insert(*id, name);
                    for (group_id, grantor_id) in &role.membership.map {
                        role_revokes.insert((*group_id, *id, *grantor_id));
                    }
                }
                ObjectId::Item(id) => {
                    if let Some(index) = self.catalog().get_entry(id).index() {
                        let humanizer = self.catalog().for_session(session);
                        let dependants = self
                            .controller
                            .compute
                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
                            .ok()
                            .into_iter()
                            .flatten()
                            .filter(|dependant_id| {
                                if dependant_id.is_transient() {
                                    return false;
                                }
                                let Some(dependent_id) = humanizer
                                    .try_get_item_by_global_id(dependant_id)
                                    .map(|item| item.id())
                                else {
                                    return false;
                                };
                                !ids_set.contains(&ObjectId::Item(dependent_id))
                            })
                            .flat_map(|dependant_id| humanizer.humanize_id(dependant_id))
                            .collect_vec();
                        if !dependants.is_empty() {
                            dropped_in_use_indexes.push(DroppedInUseIndex {
                                index_name: humanizer
                                    .humanize_id(index.global_id())
                                    .unwrap_or_else(|| id.to_string()),
                                dependant_objects: dependants,
                            });
                        }
                    }
                }
                _ => {}
            }
        }
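
        // Replicas of managed clusters may only be dropped directly if the
        // whole cluster is also being dropped or the replica is internal.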
        for id in &ids {
            match id {
                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                    if !clusters_to_drop.contains(cluster_id) {
                        let cluster = self.catalog.get_cluster(*cluster_id);
                        if cluster.is_managed() {
                            let replica =
                                cluster.replica(*replica_id).expect("Catalog out of sync");
                            if !replica.config.location.internal() {
                                coord_bail!("cannot drop replica of managed cluster");
                            }
                        }
                    }
                }
                _ => {}
            }
        }

        for role_id in dropped_roles.keys() {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }
        self.validate_dropped_role_ownership(session, &dropped_roles)?;
        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
        for role in self.catalog().user_roles() {
            for dropped_role_id in
                dropped_role_ids.intersection(&role.membership.map.keys().collect())
            {
                role_revokes.insert((
                    **dropped_role_id,
                    role.id(),
                    *role
                        .membership
                        .map
                        .get(*dropped_role_id)
                        .expect("included in keys above"),
                ));
            }
        }

        for (default_privilege_object, default_privilege_acls) in
            self.catalog().default_privileges()
        {
            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
            {
                for default_privilege_acl in default_privilege_acls {
                    default_privilege_revokes.insert((
                        default_privilege_object.clone(),
                        default_privilege_acl.clone(),
                    ));
                }
            }
        }

        let ops = role_revokes
            .into_iter()
            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            })
            .chain(default_privilege_revokes.into_iter().map(
                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                    privilege_object,
                    privilege_acl_item,
                    variant: UpdatePrivilegeVariant::Revoke,
                },
            ))
            .chain(iter::once(catalog::Op::DropObjects(
                ids.into_iter()
                    .map(DropObjectInfo::manual_drop_from_object_id)
                    .collect(),
            )))
            .collect();

        Ok(DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        })
    }

    pub(super) fn sequence_explain_schema(
        &self,
        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
        })?;

        let json_string = json_string(&json_value);
        let row = Row::pack_slice(&[Datum::String(&json_string)]);
        Ok(Self::send_immediate_rows(row))
    }

    pub(super) fn sequence_show_all_variables(
        &self,
        session: &Session,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut rows = viewable_variables(self.catalog().state(), session)
            .map(|v| (v.name(), v.value(), v.description()))
            .collect::<Vec<_>>();
        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());

        let rows: Vec<_> = rows
            .into_iter()
            .map(|(name, val, desc)| {
                Row::pack_slice(&[
                    Datum::String(name),
                    Datum::String(&val),
                    Datum::String(desc),
                ])
            })
            .collect();
        Ok(Self::send_immediate_rows(rows))
    }

    pub(super) fn sequence_show_variable(
        &self,
        session: &Session,
        plan: plan::ShowVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        if &plan.name == SCHEMA_ALIAS {
            let schemas = self.catalog.resolve_search_path(session);
            let schema = schemas.first();
            return match schema {
                Some((database_spec, schema_spec)) => {
                    let schema_name = &self
                        .catalog
                        .get_schema(database_spec, schema_spec, session.conn_id())
                        .name()
                        .schema;
                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
                    Ok(Self::send_immediate_rows(row))
                }
                None => {
                    if session.vars().current_object_missing_warnings() {
                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
                            search_path: session
                                .vars()
                                .search_path()
                                .into_iter()
                                .map(|schema| schema.to_string())
                                .collect(),
                        });
                    }
                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
                }
            };
        }

        let variable = session
            .vars()
            .get(self.catalog().system_config(), &plan.name)
            .or_else(|_| self.catalog().system_config().get(&plan.name))?;

        variable.visible(session.user(), self.catalog().system_config())?;

        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
        if variable.name() == vars::DATABASE.name()
            && matches!(
                self.catalog().resolve_database(&variable.value()),
                Err(CatalogError::UnknownDatabase(_))
            )
            && session.vars().current_object_missing_warnings()
        {
            let name = variable.value();
            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
        } else if variable.name() == vars::CLUSTER.name()
            && matches!(
                self.catalog().resolve_cluster(&variable.value()),
                Err(CatalogError::UnknownCluster(_))
            )
            && session.vars().current_object_missing_warnings()
        {
            let name = variable.value();
            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
        }
        Ok(Self::send_immediate_rows(row))
    }

    #[instrument]
    pub(super) async fn sequence_inspect_shard(
        &self,
        session: &Session,
        plan: plan::InspectShardPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        if !session.user().is_internal() {
            return Err(AdapterError::Unauthorized(
                rbac::UnauthorizedError::MzSystem {
                    action: "inspect".into(),
                },
            ));
        }
        let state = self
            .controller
            .storage
            .inspect_persist_state(plan.id)
            .await?;
        let jsonb = Jsonb::from_serde_json(state)?;
        Ok(Self::send_immediate_rows(jsonb.into_row()))
    }

    #[instrument]
    pub(super) fn sequence_set_variable(
        &self,
        session: &mut Session,
        plan: plan::SetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (name, local) = (plan.name, plan.local);
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }

        let vars = session.vars_mut();
        let values = match plan.value {
            plan::VariableValue::Default => None,
            plan::VariableValue::Values(values) => Some(values),
        };

        match values {
            Some(values) => {
                vars.set(
                    self.catalog().system_config(),
                    &name,
                    VarInput::SqlSet(&values),
                    local,
                )?;

                let vars = session.vars();

                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if name == vars::CLUSTER.name()
                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
                {
                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
                }

                if name.as_str() == vars::DATABASE.name()
                    && matches!(
                        self.catalog().resolve_database(vars.database()),
                        Err(CatalogError::UnknownDatabase(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.database().to_string();
                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
                } else if name.as_str() == vars::CLUSTER.name()
                    && matches!(
                        self.catalog().resolve_cluster(vars.cluster()),
                        Err(CatalogError::UnknownCluster(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.cluster().to_string();
                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
                    let v = values.into_first().to_lowercase();
                    if v == IsolationLevel::ReadUncommitted.as_str()
                        || v == IsolationLevel::ReadCommitted.as_str()
                        || v == IsolationLevel::RepeatableRead.as_str()
                    {
                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
                            isolation_level: v,
                        });
                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
                        session.add_notice(AdapterNotice::StrongSessionSerializable);
                    }
                }
            }
            None => vars.reset(self.catalog().system_config(), &name, local)?,
        }

        Ok(ExecuteResponse::SetVariable { name, reset: false })
    }

    pub(super) fn sequence_reset_variable(
        &self,
        session: &mut Session,
        plan: plan::ResetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let name = plan.name;
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }
        session
            .vars_mut()
            .reset(self.catalog().system_config(), &name, false)?;
        Ok(ExecuteResponse::SetVariable { name, reset: true })
    }

    pub(super) fn sequence_set_transaction(
        &self,
        session: &mut Session,
        plan: plan::SetTransactionPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for mode in plan.modes {
            match mode {
                TransactionMode::AccessMode(_) => {
                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
                }
                TransactionMode::IsolationLevel(isolation_level) => {
                    self.validate_set_isolation_level(session)?;

                    session.vars_mut().set(
                        self.catalog().system_config(),
                        TRANSACTION_ISOLATION_VAR_NAME,
                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
                        plan.local,
                    )?
                }
            }
        }
        Ok(ExecuteResponse::SetVariable {
            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
            reset: false,
        })
    }

1970 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1971 if session.transaction().contains_ops() {
1972 Err(AdapterError::InvalidSetIsolationLevel)
1973 } else {
1974 Ok(())
1975 }
1976 }
1977
1978 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1979 if session.transaction().contains_ops() {
1980 Err(AdapterError::InvalidSetCluster)
1981 } else {
1982 Ok(())
1983 }
1984 }
1985
1986 #[instrument]
1987 pub(super) async fn sequence_end_transaction(
1988 &mut self,
1989 mut ctx: ExecuteContext,
1990 mut action: EndTransactionAction,
1991 ) {
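// A failed transaction cannot commit: downgrade the client's COMMIT to a
// ROLLBACK, so the response below reports the action actually taken.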
1992 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
1994 (&action, ctx.session().transaction())
1995 {
1996 action = EndTransactionAction::Rollback;
1997 }
1998 let response = match action {
1999 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2000 params: BTreeMap::new(),
2001 }),
2002 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2003 params: BTreeMap::new(),
2004 }),
2005 };
2006
2007 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2008
2009 let (response, action) = match result {
2010 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2011 (response, action)
2012 }
2013 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
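// Double-check that we hold a write lock for every collection we are
// about to write to; a missing lock is a programming error and aborts
// the commit.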
2014 let validated_locks = match write_lock_guards {
2018 None => None,
2019 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2020 Ok(locks) => Some(locks),
2021 Err(missing) => {
2022 tracing::error!(?missing, "programming error, missing write locks");
2023 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2024 }
2025 },
2026 };
2027
2028 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2029 for WriteOp { id, rows } in writes {
2030 let total_rows = collected_writes.entry(id).or_default();
2031 total_rows.push(rows);
2032 }
2033
2034 self.submit_write(PendingWriteTxn::User {
2035 span: Span::current(),
2036 writes: collected_writes,
2037 write_locks: validated_locks,
2038 pending_txn: PendingTxn {
2039 ctx,
2040 response,
2041 action,
2042 },
2043 });
2044 return;
2045 }
2046 Ok((
2047 Some(TransactionOps::Peeks {
2048 determination,
2049 requires_linearization: RequireLinearization::Required,
2050 ..
2051 }),
2052 _,
2053 )) if ctx.session().vars().transaction_isolation()
2054 == &IsolationLevel::StrictSerializable =>
2055 {
2056 let conn_id = ctx.session().conn_id().clone();
2057 let pending_read_txn = PendingReadTxn {
2058 txn: PendingRead::Read {
2059 txn: PendingTxn {
2060 ctx,
2061 response,
2062 action,
2063 },
2064 },
2065 timestamp_context: determination.timestamp_context,
2066 created: Instant::now(),
2067 num_requeues: 0,
2068 otel_ctx: OpenTelemetryContext::obtain(),
2069 };
2070 self.strict_serializable_reads_tx
2071 .send((conn_id, pending_read_txn))
2072 .expect("sending to strict_serializable_reads_tx cannot fail");
2073 return;
2074 }
2075 Ok((
2076 Some(TransactionOps::Peeks {
2077 determination,
2078 requires_linearization: RequireLinearization::Required,
2079 ..
2080 }),
2081 _,
2082 )) if ctx.session().vars().transaction_isolation()
2083 == &IsolationLevel::StrongSessionSerializable =>
2084 {
2085 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2086 ctx.session_mut()
2087 .ensure_timestamp_oracle(timeline.clone())
2088 .apply_write(*ts);
2089 }
2090 (response, action)
2091 }
2092 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2093 self.internal_cmd_tx
2094 .send(Message::ExecuteSingleStatementTransaction {
2095 ctx,
2096 otel_ctx: OpenTelemetryContext::obtain(),
2097 stmt,
2098 params,
2099 })
2100 .expect("must send");
2101 return;
2102 }
2103 Ok((_, _)) => (response, action),
2104 Err(err) => (Err(err), EndTransactionAction::Rollback),
2105 };
2106 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2107 let response = response.map(|mut r| {
2109 r.extend_params(changed);
2110 ExecuteResponse::from(r)
2111 });
2112
2113 ctx.retire(response);
2114 }
2115
2116 #[instrument]
2117 async fn sequence_end_transaction_inner(
2118 &mut self,
2119 ctx: &mut ExecuteContext,
2120 action: EndTransactionAction,
2121 ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2122 let txn = self.clear_transaction(ctx.session_mut()).await;
2123
2124 if let EndTransactionAction::Commit = action {
2125 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2126 match &mut ops {
2127 TransactionOps::Writes(writes) => {
2128 for WriteOp { id, .. } in writes.iter() {
2129 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2131 AdapterError::Catalog(mz_catalog::memory::error::Error {
2132 kind: mz_catalog::memory::error::ErrorKind::Sql(
2133 CatalogError::UnknownItem(id.to_string()),
2134 ),
2135 })
2136 })?;
2137 }
2138
2139 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2141 }
2142 TransactionOps::DDL {
2143 ops,
2144 state: _,
2145 side_effects,
2146 revision,
2147 } => {
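// A DDL transaction may only commit if no other DDL has committed
// since it began; otherwise we raced with another session and must
// refuse.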
2148 if *revision != self.catalog().transient_revision() {
2150 return Err(AdapterError::DDLTransactionRace);
2151 }
2152 let ops = std::mem::take(ops);
2154 let side_effects = std::mem::take(side_effects);
2155 self.catalog_transact_with_side_effects(
2156 Some(ctx),
2157 ops,
2158 move |a, mut ctx| {
2159 Box::pin(async move {
2160 for side_effect in side_effects {
2161 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2162 }
2163 })
2164 },
2165 )
2166 .await?;
2167 }
2168 _ => (),
2169 }
2170 return Ok((Some(ops), write_lock_guards));
2171 }
2172 }
2173
2174 Ok((None, None))
2175 }
2176
2177 pub(super) async fn sequence_side_effecting_func(
2178 &mut self,
2179 ctx: ExecuteContext,
2180 plan: SideEffectingFunc,
2181 ) {
2182 match plan {
2183 SideEffectingFunc::PgCancelBackend { connection_id } => {
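// Canceling our own connection is handled specially: retire the current
// statement with a cancellation error instead of going through the
// privileged-cancel path below.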
2184 if ctx.session().conn_id().unhandled() == connection_id {
2185 ctx.retire(Err(AdapterError::Canceled));
2189 return;
2190 }
2191
2192 let res = if let Some((id_handle, _conn_meta)) =
2193 self.active_conns.get_key_value(&connection_id)
2194 {
2195 self.handle_privileged_cancel(id_handle.clone()).await;
2197 Datum::True
2198 } else {
2199 Datum::False
2200 };
2201 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2202 }
2203 }
2204 }
2205
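/// Like `sequence_side_effecting_func`, but usable without an
/// `ExecuteContext`: checks that `current_role` is a member of the target
/// connection's authenticated role before canceling it, and returns the
/// response directly.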
2206 pub(crate) async fn execute_side_effecting_func(
2215 &mut self,
2216 plan: SideEffectingFunc,
2217 conn_id: ConnectionId,
2218 current_role: RoleId,
2219 ) -> Result<ExecuteResponse, AdapterError> {
2220 match plan {
2221 SideEffectingFunc::PgCancelBackend { connection_id } => {
2222 if conn_id.unhandled() == connection_id {
2223 return Err(AdapterError::Canceled);
2227 }
2228
2229 if let Some((_id_handle, conn_meta)) =
2232 self.active_conns.get_key_value(&connection_id)
2233 {
2234 let target_role = *conn_meta.authenticated_role_id();
2235 let role_membership = self
2236 .catalog()
2237 .state()
2238 .collect_role_membership(&current_role);
2239 if !role_membership.contains(&target_role) {
2240 let target_role_name = self
2241 .catalog()
2242 .try_get_role(&target_role)
2243 .map(|role| role.name().to_string())
2244 .unwrap_or_else(|| target_role.to_string());
2245 return Err(AdapterError::Unauthorized(
2246 rbac::UnauthorizedError::RoleMembership {
2247 role_names: vec![target_role_name],
2248 },
2249 ));
2250 }
2251
2252 let id_handle = self
2254 .active_conns
2255 .get_key_value(&connection_id)
2256 .map(|(id, _)| id.clone())
2257 .expect("checked above");
2258 self.handle_privileged_cancel(id_handle).await;
2259 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2260 } else {
2261 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2263 }
2264 }
2265 }
2266 }
2267
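/// Determines a real-time-recent timestamp for the given sources: returns a
/// future that resolves once the storage controller has learned how far the
/// upstream systems have progressed, or `None` when none of the (transitive)
/// dependencies are user collections. The returned future may take up to
/// `real_time_recency_timeout` to resolve, so it presumably should not be
/// awaited on the coordinator's main task.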
2268 pub(crate) async fn determine_real_time_recent_timestamp(
2272 &self,
2273 source_ids: impl Iterator<Item = GlobalId>,
2274 real_time_recency_timeout: Duration,
2275 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2276 {
2277 let item_ids = source_ids
2278 .map(|gid| {
2279 self.catalog
2280 .try_resolve_item_id(&gid)
2281 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2282 })
2283 .collect::<Result<Vec<_>, _>>()?;
2284
2285 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
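// Only user collections can be real-time recent; if the query depends on
// none, there is nothing upstream to consult.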
2291 if to_visit.is_empty() {
2294 return Ok(None);
2295 }
2296
2297 let mut timestamp_objects = BTreeSet::new();
2298
2299 while let Some(id) = to_visit.pop_front() {
2300 timestamp_objects.insert(id);
2301 to_visit.extend(
2302 self.catalog()
2303 .get_entry(&id)
2304 .uses()
2305 .into_iter()
2306 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2307 );
2308 }
2309 let timestamp_objects = timestamp_objects
2310 .into_iter()
2311 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2312 .collect();
2313
2314 let r = self
2315 .controller
2316 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2317 .await?;
2318
2319 Ok(Some(r))
2320 }
2321
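/// As `determine_real_time_recent_timestamp`, but only when the session
/// actually calls for real-time recency: the feature must be enabled, the
/// isolation level must be strict serializable, and the transaction must not
/// already have a read timestamp pinned.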
2322 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2325 &self,
2326 session: &Session,
2327 source_ids: impl Iterator<Item = GlobalId>,
2328 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2329 {
2330 let vars = session.vars();
2331
2332 if vars.real_time_recency()
2333 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2334 && !session.contains_read_timestamp()
2335 {
2336 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2337 .await
2338 } else {
2339 Ok(None)
2340 }
2341 }
2342
2343 #[instrument]
2344 pub(super) async fn sequence_explain_plan(
2345 &mut self,
2346 ctx: ExecuteContext,
2347 plan: plan::ExplainPlanPlan,
2348 target_cluster: TargetCluster,
2349 ) {
2350 match &plan.explainee {
2351 plan::Explainee::Statement(stmt) => match stmt {
2352 plan::ExplaineeStatement::CreateView { .. } => {
2353 self.explain_create_view(ctx, plan).await;
2354 }
2355 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2356 self.explain_create_materialized_view(ctx, plan).await;
2357 }
2358 plan::ExplaineeStatement::CreateIndex { .. } => {
2359 self.explain_create_index(ctx, plan).await;
2360 }
2361 plan::ExplaineeStatement::Select { .. } => {
2362 self.explain_peek(ctx, plan, target_cluster).await;
2363 }
2364 },
2365 plan::Explainee::View(_) => {
2366 let result = self.explain_view(&ctx, plan);
2367 ctx.retire(result);
2368 }
2369 plan::Explainee::MaterializedView(_) => {
2370 let result = self.explain_materialized_view(&ctx, plan);
2371 ctx.retire(result);
2372 }
2373 plan::Explainee::Index(_) => {
2374 let result = self.explain_index(&ctx, plan);
2375 ctx.retire(result);
2376 }
2377 plan::Explainee::ReplanView(_) => {
2378 self.explain_replan_view(ctx, plan).await;
2379 }
2380 plan::Explainee::ReplanMaterializedView(_) => {
2381 self.explain_replan_materialized_view(ctx, plan).await;
2382 }
2383 plan::Explainee::ReplanIndex(_) => {
2384 self.explain_replan_index(ctx, plan).await;
2385 }
2386 };
2387 }
2388
2389 pub(super) async fn sequence_explain_pushdown(
2390 &mut self,
2391 ctx: ExecuteContext,
2392 plan: plan::ExplainPushdownPlan,
2393 target_cluster: TargetCluster,
2394 ) {
2395 match plan.explainee {
2396 Explainee::Statement(ExplaineeStatement::Select {
2397 broken: false,
2398 plan,
2399 desc: _,
2400 }) => {
2401 let stage = return_if_err!(
2402 self.peek_validate(
2403 ctx.session(),
2404 plan,
2405 target_cluster,
2406 None,
2407 ExplainContext::Pushdown,
2408 Some(ctx.session().vars().max_query_result_size()),
2409 ),
2410 ctx
2411 );
2412 self.sequence_staged(ctx, Span::current(), stage).await;
2413 }
2414 Explainee::MaterializedView(item_id) => {
2415 self.explain_pushdown_materialized_view(ctx, item_id).await;
2416 }
2417 _ => {
2418 ctx.retire(Err(AdapterError::Unsupported(
2419 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2420 )));
2421 }
2422 };
2423 }
2424
2425 async fn execute_explain_pushdown_with_read_holds(
2427 &self,
2428 ctx: ExecuteContext,
2429 as_of: Antichain<Timestamp>,
2430 mz_now: ResultSpec<'static>,
2431 read_holds: Option<ReadHolds<Timestamp>>,
2432 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2433 ) {
2434 let fut = self
2435 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2436 .await;
2437 task::spawn(|| "render explain pushdown", async move {
2438 let _read_holds = read_holds;
2440 let res = fut.await;
2441 ctx.retire(res);
2442 });
2443 }
2444
2445 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2447 &self,
2448 session: &Session,
2449 as_of: Antichain<Timestamp>,
2450 mz_now: ResultSpec<'static>,
2451 imports: I,
2452 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2453 super::explain_pushdown_future_inner(
2455 session,
2456 &self.catalog,
2457 &self.controller.storage_collections,
2458 as_of,
2459 mz_now,
2460 imports,
2461 )
2462 .await
2463 }
2464
2465 #[instrument]
2466 pub(super) async fn sequence_insert(
2467 &mut self,
2468 mut ctx: ExecuteContext,
2469 plan: plan::InsertPlan,
2470 ) {
2471 if !ctx.session_mut().transaction().allows_writes() {
2479 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2480 return;
2481 }
2482
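// Plain `INSERT ... VALUES` statements are already constant collections, so
// they can be lowered directly; anything else (e.g. a subquery in the
// VALUES clause) goes through the full view optimization pipeline below.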
2483 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2497 let expr = return_if_err!(
2500 plan.values
2501 .clone()
2502 .lower(self.catalog().system_config(), None),
2503 ctx
2504 );
2505 OptimizedMirRelationExpr(expr)
2506 } else {
2507 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2509
2510 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2512
2513 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2515 };
2516
2517 match optimized_mir.into_inner() {
2518 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2519 let catalog = self.owned_catalog();
2520 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2521 let result =
2522 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2523 ctx.retire(result);
2524 });
2525 }
2526 _ => {
2528 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2529 Some(table) => {
2530 let desc = table.relation_desc_latest().expect("table has a desc");
2532 desc.arity()
2533 }
2534 None => {
2535 ctx.retire(Err(AdapterError::Catalog(
2536 mz_catalog::memory::error::Error {
2537 kind: mz_catalog::memory::error::ErrorKind::Sql(
2538 CatalogError::UnknownItem(plan.id.to_string()),
2539 ),
2540 },
2541 )));
2542 return;
2543 }
2544 };
2545
2546 let finishing = RowSetFinishing {
2547 order_by: vec![],
2548 limit: None,
2549 offset: 0,
2550 project: (0..desc_arity).collect(),
2551 };
2552
2553 let read_then_write_plan = plan::ReadThenWritePlan {
2554 id: plan.id,
2555 selection: plan.values,
2556 finishing,
2557 assignments: BTreeMap::new(),
2558 kind: MutationKind::Insert,
2559 returning: plan.returning,
2560 };
2561
2562 self.sequence_read_then_write(ctx, read_then_write_plan)
2563 .await;
2564 }
2565 }
2566 }
2567
2568 #[instrument]
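/// Sequences a write whose rows depend on reading current data. Illustrative
/// statements that take this path (standard SQL, shown for orientation):
///
/// ```text
/// UPDATE t SET a = a + 1 WHERE b > 0;
/// DELETE FROM t WHERE b > 0;
/// INSERT INTO t SELECT * FROM s;
/// ```
///
/// The selection is first peeked at a freshest-table-write timestamp, and the
/// returned rows are converted into retractions/insertions that are then sent
/// through the normal write path.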
2573 pub(super) async fn sequence_read_then_write(
2574 &mut self,
2575 mut ctx: ExecuteContext,
2576 plan: plan::ReadThenWritePlan,
2577 ) {
2578 let mut source_ids: BTreeSet<_> = plan
2579 .selection
2580 .depends_on()
2581 .into_iter()
2582 .map(|gid| self.catalog().resolve_item_id(&gid))
2583 .collect();
2584 source_ids.insert(plan.id);
2585
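// Read-then-write operations must hold write locks on every collection they
// touch. Try to acquire them all up front; if any are unavailable, defer the
// entire plan until they can be granted.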
2586 if ctx.session().transaction().write_locks().is_none() {
2588 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2590
2591 for id in &source_ids {
2593 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2594 write_locks.insert_lock(*id, lock);
2595 }
2596 }
2597
2598 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2600 Ok(locks) => locks,
2601 Err(missing) => {
2602 let role_metadata = ctx.session().role_metadata().clone();
2604 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2605 let plan = DeferredPlan {
2606 ctx,
2607 plan: Plan::ReadThenWrite(plan),
2608 validity: PlanValidity::new(
2609 self.catalog.transient_revision(),
2610 source_ids.clone(),
2611 None,
2612 None,
2613 role_metadata,
2614 ),
2615 requires_locks: source_ids,
2616 };
2617 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2618 }
2619 };
2620
2621 ctx.session_mut()
2622 .try_grant_write_locks(write_locks)
2623 .expect("session has already been granted write locks");
2624 }
2625
2626 let plan::ReadThenWritePlan {
2627 id,
2628 kind,
2629 selection,
2630 mut assignments,
2631 finishing,
2632 mut returning,
2633 } = plan;
2634
2635 let desc = match self.catalog().try_get_entry(&id) {
2637 Some(table) => {
2638 table
2640 .relation_desc_latest()
2641 .expect("table has a desc")
2642 .into_owned()
2643 }
2644 None => {
2645 ctx.retire(Err(AdapterError::Catalog(
2646 mz_catalog::memory::error::Error {
2647 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2648 id.to_string(),
2649 )),
2650 },
2651 )));
2652 return;
2653 }
2654 };
2655
2656 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2658 || assignments.values().any(|e| e.contains_temporal())
2659 || returning.iter().any(|e| e.contains_temporal());
2660 if contains_temporal {
2661 ctx.retire(Err(AdapterError::Unsupported(
2662 "calls to mz_now in write statements",
2663 )));
2664 return;
2665 }
2666
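// Checks whether an object read by the write's selection is safe to read at
// the write timestamp: user tables (excluding source-export tables),
// functions, user views/materialized views over valid objects, and types are
// allowed, while sources, secrets, connections, and system objects are not.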
2667 fn validate_read_dependencies(
2675 catalog: &Catalog,
2676 id: &CatalogItemId,
2677 ) -> Result<(), AdapterError> {
2678 use CatalogItemType::*;
2679 use mz_catalog::memory::objects;
2680 let mut ids_to_check = Vec::new();
2681 let valid = match catalog.try_get_entry(id) {
2682 Some(entry) => {
2683 if let CatalogItem::View(objects::View { optimized_expr, .. })
2684 | CatalogItem::MaterializedView(objects::MaterializedView {
2685 optimized_expr,
2686 ..
2687 }) = entry.item()
2688 {
2689 if optimized_expr.contains_temporal() {
2690 return Err(AdapterError::Unsupported(
2691 "calls to mz_now in write statements",
2692 ));
2693 }
2694 }
2695 match entry.item().typ() {
2696 typ @ (Func | View | MaterializedView | ContinualTask) => {
2697 ids_to_check.extend(entry.uses());
2698 let valid_id = id.is_user() || matches!(typ, Func);
2699 valid_id
2700 }
2701 Source | Secret | Connection => false,
2702 Sink | Index => unreachable!(),
2704 Table => {
2705 if !id.is_user() {
2706 false
2708 } else {
2709 entry.source_export_details().is_none()
2711 }
2712 }
2713 Type => true,
2714 }
2715 }
2716 None => false,
2717 };
2718 if !valid {
2719 let (object_name, object_type) = match catalog.try_get_entry(id) {
2720 Some(entry) => {
2721 let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
2722 let object_type = match entry.item().typ() {
2723 Source => "source",
2725 Secret => "secret",
2726 Connection => "connection",
2727 Table => {
2728 if !id.is_user() {
2729 "system table"
2730 } else {
2731 "source-export table"
2732 }
2733 }
2734 View => "system view",
2735 MaterializedView => "system materialized view",
2736 ContinualTask => "system task",
2737 _ => "invalid dependency",
2738 };
2739 (object_name, object_type.to_string())
2740 }
2741 None => (id.to_string(), "unknown".to_string()),
2742 };
2743 return Err(AdapterError::InvalidTableMutationSelection {
2744 object_name,
2745 object_type,
2746 });
2747 }
2748 for id in ids_to_check {
2749 validate_read_dependencies(catalog, &id)?;
2750 }
2751 Ok(())
2752 }
2753
2754 for gid in selection.depends_on() {
2755 let item_id = self.catalog().resolve_item_id(&gid);
2756 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2757 ctx.retire(Err(err));
2758 return;
2759 }
2760 }
2761
2762 let (peek_tx, peek_rx) = oneshot::channel();
2763 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2764 let (tx, _, session, extra) = ctx.into_parts();
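// Dismantle the execute context: the peek below runs with a fresh context
// wired to an internal channel so its response can be intercepted here, while
// the original transmitter is kept to retire the overall statement later.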
2765 let peek_ctx = ExecuteContext::from_parts(
2777 peek_client_tx,
2778 self.internal_cmd_tx.clone(),
2779 session,
2780 Default::default(),
2781 );
2782
2783 self.sequence_peek(
2784 peek_ctx,
2785 plan::SelectPlan {
2786 select: None,
2787 source: selection,
2788 when: QueryWhen::FreshestTableWrite,
2789 finishing,
2790 copy_to: None,
2791 },
2792 TargetCluster::Active,
2793 None,
2794 )
2795 .await;
2796
2797 let internal_cmd_tx = self.internal_cmd_tx.clone();
2798 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2799 let catalog = self.owned_catalog();
2800 let max_result_size = self.catalog().system_config().max_result_size();
2801
2802 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2803 let (peek_response, session) = match peek_rx.await {
2804 Ok(Response {
2805 result: Ok(resp),
2806 session,
2807 otel_ctx,
2808 }) => {
2809 otel_ctx.attach_as_parent();
2810 (resp, session)
2811 }
2812 Ok(Response {
2813 result: Err(e),
2814 session,
2815 otel_ctx,
2816 }) => {
2817 let ctx =
2818 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2819 otel_ctx.attach_as_parent();
2820 ctx.retire(Err(e));
2821 return;
2822 }
2823 Err(e) => return warn!("peek_rx dropped before we could receive a response: {:?}", e),
2825 };
2826 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2827 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2828
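// A statement timeout of zero means "no timeout".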
2829 if timeout_dur == Duration::ZERO {
2831 timeout_dur = Duration::MAX;
2832 }
2833
2834 let style = ExprPrepOneShot {
2835 logical_time: EvalTime::NotAvailable,
2836 session: ctx.session(),
2837 catalog_state: catalog.state(),
2838 };
2839 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2840 return_if_err!(style.prep_scalar_expr(expr), ctx);
2841 }
2842
2843 let make_diffs =
2844 move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2845 let arena = RowArena::new();
2846 let mut diffs = Vec::new();
2847 let mut datum_vec = mz_repr::DatumVec::new();
2848
2849 while let Some(row) = rows.next() {
2850 if !assignments.is_empty() {
2851 assert!(
2852 matches!(kind, MutationKind::Update),
2853 "only updates support assignments"
2854 );
2855 let mut datums = datum_vec.borrow_with(row);
2856 let mut updates = vec![];
2857 for (idx, expr) in &assignments {
2858 let updated = match expr.eval(&datums, &arena) {
2859 Ok(updated) => updated,
2860 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2861 };
2862 updates.push((*idx, updated));
2863 }
2864 for (idx, new_value) in updates {
2865 datums[idx] = new_value;
2866 }
2867 let updated = Row::pack_slice(&datums);
2868 diffs.push((updated, Diff::ONE));
2869 }
2870 match kind {
2871 MutationKind::Update | MutationKind::Delete => {
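// Updates and deletes both retract the old row; for updates, the
// replacement row was already pushed with a positive diff above.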
2875 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2876 }
2877 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2878 }
2879 }
2880
2881 let mut byte_size: u64 = 0;
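// Total up the result size and, for insertions only, verify that column
// constraints (e.g. NOT NULL) are satisfied before anything is written.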
2884 for (row, diff) in &diffs {
2885 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2886 if diff.is_positive() {
2887 for (idx, datum) in row.iter().enumerate() {
2888 desc.constraints_met(idx, &datum)?;
2889 }
2890 }
2891 }
2892 Ok((diffs, byte_size))
2893 };
2894
2895 let diffs = match peek_response {
2896 ExecuteResponse::SendingRowsStreaming {
2897 rows: mut rows_stream,
2898 ..
2899 } => {
2900 let mut byte_size: u64 = 0;
2901 let mut diffs = Vec::new();
2902 let result = loop {
2903 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2904 Ok(Some(res)) => match res {
2905 PeekResponseUnary::Rows(new_rows) => {
2906 match make_diffs(new_rows) {
2907 Ok((mut new_diffs, new_byte_size)) => {
2908 byte_size = byte_size.saturating_add(new_byte_size);
2909 if byte_size > max_result_size {
2910 break Err(AdapterError::ResultSize(format!(
2911 "result exceeds max size of {max_result_size}"
2912 )));
2913 }
2914 diffs.append(&mut new_diffs)
2915 }
2916 Err(e) => break Err(e),
2917 };
2918 }
2919 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2920 PeekResponseUnary::Error(e) => {
2921 break Err(AdapterError::Unstructured(anyhow!(e)));
2922 }
2923 },
2924 Ok(None) => break Ok(diffs),
2925 Err(_) => {
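// The peek timed out. Cancel any peeks still pending for this
// connection so their dataflows get cleaned up, then report a
// statement timeout.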
2926 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2931 conn_id: ctx.session().conn_id().clone(),
2932 });
2933 if let Err(e) = result {
2934 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2935 }
2936 break Err(AdapterError::StatementTimeout);
2937 }
2938 }
2939 };
2940
2941 result
2942 }
2943 ExecuteResponse::SendingRowsImmediate { rows } => {
2944 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2945 }
2946 resp => Err(AdapterError::Unstructured(anyhow!(
2947 "unexpected peek response: {resp:?}"
2948 ))),
2949 };
2950
2951 let mut returning_rows = Vec::new();
2952 let mut diff_err: Option<AdapterError> = None;
2953 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2954 let arena = RowArena::new();
2955 for (row, diff) in diffs {
2956 if !diff.is_positive() {
2957 continue;
2958 }
2959 let mut returning_row = Row::with_capacity(returning.len());
2960 let mut packer = returning_row.packer();
2961 for expr in &returning {
2962 let datums: Vec<_> = row.iter().collect();
2963 match expr.eval(&datums, &arena) {
2964 Ok(datum) => {
2965 packer.push(datum);
2966 }
2967 Err(err) => {
2968 diff_err = Some(err.into());
2969 break;
2970 }
2971 }
2972 }
2973 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2974 let diff = match NonZeroUsize::try_from(diff) {
2975 Ok(diff) => diff,
2976 Err(err) => {
2977 diff_err = Some(err.into());
2978 break;
2979 }
2980 };
2981 returning_rows.push((returning_row, diff));
2982 if diff_err.is_some() {
2983 break;
2984 }
2985 }
2986 }
2987 let diffs = if let Some(err) = diff_err {
2988 Err(err)
2989 } else {
2990 diffs
2991 };
2992
2993 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2996 if let Some(timestamp_context) = timestamp_context {
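// The write must be linearized after the read it depends on: route the
// read's timestamp through the strict-serializable read machinery and
// only proceed with the writes once it has been confirmed.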
3005 let (tx, rx) = tokio::sync::oneshot::channel();
3006 let conn_id = ctx.session().conn_id().clone();
3007 let pending_read_txn = PendingReadTxn {
3008 txn: PendingRead::ReadThenWrite { ctx, tx },
3009 timestamp_context,
3010 created: Instant::now(),
3011 num_requeues: 0,
3012 otel_ctx: OpenTelemetryContext::obtain(),
3013 };
3014 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3015 if let Err(e) = result {
3017 warn!(
3018 "strict_serializable_reads_tx dropped before we could send: {:?}",
3019 e
3020 );
3021 return;
3022 }
3023 let result = rx.await;
3024 ctx = match result {
3026 Ok(Some(ctx)) => ctx,
3027 Ok(None) => {
3028 return;
3031 }
3032 Err(e) => {
3033 warn!(
3034 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3035 e
3036 );
3037 return;
3038 }
3039 };
3040 }
3041
3042 match diffs {
3043 Ok(diffs) => {
3044 let result = Self::send_diffs(
3045 ctx.session_mut(),
3046 plan::SendDiffsPlan {
3047 id,
3048 updates: diffs,
3049 kind,
3050 returning: returning_rows,
3051 max_result_size,
3052 },
3053 );
3054 ctx.retire(result);
3055 }
3056 Err(e) => {
3057 ctx.retire(Err(e));
3058 }
3059 }
3060 });
3061 }
3062
3063 #[instrument]
3064 pub(super) async fn sequence_alter_item_rename(
3065 &mut self,
3066 ctx: &mut ExecuteContext,
3067 plan: plan::AlterItemRenamePlan,
3068 ) -> Result<ExecuteResponse, AdapterError> {
3069 let op = catalog::Op::RenameItem {
3070 id: plan.id,
3071 current_full_name: plan.current_full_name,
3072 to_name: plan.to_name,
3073 };
3074 match self
3075 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3076 .await
3077 {
3078 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3079 Err(err) => Err(err),
3080 }
3081 }
3082
3083 #[instrument]
3084 pub(super) async fn sequence_alter_retain_history(
3085 &mut self,
3086 ctx: &mut ExecuteContext,
3087 plan: plan::AlterRetainHistoryPlan,
3088 ) -> Result<ExecuteResponse, AdapterError> {
3089 let ops = vec![catalog::Op::AlterRetainHistory {
3090 id: plan.id,
3091 value: plan.value,
3092 window: plan.window,
3093 }];
3094 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3095 Box::pin(async move {
3096 let catalog_item = coord.catalog().get_entry(&plan.id).item();
3097 let cluster = match catalog_item {
3098 CatalogItem::Table(_)
3099 | CatalogItem::MaterializedView(_)
3100 | CatalogItem::Source(_)
3101 | CatalogItem::ContinualTask(_) => None,
3102 CatalogItem::Index(index) => Some(index.cluster_id),
3103 CatalogItem::Log(_)
3104 | CatalogItem::View(_)
3105 | CatalogItem::Sink(_)
3106 | CatalogItem::Type(_)
3107 | CatalogItem::Func(_)
3108 | CatalogItem::Secret(_)
3109 | CatalogItem::Connection(_) => unreachable!(),
3110 };
3111 match cluster {
3112 Some(cluster) => {
3113 coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3114 }
3115 None => {
3116 coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3117 }
3118 }
3119 })
3120 })
3121 .await?;
3122 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3123 }
3124
3125 #[instrument]
3126 pub(super) async fn sequence_alter_schema_rename(
3127 &mut self,
3128 ctx: &mut ExecuteContext,
3129 plan: plan::AlterSchemaRenamePlan,
3130 ) -> Result<ExecuteResponse, AdapterError> {
3131 let (database_spec, schema_spec) = plan.cur_schema_spec;
3132 let op = catalog::Op::RenameSchema {
3133 database_spec,
3134 schema_spec,
3135 new_name: plan.new_schema_name,
3136 check_reserved_names: true,
3137 };
3138 match self
3139 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3140 .await
3141 {
3142 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3143 Err(err) => Err(err),
3144 }
3145 }
3146
3147 #[instrument]
3148 pub(super) async fn sequence_alter_schema_swap(
3149 &mut self,
3150 ctx: &mut ExecuteContext,
3151 plan: plan::AlterSchemaSwapPlan,
3152 ) -> Result<ExecuteResponse, AdapterError> {
3153 let plan::AlterSchemaSwapPlan {
3154 schema_a_spec: (schema_a_db, schema_a),
3155 schema_a_name,
3156 schema_b_spec: (schema_b_db, schema_b),
3157 schema_b_name,
3158 name_temp,
3159 } = plan;
3160
3161 let op_a = catalog::Op::RenameSchema {
3162 database_spec: schema_a_db,
3163 schema_spec: schema_a,
3164 new_name: name_temp,
3165 check_reserved_names: false,
3166 };
3167 let op_b = catalog::Op::RenameSchema {
3168 database_spec: schema_b_db,
3169 schema_spec: schema_b,
3170 new_name: schema_a_name,
3171 check_reserved_names: false,
3172 };
3173 let op_c = catalog::Op::RenameSchema {
3174 database_spec: schema_a_db,
3175 schema_spec: schema_a,
3176 new_name: schema_b_name,
3177 check_reserved_names: false,
3178 };
3179
3180 match self
3181 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3182 Box::pin(async {})
3183 })
3184 .await
3185 {
3186 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3187 Err(err) => Err(err),
3188 }
3189 }
3190
3191 #[instrument]
3192 pub(super) async fn sequence_alter_role(
3193 &mut self,
3194 session: &Session,
3195 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3196 ) -> Result<ExecuteResponse, AdapterError> {
3197 let catalog = self.catalog().for_session(session);
3198 let role = catalog.get_role(&id);
3199
3200 let mut notices = vec![];
3202
3203 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3205 let mut vars = role.vars().clone();
3206
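// `NOPASSWORD` clears the role's password; track it separately from the
// other attribute updates so that clearing a password can be distinguished
// from leaving it unchanged.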
3207 let mut nopassword = false;
3210
3211 match option {
3213 PlannedAlterRoleOption::Attributes(attrs) => {
3214 self.validate_role_attributes(&attrs.clone().into())?;
3215
3216 if let Some(inherit) = attrs.inherit {
3217 attributes.inherit = inherit;
3218 }
3219
3220 if let Some(password) = attrs.password {
3221 attributes.password = Some(password);
3222 attributes.scram_iterations =
3223 Some(self.catalog().system_config().scram_iterations())
3224 }
3225
3226 if let Some(superuser) = attrs.superuser {
3227 attributes.superuser = Some(superuser);
3228 }
3229
3230 if let Some(login) = attrs.login {
3231 attributes.login = Some(login);
3232 }
3233
3234 if attrs.nopassword.unwrap_or(false) {
3235 nopassword = true;
3236 }
3237
3238 if let Some(notice) = self.should_emit_rbac_notice(session) {
3239 notices.push(notice);
3240 }
3241 }
3242 PlannedAlterRoleOption::Variable(variable) => {
3243 let session_var = session.vars().inspect(variable.name())?;
3245 session_var.visible(session.user(), catalog.system_vars())?;
3247
3248 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3251 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3252 } else if let PlannedRoleVariable::Set {
3253 name,
3254 value: VariableValue::Values(vals),
3255 } = &variable
3256 {
3257 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3258 notices.push(AdapterNotice::IntrospectionClusterUsage);
3259 }
3260 }
3261
3262 let var_name = match variable {
3263 PlannedRoleVariable::Set { name, value } => {
3264 match &value {
3266 VariableValue::Default => {
3267 vars.remove(&name);
3268 }
3269 VariableValue::Values(vals) => {
3270 let var = match &vals[..] {
3271 [val] => OwnedVarInput::Flat(val.clone()),
3272 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3273 };
3274 session_var.check(var.borrow())?;
3276
3277 vars.insert(name.clone(), var);
3278 }
3279 };
3280 name
3281 }
3282 PlannedRoleVariable::Reset { name } => {
3283 vars.remove(&name);
3285 name
3286 }
3287 };
3288
3289 notices.push(AdapterNotice::VarDefaultUpdated {
3291 role: Some(name.clone()),
3292 var_name: Some(var_name),
3293 });
3294 }
3295 }
3296
3297 let op = catalog::Op::AlterRole {
3298 id,
3299 name,
3300 attributes,
3301 nopassword,
3302 vars: RoleVars { map: vars },
3303 };
3304 let response = self
3305 .catalog_transact(Some(session), vec![op])
3306 .await
3307 .map(|_| ExecuteResponse::AlteredRole)?;
3308
3309 session.add_notices(notices);
3311
3312 Ok(response)
3313 }
3314
3315 #[instrument]
3316 pub(super) async fn sequence_alter_sink_prepare(
3317 &mut self,
3318 ctx: ExecuteContext,
3319 plan: plan::AlterSinkPlan,
3320 ) {
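// Take a read hold on the sink's input collection so that the as-of chosen
// for the altered sink remains readable until the alteration completes.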
3321 let id_bundle = crate::CollectionIdBundle {
3323 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3324 compute_ids: BTreeMap::new(),
3325 };
3326 let read_hold = self.acquire_read_holds(&id_bundle);
3327
3328 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3329 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3330 return;
3331 };
3332
3333 let otel_ctx = OpenTelemetryContext::obtain();
3334 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3335
3336 let plan_validity = PlanValidity::new(
3337 self.catalog().transient_revision(),
3338 BTreeSet::from_iter([plan.item_id, from_item_id]),
3339 Some(plan.in_cluster),
3340 None,
3341 ctx.session().role_metadata().clone(),
3342 );
3343
3344 info!(
3345 "preparing alter sink for {}: frontiers={:?} export={:?}",
3346 plan.global_id,
3347 self.controller
3348 .storage_collections
3349 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3350 self.controller.storage.export(plan.global_id)
3351 );
3352
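// Rather than finishing the alteration immediately, install a watch set that
// fires once the sink's write frontier passes the chosen read timestamp;
// `sequence_alter_sink_finish` then completes the operation.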
3353 self.install_storage_watch_set(
3359 ctx.session().conn_id().clone(),
3360 BTreeSet::from_iter([plan.global_id]),
3361 read_ts,
3362 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3363 ctx: Some(ctx),
3364 otel_ctx,
3365 plan,
3366 plan_validity,
3367 read_hold,
3368 }),
3369 ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3370 }
3371
3372 #[instrument]
3373 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3374 ctx.otel_ctx.attach_as_parent();
3375
3376 let plan::AlterSinkPlan {
3377 item_id,
3378 global_id,
3379 sink: sink_plan,
3380 with_snapshot,
3381 in_cluster,
3382 } = ctx.plan.clone();
3383
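// An arbitrary amount of time may have passed since the prepare step, so
// re-validate the plan against the current catalog: the sink, its input, or
// its cluster may have been dropped in the meantime.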
3384 match ctx.plan_validity.check(self.catalog()) {
3393 Ok(()) => {}
3394 Err(err) => {
3395 ctx.retire(Err(err));
3396 return;
3397 }
3398 }
3399
3400 let entry = self.catalog().get_entry(&item_id);
3401 let CatalogItem::Sink(old_sink) = entry.item() else {
3402 panic!("invalid item kind for `AlterSinkPlan`");
3403 };
3404
3405 if sink_plan.version != old_sink.version + 1 {
3406 ctx.retire(Err(AdapterError::ChangedPlan(
3407 "sink was altered concurrently".into(),
3408 )));
3409 return;
3410 }
3411
3412 info!(
3413 "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3414 self.controller
3415 .storage_collections
3416 .collections_frontiers(vec![global_id, sink_plan.from]),
3417 self.controller.storage.export(global_id),
3418 );
3419
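// The watch set only fires once the export's write frontier has passed the
// read timestamp, so the as-of we still hold must now be strictly less than
// the write frontier.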
3420 let write_frontier = &self
3423 .controller
3424 .storage
3425 .export(global_id)
3426 .expect("sink known to exist")
3427 .write_frontier;
3428 let as_of = ctx.read_hold.least_valid_read();
3429 assert!(
3430 write_frontier.iter().all(|t| as_of.less_than(t)),
3431 "{:?} should be strictly less than {:?}",
3432 &*as_of,
3433 &**write_frontier
3434 );
3435
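// Rebuild the sink's `create_sql` from the persisted statement: bump the
// VERSION option and repoint the FROM clause at the (possibly new) upstream
// relation.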
3436 let create_sql = &old_sink.create_sql;
3442 let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3443 let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3444 unreachable!("invalid statement kind for sink");
3445 };
3446
3447 stmt.with_options
3449 .retain(|o| o.name != CreateSinkOptionName::Version);
3450 stmt.with_options.push(CreateSinkOption {
3451 name: CreateSinkOptionName::Version,
3452 value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3453 sink_plan.version.to_string(),
3454 ))),
3455 });
3456
3457 let conn_catalog = self.catalog().for_system_session();
3458 let (mut stmt, resolved_ids) =
3459 mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3460
3461 let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3463 let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3464 stmt.from = ResolvedItemName::Item {
3465 id: from_entry.id(),
3466 qualifiers: from_entry.name.qualifiers.clone(),
3467 full_name,
3468 print_id: true,
3469 version: from_entry.version,
3470 };
3471
3472 let new_sink = Sink {
3473 create_sql: stmt.to_ast_string_stable(),
3474 global_id,
3475 from: sink_plan.from,
3476 connection: sink_plan.connection.clone(),
3477 envelope: sink_plan.envelope,
3478 version: sink_plan.version,
3479 with_snapshot,
3480 resolved_ids: resolved_ids.clone(),
3481 cluster_id: in_cluster,
3482 commit_interval: sink_plan.commit_interval,
3483 };
3484
3485 let ops = vec![catalog::Op::UpdateItem {
3486 id: item_id,
3487 name: entry.name().clone(),
3488 to_item: CatalogItem::Sink(new_sink),
3489 }];
3490
3491 match self
3492 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3493 .await
3494 {
3495 Ok(()) => {}
3496 Err(err) => {
3497 ctx.retire(Err(err));
3498 return;
3499 }
3500 }
3501
3502 let storage_sink_desc = StorageSinkDesc {
3503 from: sink_plan.from,
3504 from_desc: from_entry
3505 .relation_desc()
3506 .expect("sinks can only be built on items with descs")
3507 .into_owned(),
3508 connection: sink_plan
3509 .connection
3510 .clone()
3511 .into_inline_connection(self.catalog().state()),
3512 envelope: sink_plan.envelope,
3513 as_of,
3514 with_snapshot,
3515 version: sink_plan.version,
3516 from_storage_metadata: (),
3517 to_storage_metadata: (),
3518 commit_interval: sink_plan.commit_interval,
3519 };
3520
3521 self.controller
3522 .storage
3523 .alter_export(
3524 global_id,
3525 ExportDescription {
3526 sink: storage_sink_desc,
3527 instance_id: in_cluster,
3528 },
3529 )
3530 .await
3531 .unwrap_or_terminate("cannot fail to alter sink export");
3532
3533 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3534 }
3535
3536 #[instrument]
3537 pub(super) async fn sequence_alter_connection(
3538 &mut self,
3539 ctx: ExecuteContext,
3540 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3541 ) {
3542 match action {
3543 AlterConnectionAction::RotateKeys => {
3544 self.sequence_rotate_keys(ctx, id).await;
3545 }
3546 AlterConnectionAction::AlterOptions {
3547 set_options,
3548 drop_options,
3549 validate,
3550 } => {
3551 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3552 .await
3553 }
3554 }
3555 }
3556
3557 #[instrument]
3558 async fn sequence_alter_connection_options(
3559 &mut self,
3560 mut ctx: ExecuteContext,
3561 id: CatalogItemId,
3562 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3563 drop_options: BTreeSet<ConnectionOptionName>,
3564 validate: bool,
3565 ) {
3566 let cur_entry = self.catalog().get_entry(&id);
3567 let cur_conn = cur_entry.connection().expect("known to be connection");
3568 let connection_gid = cur_conn.global_id();
3569
3570 let inner = || -> Result<Connection, AdapterError> {
3571 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3573 .expect("invalid create sql persisted to catalog")
3574 .into_element()
3575 .ast
3576 {
3577 Statement::CreateConnection(stmt) => stmt,
3578 _ => unreachable!("proved type is connection"),
3579 };
3580
3581 let catalog = self.catalog().for_system_session();
3582
3583 let (mut create_conn_stmt, resolved_ids) =
3585 mz_sql::names::resolve(&catalog, create_conn_stmt)
3586 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3587
3588 create_conn_stmt
3590 .values
3591 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3592
3593 create_conn_stmt.values.extend(
3595 set_options
3596 .into_iter()
3597 .map(|(name, value)| ConnectionOption { name, value }),
3598 );
3599
3600 let mut catalog = self.catalog().for_system_session();
3603 catalog.mark_id_unresolvable_for_replanning(id);
3604
3605 let plan = match mz_sql::plan::plan(
3607 None,
3608 &catalog,
3609 Statement::CreateConnection(create_conn_stmt),
3610 &Params::empty(),
3611 &resolved_ids,
3612 )
3613 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3614 {
3615 Plan::CreateConnection(plan) => plan,
3616 _ => unreachable!("create connection plan is only valid response"),
3617 };
3618
3619 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3621 .expect("invalid create sql persisted to catalog")
3622 .into_element()
3623 .ast
3624 {
3625 Statement::CreateConnection(stmt) => stmt,
3626 _ => unreachable!("proved type is connection"),
3627 };
3628
3629 let catalog = self.catalog().for_system_session();
3630
3631 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3633 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3634
3635 Ok(Connection {
3636 create_sql: plan.connection.create_sql,
3637 global_id: cur_conn.global_id,
3638 details: plan.connection.details,
3639 resolved_ids: new_deps,
3640 })
3641 };
3642
3643 let conn = match inner() {
3644 Ok(conn) => conn,
3645 Err(e) => {
3646 return ctx.retire(Err(e));
3647 }
3648 };
3649
3650 if validate {
3651 let connection = conn
3652 .details
3653 .to_connection()
3654 .into_inline_connection(self.catalog().state());
3655
3656 let internal_cmd_tx = self.internal_cmd_tx.clone();
3657 let transient_revision = self.catalog().transient_revision();
3658 let conn_id = ctx.session().conn_id().clone();
3659 let otel_ctx = OpenTelemetryContext::obtain();
3660 let role_metadata = ctx.session().role_metadata().clone();
3661 let current_storage_parameters = self.controller.storage.config().clone();
3662
3663 task::spawn(
3664 || format!("validate_alter_connection:{conn_id}"),
3665 async move {
3666 let resolved_ids = conn.resolved_ids.clone();
3667 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3668 let result = match connection.validate(id, &current_storage_parameters).await {
3669 Ok(()) => Ok(conn),
3670 Err(err) => Err(err.into()),
3671 };
3672
3673 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3675 AlterConnectionValidationReady {
3676 ctx,
3677 result,
3678 connection_id: id,
3679 connection_gid,
3680 plan_validity: PlanValidity::new(
3681 transient_revision,
3682 dependency_ids.clone(),
3683 None,
3684 None,
3685 role_metadata,
3686 ),
3687 otel_ctx,
3688 resolved_ids,
3689 },
3690 ));
3691 if let Err(e) = result {
3692 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3693 }
3694 },
3695 );
3696 } else {
3697 let result = self
3698 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3699 .await;
3700 ctx.retire(result);
3701 }
3702 }
3703
3704 #[instrument]
3705 pub(crate) async fn sequence_alter_connection_stage_finish(
3706 &mut self,
3707 session: &Session,
3708 id: CatalogItemId,
3709 connection: Connection,
3710 ) -> Result<ExecuteResponse, AdapterError> {
3711 match self.catalog.get_entry(&id).item() {
3712 CatalogItem::Connection(curr_conn) => {
3713 curr_conn
3714 .details
3715 .to_connection()
3716 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3717 .map_err(StorageError::from)?;
3718 }
3719 _ => unreachable!("known to be a connection"),
3720 };
3721
3722 let ops = vec![catalog::Op::UpdateItem {
3723 id,
3724 name: self.catalog.get_entry(&id).name().clone(),
3725 to_item: CatalogItem::Connection(connection.clone()),
3726 }];
3727
3728 self.catalog_transact(Some(session), ops).await?;
3729
3730 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3737 }
3738
3739 #[instrument]
3740 pub(super) async fn sequence_alter_source(
3741 &mut self,
3742 session: &Session,
3743 plan::AlterSourcePlan {
3744 item_id,
3745 ingestion_id,
3746 action,
3747 }: plan::AlterSourcePlan,
3748 ) -> Result<ExecuteResponse, AdapterError> {
3749 let cur_entry = self.catalog().get_entry(&item_id);
3750 let cur_source = cur_entry.source().expect("known to be source");
3751
3752 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3753 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3755 .expect("invalid create sql persisted to catalog")
3756 .into_element()
3757 .ast
3758 {
3759 Statement::CreateSource(stmt) => stmt,
3760 _ => unreachable!("proved type is source"),
3761 };
3762
3763 let catalog = coord.catalog().for_system_session();
3764
3765 mz_sql::names::resolve(&catalog, create_source_stmt)
3767 .map_err(|e| AdapterError::internal(err_cx, e))
3768 };
3769
3770 match action {
3771 plan::AlterSourceAction::AddSubsourceExports {
3772 subsources,
3773 options,
3774 } => {
3775 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3776
3777 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3778 text_columns: mut new_text_columns,
3779 exclude_columns: mut new_exclude_columns,
3780 ..
3781 } = options.try_into()?;
3782
3783 let (mut create_source_stmt, resolved_ids) =
3785 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3786
3787 let catalog = self.catalog();
3789 let curr_references: BTreeSet<_> = catalog
3790 .get_entry(&item_id)
3791 .used_by()
3792 .into_iter()
3793 .filter_map(|subsource| {
3794 catalog
3795 .get_entry(subsource)
3796 .subsource_details()
3797 .map(|(_id, reference, _details)| reference)
3798 })
3799 .collect();
3800
3801 let purification_err =
3804 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3805
3806 match &mut create_source_stmt.connection {
3810 CreateSourceConnection::Postgres {
3811 options: curr_options,
3812 ..
3813 } => {
3814 let mz_sql::plan::PgConfigOptionExtracted {
3815 mut text_columns, ..
3816 } = curr_options.clone().try_into()?;
3817
3818 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3821
3822 text_columns.retain(|column_qualified_reference| {
3824 mz_ore::soft_assert_eq_or_log!(
3825 column_qualified_reference.0.len(),
3826 4,
3827 "all TEXT COLUMNS values must be column-qualified references"
3828 );
3829 let mut table = column_qualified_reference.clone();
3830 table.0.truncate(3);
3831 curr_references.contains(&table)
3832 });
3833
3834 new_text_columns.extend(text_columns);
3836
3837 if !new_text_columns.is_empty() {
3839 new_text_columns.sort();
3840 let new_text_columns = new_text_columns
3841 .into_iter()
3842 .map(WithOptionValue::UnresolvedItemName)
3843 .collect();
3844
3845 curr_options.push(PgConfigOption {
3846 name: PgConfigOptionName::TextColumns,
3847 value: Some(WithOptionValue::Sequence(new_text_columns)),
3848 });
3849 }
3850 }
3851 CreateSourceConnection::MySql {
3852 options: curr_options,
3853 ..
3854 } => {
3855 let mz_sql::plan::MySqlConfigOptionExtracted {
3856 mut text_columns,
3857 mut exclude_columns,
3858 ..
3859 } = curr_options.clone().try_into()?;
3860
3861 curr_options.retain(|o| {
3864 !matches!(
3865 o.name,
3866 MySqlConfigOptionName::TextColumns
3867 | MySqlConfigOptionName::ExcludeColumns
3868 )
3869 });
3870
3871 let column_referenced =
3873 |column_qualified_reference: &UnresolvedItemName| {
3874 mz_ore::soft_assert_eq_or_log!(
3875 column_qualified_reference.0.len(),
3876 3,
3877 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3878 );
3879 let mut table = column_qualified_reference.clone();
3880 table.0.truncate(2);
3881 curr_references.contains(&table)
3882 };
3883 text_columns.retain(column_referenced);
3884 exclude_columns.retain(column_referenced);
3885
3886 new_text_columns.extend(text_columns);
3888 new_exclude_columns.extend(exclude_columns);
3889
3890 if !new_text_columns.is_empty() {
3892 new_text_columns.sort();
3893 let new_text_columns = new_text_columns
3894 .into_iter()
3895 .map(WithOptionValue::UnresolvedItemName)
3896 .collect();
3897
3898 curr_options.push(MySqlConfigOption {
3899 name: MySqlConfigOptionName::TextColumns,
3900 value: Some(WithOptionValue::Sequence(new_text_columns)),
3901 });
3902 }
3903 if !new_exclude_columns.is_empty() {
3905 new_exclude_columns.sort();
3906 let new_exclude_columns = new_exclude_columns
3907 .into_iter()
3908 .map(WithOptionValue::UnresolvedItemName)
3909 .collect();
3910
3911 curr_options.push(MySqlConfigOption {
3912 name: MySqlConfigOptionName::ExcludeColumns,
3913 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3914 });
3915 }
3916 }
3917 CreateSourceConnection::SqlServer {
3918 options: curr_options,
3919 ..
3920 } => {
3921 let mz_sql::plan::SqlServerConfigOptionExtracted {
3922 mut text_columns,
3923 mut exclude_columns,
3924 ..
3925 } = curr_options.clone().try_into()?;
3926
3927 curr_options.retain(|o| {
3930 !matches!(
3931 o.name,
3932 SqlServerConfigOptionName::TextColumns
3933 | SqlServerConfigOptionName::ExcludeColumns
3934 )
3935 });
3936
3937 let column_referenced =
3939 |column_qualified_reference: &UnresolvedItemName| {
3940 mz_ore::soft_assert_eq_or_log!(
3941 column_qualified_reference.0.len(),
3942 3,
3943 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3944 );
3945 let mut table = column_qualified_reference.clone();
3946 table.0.truncate(2);
3947 curr_references.contains(&table)
3948 };
3949 text_columns.retain(column_referenced);
3950 exclude_columns.retain(column_referenced);
3951
3952 new_text_columns.extend(text_columns);
3954 new_exclude_columns.extend(exclude_columns);
3955
3956 if !new_text_columns.is_empty() {
3958 new_text_columns.sort();
3959 let new_text_columns = new_text_columns
3960 .into_iter()
3961 .map(WithOptionValue::UnresolvedItemName)
3962 .collect();
3963
3964 curr_options.push(SqlServerConfigOption {
3965 name: SqlServerConfigOptionName::TextColumns,
3966 value: Some(WithOptionValue::Sequence(new_text_columns)),
3967 });
3968 }
3969 if !new_exclude_columns.is_empty() {
3971 new_exclude_columns.sort();
3972 let new_exclude_columns = new_exclude_columns
3973 .into_iter()
3974 .map(WithOptionValue::UnresolvedItemName)
3975 .collect();
3976
3977 curr_options.push(SqlServerConfigOption {
3978 name: SqlServerConfigOptionName::ExcludeColumns,
3979 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3980 });
3981 }
3982 }
3983 _ => return Err(purification_err()),
3984 };
3985
3986 let mut catalog = self.catalog().for_system_session();
3987 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
3988
3989 let plan = match mz_sql::plan::plan(
3991 None,
3992 &catalog,
3993 Statement::CreateSource(create_source_stmt),
3994 &Params::empty(),
3995 &resolved_ids,
3996 )
3997 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
3998 {
3999 Plan::CreateSource(plan) => plan,
4000 _ => unreachable!("create source plan is only valid response"),
4001 };
4002
4003 let source = Source::new(
4007 plan,
4008 cur_source.global_id,
4009 resolved_ids,
4010 cur_source.custom_logical_compaction_window,
4011 cur_source.is_retained_metrics_object,
4012 );
4013
4014 let desc = match &source.data_source {
4016 DataSourceDesc::Ingestion { desc, .. }
4017 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4018 desc.clone().into_inline_connection(self.catalog().state())
4019 }
4021 _ => unreachable!("already verified to be an ingestion"),
4021 };
4022
4023 self.controller
4024 .storage
4025 .check_alter_ingestion_source_desc(ingestion_id, &desc)
4026 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4027
4028 let mut ops = vec![catalog::Op::UpdateItem {
4031 id: item_id,
4032 name: self.catalog.get_entry(&item_id).name().clone(),
4035 to_item: CatalogItem::Source(source),
4036 }];
4037
4038 let CreateSourceInner {
4039 ops: new_ops,
4040 sources: _,
4041 if_not_exists_ids,
4042 } = self.create_source_inner(session, subsources).await?;
4043
4044 ops.extend(new_ops.into_iter());
4045
4046 assert!(
4047 if_not_exists_ids.is_empty(),
4048 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4049 );
4050
4051 self.catalog_transact(Some(session), ops).await?;
4052 }
4053 plan::AlterSourceAction::RefreshReferences { references } => {
4054 self.catalog_transact(
4055 Some(session),
4056 vec![catalog::Op::UpdateSourceReferences {
4057 source_id: item_id,
4058 references: references.into(),
4059 }],
4060 )
4061 .await?;
4062 }
4063 }
4064
4065 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4066 }
4067
4068 #[instrument]
4069 pub(super) async fn sequence_alter_system_set(
4070 &mut self,
4071 session: &Session,
4072 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4073 ) -> Result<ExecuteResponse, AdapterError> {
4074 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4075 if NETWORK_POLICY.name.to_string().to_lowercase() == name.to_lowercase() {
4077 self.validate_alter_system_network_policy(session, &value)?;
4078 }
4079
4080 let op = match value {
4081 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4082 name: name.clone(),
4083 value: OwnedVarInput::SqlSet(values),
4084 },
4085 plan::VariableValue::Default => {
4086 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4087 }
4088 };
4089 self.catalog_transact(Some(session), vec![op]).await?;
4090
4091 session.add_notice(AdapterNotice::VarDefaultUpdated {
4092 role: None,
4093 var_name: Some(name),
4094 });
4095 Ok(ExecuteResponse::AlteredSystemConfiguration)
4096 }
4097
4098 #[instrument]
4099 pub(super) async fn sequence_alter_system_reset(
4100 &mut self,
4101 session: &Session,
4102 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4103 ) -> Result<ExecuteResponse, AdapterError> {
4104 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4105 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4106 self.catalog_transact(Some(session), vec![op]).await?;
4107 session.add_notice(AdapterNotice::VarDefaultUpdated {
4108 role: None,
4109 var_name: Some(name),
4110 });
4111 Ok(ExecuteResponse::AlteredSystemConfiguration)
4112 }
4113
4114 #[instrument]
4115 pub(super) async fn sequence_alter_system_reset_all(
4116 &mut self,
4117 session: &Session,
4118 _: plan::AlterSystemResetAllPlan,
4119 ) -> Result<ExecuteResponse, AdapterError> {
4120 self.is_user_allowed_to_alter_system(session, None)?;
4121 let op = catalog::Op::ResetAllSystemConfiguration;
4122 self.catalog_transact(Some(session), vec![op]).await?;
4123 session.add_notice(AdapterNotice::VarDefaultUpdated {
4124 role: None,
4125 var_name: None,
4126 });
4127 Ok(ExecuteResponse::AlteredSystemConfiguration)
4128 }
4129
    fn is_user_allowed_to_alter_system(
        &self,
        session: &Session,
        var_name: Option<&str>,
    ) -> Result<(), AdapterError> {
        match (session.user().kind(), var_name) {
            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
            (UserKind::Superuser, Some(name))
                if session.user().is_internal()
                    || self.catalog().system_config().user_modifiable(name) =>
            {
                let var = self.catalog().system_config().get(name)?;
                var.visible(session.user(), self.catalog().system_config())?;
                Ok(())
            }
            (UserKind::Regular, Some(name))
                if self.catalog().system_config().user_modifiable(name) =>
            {
                Err(AdapterError::Unauthorized(
                    rbac::UnauthorizedError::Superuser {
                        action: format!("toggle the '{name}' system configuration parameter"),
                    },
                ))
            }
            _ => Err(AdapterError::Unauthorized(
                rbac::UnauthorizedError::MzSystem {
                    action: "alter system".into(),
                },
            )),
        }
    }

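    /// Validates that setting the system default network policy to
    /// `policy_value` names exactly one existing policy and would not lock the
    /// current session out.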
    fn validate_alter_system_network_policy(
        &self,
        session: &Session,
        policy_value: &plan::VariableValue,
    ) -> Result<(), AdapterError> {
        let policy_name = match &policy_value {
            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
            plan::VariableValue::Values(values) if values.len() == 1 => {
                values.iter().next().cloned()
            }
            plan::VariableValue::Values(values) => {
                tracing::warn!(?values, "can't set multiple network policies at once");
                None
            }
        };
        let maybe_network_policy = policy_name
            .as_ref()
            .and_then(|name| self.catalog.get_network_policy_by_name(name));
        let Some(network_policy) = maybe_network_policy else {
            return Err(AdapterError::PlanError(plan::PlanError::VarError(
                VarError::InvalidParameterValue {
                    name: NETWORK_POLICY.name(),
                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
                    reason: "no network policy with that name exists".to_string(),
                },
            )));
        };
        self.validate_alter_network_policy(session, &network_policy.rules)
    }

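    /// Validates that the proposed network policy rules still admit the
    /// current session's client IP, so that a user cannot alter a policy in a
    /// way that locks them out. Internal users are exempt; sessions without a
    /// known client IP are rejected outright.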
    fn validate_alter_network_policy(
        &self,
        session: &Session,
        policy_rules: &[NetworkPolicyRule],
    ) -> Result<(), AdapterError> {
        if session.user().is_internal() {
            return Ok(());
        }
        if let Some(ip) = session.meta().client_ip() {
            validate_ip_with_policy_rules(ip, policy_rules)
                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
        } else {
            return Err(AdapterError::NetworkPolicyDenied(
                NetworkPolicyError::MissingIp,
            ));
        }
        Ok(())
    }

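    /// Sequences an `EXECUTE` statement by binding the named prepared
    /// statement and the supplied parameters into a new portal, returning the
    /// new portal's name.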
    #[instrument]
    pub(super) fn sequence_execute(
        &self,
        session: &mut Session,
        plan: plan::ExecutePlan,
    ) -> Result<String, AdapterError> {
        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
        let ps = session
            .get_prepared_statement_unverified(&plan.name)
            .expect("known to exist");
        let stmt = ps.stmt().cloned();
        let desc = ps.desc().clone();
        let state_revision = ps.state_revision;
        let logging = Arc::clone(ps.logging());
        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
    }

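    /// Sequences a `GRANT <privileges> ON <object> TO <roles>` statement by
    /// delegating to [`Self::sequence_update_privileges`] with the `Grant`
    /// variant.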
    #[instrument]
    pub(super) async fn sequence_grant_privileges(
        &mut self,
        session: &Session,
        plan::GrantPrivilegesPlan {
            update_privileges,
            grantees,
        }: plan::GrantPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            grantees,
            UpdatePrivilegeVariant::Grant,
        )
        .await
    }

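    /// Sequences a `REVOKE <privileges> ON <object> FROM <roles>` statement by
    /// delegating to [`Self::sequence_update_privileges`] with the `Revoke`
    /// variant.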
    #[instrument]
    pub(super) async fn sequence_revoke_privileges(
        &mut self,
        session: &Session,
        plan::RevokePrivilegesPlan {
            update_privileges,
            revokees,
        }: plan::RevokePrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            revokees,
            UpdatePrivilegeVariant::Revoke,
        )
        .await
    }

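    /// Shared implementation of `GRANT` and `REVOKE`: for every
    /// (privilege, grantee) pair it validates that the target and grantee are
    /// not reserved or system objects, warns about privilege types that do not
    /// apply to the object, and emits catalog ops only for changes that would
    /// actually take effect (grants the grantee does not already hold, and
    /// revokes that intersect an existing privilege).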
    #[instrument]
    async fn sequence_update_privileges(
        &mut self,
        session: &Session,
        update_privileges: Vec<UpdatePrivilege>,
        grantees: Vec<RoleId>,
        variant: UpdatePrivilegeVariant,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
        let mut warnings = Vec::new();
        let catalog = self.catalog().for_session(session);

        for UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        } in update_privileges
        {
            let actual_object_type = catalog.get_system_object_type(&target_id);
            if actual_object_type.is_relation() {
                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
                if !non_applicable_privileges.is_empty() {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
                        non_applicable_privileges,
                        object_description,
                    });
                }
            }

            if let SystemObjectId::Object(object_id) = &target_id {
                self.catalog()
                    .ensure_not_reserved_object(object_id, session.conn_id())?;
            }

            let privileges = self
                .catalog()
                .get_privileges(&target_id, session.conn_id())
                .ok_or(AdapterError::Unsupported(
                    "GRANTs/REVOKEs on an object type with no privileges",
                ))?;

            for grantee in &grantees {
                self.catalog().ensure_not_system_role(grantee)?;
                self.catalog().ensure_not_predefined_role(grantee)?;
                let existing_privilege = privileges
                    .get_acl_item(grantee, &grantor)
                    .map(Cow::Borrowed)
                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));

                match variant {
                    UpdatePrivilegeVariant::Grant
                        if !existing_privilege.acl_mode.contains(acl_mode) =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    UpdatePrivilegeVariant::Revoke
                        if !existing_privilege
                            .acl_mode
                            .intersection(acl_mode)
                            .is_empty() =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    _ => {}
                }
            }
        }

        if ops.is_empty() {
            session.add_notices(warnings);
            return Ok(variant.into());
        }

        let res = self
            .catalog_transact(Some(session), ops)
            .await
            .map(|_| match variant {
                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
            });
        if res.is_ok() {
            session.add_notices(warnings);
        }
        res
    }

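    /// Sequences an `ALTER DEFAULT PRIVILEGES` statement, validating that
    /// neither the target role nor any grantee is a system or predefined role
    /// and that the scoping database/schema is not reserved, then recording
    /// one default-privilege update per (object, ACL item) pair.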
    #[instrument]
    pub(super) async fn sequence_alter_default_privileges(
        &mut self,
        session: &Session,
        plan::AlterDefaultPrivilegesPlan {
            privilege_objects,
            privilege_acl_items,
            is_grant,
        }: plan::AlterDefaultPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
        let variant = if is_grant {
            UpdatePrivilegeVariant::Grant
        } else {
            UpdatePrivilegeVariant::Revoke
        };
        for privilege_object in &privilege_objects {
            self.catalog()
                .ensure_not_system_role(&privilege_object.role_id)?;
            self.catalog()
                .ensure_not_predefined_role(&privilege_object.role_id)?;
            if let Some(database_id) = privilege_object.database_id {
                self.catalog()
                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
            }
            if let Some(schema_id) = privilege_object.schema_id {
                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
                let schema_spec: SchemaSpecifier = schema_id.into();

                self.catalog().ensure_not_reserved_object(
                    &(database_spec, schema_spec).into(),
                    session.conn_id(),
                )?;
            }
            for privilege_acl_item in &privilege_acl_items {
                self.catalog()
                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
                self.catalog()
                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
                ops.push(catalog::Op::UpdateDefaultPrivilege {
                    privilege_object: privilege_object.clone(),
                    privilege_acl_item: privilege_acl_item.clone(),
                    variant,
                });
            }
        }

        self.catalog_transact(Some(session), ops).await?;
        Ok(ExecuteResponse::AlteredDefaultPrivileges)
    }

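    /// Sequences a `GRANT <role> TO <member>` statement. Granting a membership
    /// that already exists produces a notice rather than an error; reserved
    /// members and non-grantable roles are rejected.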
    #[instrument]
    pub(super) async fn sequence_grant_role(
        &mut self,
        session: &Session,
        plan::GrantRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::GrantRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::GrantRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        if ops.is_empty() {
            return Ok(ExecuteResponse::GrantedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::GrantedRole)
    }

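    /// Sequences a `REVOKE <role> FROM <member>` statement. Revoking a
    /// membership that does not exist produces a notice rather than an error.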
    #[instrument]
    pub(super) async fn sequence_revoke_role(
        &mut self,
        session: &Session,
        plan::RevokeRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::RevokeRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if !member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::RevokeRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        if ops.is_empty() {
            return Ok(ExecuteResponse::RevokedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::RevokedRole)
    }

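    /// Sequences an `ALTER ... OWNER TO <role>` statement. Ownership changes
    /// cascade to objects that must share an owner with the target: indexes on
    /// an item, a source's progress subsource, and a cluster's replicas.
    /// Altering an index directly emits a notice and makes no change, since
    /// index ownership follows the indexed relation.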
    #[instrument]
    pub(super) async fn sequence_alter_owner(
        &mut self,
        session: &Session,
        plan::AlterOwnerPlan {
            id,
            object_type,
            new_owner,
        }: plan::AlterOwnerPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = vec![catalog::Op::UpdateOwner {
            id: id.clone(),
            new_owner,
        }];

        match &id {
            ObjectId::Item(global_id) => {
                let entry = self.catalog().get_entry(global_id);

                if entry.is_index() {
                    let name = self
                        .catalog()
                        .resolve_full_name(entry.name(), Some(session.conn_id()))
                        .to_string();
                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
                    return Ok(ExecuteResponse::AlteredObject(object_type));
                }

                let dependent_index_ops = entry
                    .used_by()
                    .into_iter()
                    .filter(|id| self.catalog().get_entry(id).is_index())
                    .map(|id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(*id),
                        new_owner,
                    });
                ops.extend(dependent_index_ops);

                let dependent_subsources = entry
                    .progress_id()
                    .into_iter()
                    .map(|item_id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(item_id),
                        new_owner,
                    });
                ops.extend(dependent_subsources);
            }
            ObjectId::Cluster(cluster_id) => {
                let cluster = self.catalog().get_cluster(*cluster_id);
                let managed_cluster_replica_ops =
                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
                        new_owner,
                    });
                ops.extend(managed_cluster_replica_ops);
            }
            _ => {}
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::AlteredObject(object_type))
    }

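    /// Sequences a `REASSIGN OWNED BY <old_roles> TO <new_role>` statement,
    /// transferring ownership of every reassignable object owned by the old
    /// roles to the new role. Reserved roles may appear on neither side.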
    #[instrument]
    pub(super) async fn sequence_reassign_owned(
        &mut self,
        session: &Session,
        plan::ReassignOwnedPlan {
            old_roles,
            new_role,
            reassign_ids,
        }: plan::ReassignOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let ops = reassign_ids
            .into_iter()
            .map(|id| catalog::Op::UpdateOwner {
                id,
                new_owner: new_role,
            })
            .collect();

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::ReassignOwned)
    }

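    /// Pops the next deferred DDL statement off the serialized-DDL queue, if
    /// any, and resumes executing it, either from the raw statement or from an
    /// already-produced plan.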
    #[instrument]
    pub(crate) async fn handle_deferred_statement(&mut self) {
        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
            return;
        };
        match ps {
            crate::coord::PlanStatement::Statement { stmt, params } => {
                self.handle_execute_inner(stmt, params, ctx).await;
            }
            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
                self.sequence_plan(ctx, plan, resolved_ids).await;
            }
        }
    }

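    /// Sequences an `ALTER TABLE ... ADD COLUMN` statement. Adding a column
    /// creates a new version of the table, so a fresh global ID is allocated
    /// for the new table version before the catalog op is applied.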
    #[instrument]
    #[allow(clippy::unused_async)]
    pub(super) async fn sequence_alter_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterTablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::AlterTablePlan {
            relation_id,
            column_name,
            column_type,
            raw_sql_type,
        } = plan;

        let id_ts = self.get_catalog_write_ts().await;
        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
        let ops = vec![catalog::Op::AlterAddColumn {
            id: relation_id,
            new_global_id,
            name: column_name,
            typ: column_type,
            sql: raw_sql_type,
        }];

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
    }

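    /// First half of `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`: captures
    /// a validity snapshot of the plan, then installs a watch set that fires
    /// once the target view's write frontier has caught up to (just before)
    /// the replacement's, at which point the `finish` half below swaps the
    /// definitions. An empty replacement frontier means the replacement is
    /// sealed, and the operation is retired with an error.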
    #[instrument]
    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: AlterMaterializedViewApplyReplacementPlan,
    ) {
        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();

        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([id, replacement_id]),
            None,
            None,
            ctx.session().role_metadata().clone(),
        );

        let target = self.catalog.get_entry(&id);
        let target_gid = target.latest_global_id();

        let replacement = self.catalog.get_entry(&replacement_id);
        let replacement_gid = replacement.latest_global_id();

        let target_upper = self
            .controller
            .storage_collections
            .collection_frontiers(target_gid)
            .expect("target MV exists")
            .write_frontier;
        let replacement_upper = self
            .controller
            .compute
            .collection_frontiers(replacement_gid, replacement.cluster_id())
            .expect("replacement MV exists")
            .write_frontier;

        info!(
            %id, %replacement_id, ?target_upper, ?replacement_upper,
            "preparing materialized view replacement application",
        );

        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
                name: target.name().item.clone(),
            }));
            return;
        };

        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);

        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([target_gid]),
            replacement_upper_ts,
            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
                ctx: Some(ctx),
                otel_ctx: OpenTelemetryContext::obtain(),
                plan,
                plan_validity,
            }),
        )
        .expect("target collection exists");
    }

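    /// Second half of `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`, invoked
    /// when the watch set installed by the `prepare` half fires: re-checks
    /// plan validity against the current catalog, then applies the replacement
    /// in a catalog transaction and retires the execute context with the
    /// result.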
    #[instrument]
    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
        &mut self,
        mut ctx: AlterMaterializedViewReadyContext,
    ) {
        ctx.otel_ctx.attach_as_parent();

        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;

        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
            ctx.retire(Err(err));
            return;
        }

        info!(
            %id, %replacement_id,
            "finishing materialized view replacement application",
        );

        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
                ObjectType::MaterializedView,
            ))),
            Err(err) => ctx.retire(Err(err)),
        }
    }

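    /// Returns a statistics oracle for the given query, which the optimizer
    /// uses to look up statistics (e.g., cardinality estimates) for the
    /// queried collections as of `query_as_of`.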
    pub(super) async fn statistics_oracle(
        &self,
        session: &Session,
        source_ids: &BTreeSet<GlobalId>,
        query_as_of: &Antichain<Timestamp>,
        is_oneshot: bool,
    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
        super::statistics_oracle(
            session,
            source_ids,
            query_as_of,
            is_oneshot,
            self.catalog().system_config(),
            self.controller.storage_collections.as_ref(),
        )
        .await
    }
}

impl Coordinator {
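    /// Processes the metainfo produced by a dataflow optimization: emits
    /// optimizer notices to the session (if one is present), renders and
    /// stores the notices in the catalog, and, when `enable_mz_notices` is
    /// set, also writes them to the notices builtin table, returning a notify
    /// future for that builtin-table write if one was performed.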
    async fn process_dataflow_metainfo(
        &mut self,
        df_meta: DataflowMetainfo,
        export_id: GlobalId,
        ctx: Option<&mut ExecuteContext>,
        notice_ids: Vec<GlobalId>,
    ) -> Option<BuiltinTableAppendNotify> {
        if let Some(ctx) = ctx {
            emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
        }

        let df_meta = self
            .catalog()
            .render_notices(df_meta, notice_ids, Some(export_id));

        if self.catalog().state().system_config().enable_mz_notices()
            && !df_meta.optimizer_notices.is_empty()
        {
            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
            self.catalog().state().pack_optimizer_notices(
                &mut builtin_table_updates,
                df_meta.optimizer_notices.iter(),
                Diff::ONE,
            );

            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            Some(
                self.builtin_table_update()
                    .execute(builtin_table_updates)
                    .await
                    .0,
            )
        } else {
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            None
        }
    }
}