1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::future::OreFutureExt;
34use mz_ore::task::{self, JoinHandle, spawn};
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{assert_none, instrument};
37use mz_repr::adt::jsonb::Jsonb;
38use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
39use mz_repr::explain::ExprHumanizer;
40use mz_repr::explain::json::json_string;
41use mz_repr::role_id::RoleId;
42use mz_repr::{
43 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
44 RowIterator, Timestamp,
45};
46use mz_sql::ast::{
47 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
48 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
49 SqlServerConfigOptionName,
50};
51use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
52use mz_sql::catalog::{
53 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
54 CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, CatalogTypeDetails,
55 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
56};
57use mz_sql::names::{
58 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
59 SchemaSpecifier, SystemObjectId,
60};
61use mz_sql::plan::{
62 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
63 StatementContext,
64};
65use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
66use mz_storage_types::sinks::StorageSinkDesc;
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use mz_transform::notice::{OptimizerNotice, RawOptimizerNotice};
93use smallvec::SmallVec;
94use timely::progress::Antichain;
95use tokio::sync::{oneshot, watch};
96use tracing::{Instrument, Span, info, warn};
97
98use crate::catalog::{self, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
99use crate::command::{ExecuteResponse, Response};
100use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
101use crate::coord::read_then_write::validate_read_then_write_dependencies;
102use crate::coord::sequencer::emit_optimizer_notices;
103use crate::coord::{
104 AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
105 Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
106 ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
107 PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
108 WatchSetResponse, validate_ip_with_policy_rules,
109};
110use crate::error::AdapterError;
111use crate::notice::{AdapterNotice, DroppedInUseIndex};
112use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
113use crate::optimize::{self, Optimize};
114use crate::session::{
115 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
116 WriteLocks, WriteOp,
117};
118use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
119use crate::{PeekResponseUnary, ReadHolds};
120
/// Future resolving to the real-time-recency timestamp for a query, or a
/// [`StorageError`] if it could not be determined.
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError>>;
123
// Submodules containing further sequencing implementations.
mod cluster;
mod copy_from;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;
133
/// Unwraps a `Result`; on `Err`, retires the provided context with the error
/// (converted via `Into`) and returns from the *enclosing* function.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}
144
145pub(super) use return_if_err;
146
/// Catalog drop operations together with their session-visible side effects.
struct DropOps {
    /// Catalog operations implementing the drop.
    ops: Vec<catalog::Op>,
    /// Whether the drop included the session's active database.
    dropped_active_db: bool,
    /// Whether the drop included the session's active cluster.
    dropped_active_cluster: bool,
    /// Indexes that were dropped while still in use.
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
153
/// Intermediate state produced by [`Coordinator::create_source_inner`].
struct CreateSourceInner {
    /// Catalog operations that create the sources (and record references).
    ops: Vec<catalog::Op>,
    /// The in-memory `Source` object per created item.
    sources: Vec<(CatalogItemId, Source)>,
    /// Names of sources declared `IF NOT EXISTS`, keyed by item ID, so the
    /// caller can translate "already exists" errors into notices.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
160
161impl Coordinator {
    /// Drives a [`Staged`] statement through its stages until it produces a
    /// response, hands off to a spawned task, or fails.
    ///
    /// Between stages this validates the plan against the current catalog and,
    /// when the stage permits cancellation, installs a cancellation watch
    /// channel for the session so a later cancel request can abort spawned
    /// work.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        // Bail out if the catalog changed in a way that invalidates the plan.
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Install a fresh cancellation channel for this stage. If
                    // a previous channel existed and was already flipped to
                    // canceled, honor that cancellation now.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // This stage must not be canceled mid-flight; drop any
                    // stale channel so cancel requests are ignored.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                // Without a session there is nowhere to receive a cancel
                // request from.
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    // Work moved off the coordinator task; when it finishes,
                    // re-enter sequencing via the internal command queue.
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    // The spawned work produces the final response directly.
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                // Continue immediately with the next stage.
                StageResult::Immediate(stage) => *stage,
            }
        }
    }
227
    /// Awaits a staged-work `handle` on a background task, invoking `f` with
    /// the successful result or retiring `ctx` on error.
    ///
    /// If `cancel_enabled` and the session has a cancellation watch channel
    /// installed, a cancel request races the handle and retires the context
    /// with [`AdapterError::Canceled`].
    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        // Future that resolves once the session's cancellation flag flips to
        // `true`; pending forever if no channel is installed.
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                let _ = rx.wait_for(|v| *v).await;
                ()
            })
        } else {
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = return_if_err!(res, ctx);
                    f(ctx, next);
                }
                // Only poll the cancellation future when cancellation is
                // permitted for this stage.
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }
264
    /// Builds the catalog operations and in-memory [`Source`] objects for a
    /// batch of planned sources.
    ///
    /// Webhook validation expressions are pre-reduced here; a reduction
    /// failure aborts the whole batch. The returned `if_not_exists_ids` map
    /// lets the caller translate "already exists" errors into notices.
    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        // Record which sources were declared `IF NOT EXISTS`, keyed by item
        // ID.
        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            // Webhook sources may carry a CHECK expression; reduce it now,
            // recording a metric and failing the batch if reduction fails.
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            // If a set of upstream source references is available, record it
            // alongside the source.
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            // Reference updates are appended after this source's `CreateItem`
            // op.
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }
347
348 pub(crate) fn plan_subsource(
356 &self,
357 session: &Session,
358 params: &mz_sql::plan::Params,
359 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
360 item_id: CatalogItemId,
361 global_id: GlobalId,
362 ) -> Result<CreateSourcePlanBundle, AdapterError> {
363 let catalog = self.catalog().for_session(session);
364 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
365
366 let plan = self.plan_statement(
367 session,
368 Statement::CreateSubsource(subsource_stmt),
369 params,
370 &resolved_ids,
371 )?;
372 let plan = match plan {
373 Plan::CreateSource(plan) => plan,
374 _ => unreachable!(),
375 };
376 Ok(CreateSourcePlanBundle {
377 item_id,
378 global_id,
379 plan,
380 resolved_ids,
381 available_source_references: None,
382 })
383 }
384
    /// Plans an `ALTER SOURCE ... ADD SUBSOURCE` statement from its purified
    /// parts: generates a `CREATE SUBSOURCE` statement for each purified
    /// export, plans each one, and wraps them in an `AlterSourcePlan`.
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        // Subsource statements are generated in a system session context.
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        // The ALTER target must be a source.
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        // Allocate one (item, global) ID pair per subsource; `zip_eq` asserts
        // the counts match.
        let ids = self
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
            .await?;
        for (subsource_stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }
435
436 pub(crate) fn plan_purified_alter_source_refresh_references(
438 &self,
439 _session: &Session,
440 _params: Params,
441 source_name: ResolvedItemName,
442 available_source_references: plan::SourceReferences,
443 ) -> Result<(Plan, ResolvedIds), AdapterError> {
444 let entry = self.catalog().get_entry(source_name.item_id());
445 let source = entry.source().ok_or_else(|| {
446 AdapterError::internal(
447 "plan alter source",
448 format!("expected Source found {entry:?}"),
449 )
450 })?;
451 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
452 references: available_source_references,
453 };
454
455 Ok((
456 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
457 item_id: entry.id(),
458 ingestion_id: source.global_id(),
459 action,
460 }),
461 ResolvedIds::empty(),
462 ))
463 }
464
    /// Plans a purified `CREATE SOURCE` statement together with its optional
    /// progress subsource and any purified subsource exports, producing a
    /// `Plan::CreateSources` containing one plan bundle per object to create.
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        // +2 leaves room for the progress subsource and the source itself.
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        if let Some(progress_stmt) = progress_stmt {
            // The purified progress statement must not already carry an
            // `OF SOURCE` clause.
            assert_none!(progress_stmt.of_source);
            let (item_id, global_id) = self.allocate_user_id().await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            // Point the source statement at its planned progress subsource.
            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        // Select which of the source's WITH options propagate to each
        // subsource: RETAIN HISTORY does, TIMESTAMP INTERVAL does not.
        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
        };

        let (item_id, global_id) = self.allocate_user_id().await?;

        // Resolved name of the source itself, used as the `OF SOURCE` target
        // for the generated subsource statements.
        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Subsource statements are generated in a system session context.
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        // Apply the propagated WITH options to every subsource.
        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // One (item, global) ID pair per subsource; `zip_eq` asserts the
        // counts line up.
        let ids = self
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }
577
    /// Sequences a batch of planned `CREATE SOURCE` statements inside a single
    /// DDL transaction, emitting webhook-URL notices for webhook sources and
    /// translating `IF NOT EXISTS` conflicts into notices.
    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        // Surface each webhook source's ingestion URL as a notice.
        // NOTE(review): this loop runs before the transaction result is
        // inspected; presumably the URL lookup finds nothing on failure —
        // confirm.
        for (item_id, source) in &sources {
            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
                    ctx.session()
                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
                }
            }
        }

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            // An `IF NOT EXISTS` source that already exists becomes a notice
            // plus success.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }
619
    /// Sequences `CREATE CONNECTION`.
    ///
    /// For SSH connections, the key-pair secret is written to the secrets
    /// controller before the catalog entry exists. When `VALIDATE` is
    /// requested, validation runs on a background task and completion is sent
    /// back to the coordinator as a
    /// `Message::CreateConnectionValidationReady`; otherwise the catalog write
    /// happens inline and the context is retired here.
    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let (connection_id, connection_gid) = match self.allocate_user_id().await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err)),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                // Each key must be a managed key pair; explicitly supplied
                // public keys are rejected.
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                // Persist the key pair as a secret and drop any stale cache
                // entry for this connection ID.
                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
                self.caching_secrets_reader.invalidate(connection_id);
            }
            _ => (),
        };

        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                // Validation reaches out to external systems; treat a panic
                // as a validation failure rather than crashing the process.
                let result = match std::panic::AssertUnwindSafe(
                    connection.validate(connection_id, &current_storage_parameters),
                )
                .ore_catch_unwind()
                .await
                {
                    Ok(Ok(())) => Ok(plan),
                    Ok(Err(err)) => Err(err.into()),
                    Err(_panic) => {
                        tracing::error!("connection validation panicked");
                        Err(AdapterError::Internal(
                            "connection validation panicked".into(),
                        ))
                    }
                };

                // Hand the result back to the coordinator; the plan-validity
                // snapshot lets it detect catalog changes that occurred while
                // validation ran.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }
728
729 #[instrument]
730 pub(crate) async fn sequence_create_connection_stage_finish(
731 &mut self,
732 ctx: &mut ExecuteContext,
733 connection_id: CatalogItemId,
734 connection_gid: GlobalId,
735 plan: plan::CreateConnectionPlan,
736 resolved_ids: ResolvedIds,
737 ) -> Result<ExecuteResponse, AdapterError> {
738 let ops = vec![catalog::Op::CreateItem {
739 id: connection_id,
740 name: plan.name.clone(),
741 item: CatalogItem::Connection(Connection {
742 create_sql: plan.connection.create_sql,
743 global_id: connection_gid,
744 details: plan.connection.details.clone(),
745 resolved_ids,
746 }),
747 owner_id: *ctx.session().current_role_id(),
748 }];
749
750 let conn_id = ctx.session().conn_id().clone();
753 let transact_result = self
754 .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
755 .await;
756
757 match transact_result {
758 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
759 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
760 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
761 })) if plan.if_not_exists => {
762 if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
765 if let Err(e) = self.secrets_controller.delete(connection_id).await {
766 tracing::warn!(
767 "Dropping SSH secret for existing connection has encountered an error: {}",
768 e
769 );
770 } else {
771 self.caching_secrets_reader.invalidate(connection_id);
772 }
773 }
774 ctx.session()
775 .add_notice(AdapterNotice::ObjectAlreadyExists {
776 name: plan.name.item,
777 ty: "connection",
778 });
779 Ok(ExecuteResponse::CreatedConnection)
780 }
781 Err(err) => {
782 if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
785 if let Err(e) = self.secrets_controller.delete(connection_id).await {
786 tracing::warn!(
787 "Dropping SSH secret for failed connection has encountered an error: {}",
788 e
789 );
790 } else {
791 self.caching_secrets_reader.invalidate(connection_id);
792 }
793 }
794 Err(err)
795 }
796 }
797 }
798
799 #[instrument]
800 pub(super) async fn sequence_create_database(
801 &mut self,
802 session: &Session,
803 plan: plan::CreateDatabasePlan,
804 ) -> Result<ExecuteResponse, AdapterError> {
805 let ops = vec![catalog::Op::CreateDatabase {
806 name: plan.name.clone(),
807 owner_id: *session.current_role_id(),
808 }];
809 match self.catalog_transact(Some(session), ops).await {
810 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
811 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
812 kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
813 })) if plan.if_not_exists => {
814 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
815 Ok(ExecuteResponse::CreatedDatabase)
816 }
817 Err(err) => Err(err),
818 }
819 }
820
821 #[instrument]
822 pub(super) async fn sequence_create_schema(
823 &mut self,
824 session: &Session,
825 plan: plan::CreateSchemaPlan,
826 ) -> Result<ExecuteResponse, AdapterError> {
827 let op = catalog::Op::CreateSchema {
828 database_id: plan.database_spec,
829 schema_name: plan.schema_name.clone(),
830 owner_id: *session.current_role_id(),
831 };
832 match self.catalog_transact(Some(session), vec![op]).await {
833 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
834 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
835 kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
836 })) if plan.if_not_exists => {
837 session.add_notice(AdapterNotice::SchemaAlreadyExists {
838 name: plan.schema_name,
839 });
840 Ok(ExecuteResponse::CreatedSchema)
841 }
842 Err(err) => Err(err),
843 }
844 }
845
846 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
848 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
849 if attributes.superuser.is_some() || attributes.password.is_some() {
850 return Err(AdapterError::UnavailableFeature {
851 feature: "SUPERUSER and PASSWORD attributes".to_string(),
852 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
853 });
854 }
855 }
856 Ok(())
857 }
858
859 #[instrument]
860 pub(super) async fn sequence_create_role(
861 &mut self,
862 conn_id: Option<&ConnectionId>,
863 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
864 ) -> Result<ExecuteResponse, AdapterError> {
865 self.validate_role_attributes(&attributes.clone())?;
866 let op = catalog::Op::CreateRole { name, attributes };
867 self.catalog_transact_with_context(conn_id, None, vec![op])
868 .await
869 .map(|_| ExecuteResponse::CreatedRole)
870 }
871
872 #[instrument]
873 pub(super) async fn sequence_create_network_policy(
874 &mut self,
875 session: &Session,
876 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
877 ) -> Result<ExecuteResponse, AdapterError> {
878 let op = catalog::Op::CreateNetworkPolicy {
879 rules,
880 name,
881 owner_id: *session.current_role_id(),
882 };
883 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
884 .await
885 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
886 }
887
888 #[instrument]
889 pub(super) async fn sequence_alter_network_policy(
890 &mut self,
891 session: &Session,
892 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
893 ) -> Result<ExecuteResponse, AdapterError> {
894 let current_network_policy_name =
896 self.catalog().system_config().default_network_policy_name();
897 if current_network_policy_name == name {
899 self.validate_alter_network_policy(session, &rules)?;
900 }
901
902 let op = catalog::Op::AlterNetworkPolicy {
903 id,
904 rules,
905 name,
906 owner_id: *session.current_role_id(),
907 };
908 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
909 .await
910 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
911 }
912
913 #[instrument]
914 pub(super) async fn sequence_create_table(
915 &mut self,
916 ctx: &mut ExecuteContext,
917 plan: plan::CreateTablePlan,
918 resolved_ids: ResolvedIds,
919 ) -> Result<ExecuteResponse, AdapterError> {
920 let plan::CreateTablePlan {
921 name,
922 table,
923 if_not_exists,
924 } = plan;
925
926 let conn_id = if table.temporary {
927 Some(ctx.session().conn_id())
928 } else {
929 None
930 };
931 let (table_id, global_id) = self.allocate_user_id().await?;
932 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
933
934 let data_source = match table.data_source {
935 plan::TableDataSource::TableWrites { defaults } => {
936 TableDataSource::TableWrites { defaults }
937 }
938 plan::TableDataSource::DataSource {
939 desc: data_source_plan,
940 timeline,
941 } => match data_source_plan {
942 plan::DataSourceDesc::IngestionExport {
943 ingestion_id,
944 external_reference,
945 details,
946 data_config,
947 } => TableDataSource::DataSource {
948 desc: DataSourceDesc::IngestionExport {
949 ingestion_id,
950 external_reference,
951 details,
952 data_config,
953 },
954 timeline,
955 },
956 plan::DataSourceDesc::Webhook {
957 validate_using,
958 body_format,
959 headers,
960 cluster_id,
961 } => TableDataSource::DataSource {
962 desc: DataSourceDesc::Webhook {
963 validate_using,
964 body_format,
965 headers,
966 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
967 },
968 timeline,
969 },
970 o => {
971 unreachable!("CREATE TABLE data source got {:?}", o)
972 }
973 },
974 };
975
976 let is_webhook = if let TableDataSource::DataSource {
977 desc: DataSourceDesc::Webhook { .. },
978 timeline: _,
979 } = &data_source
980 {
981 true
982 } else {
983 false
984 };
985
986 let table = Table {
987 create_sql: Some(table.create_sql),
988 desc: table.desc,
989 collections,
990 conn_id: conn_id.cloned(),
991 resolved_ids,
992 custom_logical_compaction_window: table.compaction_window,
993 is_retained_metrics_object: false,
994 data_source,
995 };
996 let ops = vec![catalog::Op::CreateItem {
997 id: table_id,
998 name: name.clone(),
999 item: CatalogItem::Table(table.clone()),
1000 owner_id: *ctx.session().current_role_id(),
1001 }];
1002
1003 let catalog_result = self
1004 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1005 .await;
1006
1007 if is_webhook {
1008 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1011 ctx.session()
1012 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1013 }
1014 }
1015
1016 match catalog_result {
1017 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1018 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1019 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1020 })) if if_not_exists => {
1021 ctx.session_mut()
1022 .add_notice(AdapterNotice::ObjectAlreadyExists {
1023 name: name.item,
1024 ty: "table",
1025 });
1026 Ok(ExecuteResponse::CreatedTable)
1027 }
1028 Err(err) => Err(err),
1029 }
1030 }
1031
    /// Sequences `CREATE SINK`: writes the catalog entry, then creates the
    /// storage export and installs its read policy, retiring `ctx` with the
    /// final response.
    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
            commit_interval: sink.commit_interval,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            // An `IF NOT EXISTS` sink that already exists becomes a notice
            // plus success.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        // The catalog entry now exists; export creation is treated as
        // infallible (process terminates otherwise).
        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }
1100
1101 pub(super) fn validate_system_column_references(
1124 &self,
1125 uses_ambiguous_columns: bool,
1126 depends_on: &BTreeSet<GlobalId>,
1127 ) -> Result<(), AdapterError> {
1128 if uses_ambiguous_columns
1129 && depends_on
1130 .iter()
1131 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1132 {
1133 Err(AdapterError::AmbiguousSystemColumnReference)
1134 } else {
1135 Ok(())
1136 }
1137 }
1138
1139 #[instrument]
1140 pub(super) async fn sequence_create_type(
1141 &mut self,
1142 session: &Session,
1143 plan: plan::CreateTypePlan,
1144 resolved_ids: ResolvedIds,
1145 ) -> Result<ExecuteResponse, AdapterError> {
1146 let (item_id, global_id) = self.allocate_user_id().await?;
1147 plan.typ
1149 .inner
1150 .desc(&self.catalog().for_session(session))
1151 .map_err(AdapterError::from)?;
1152 let typ = Type {
1153 create_sql: Some(plan.typ.create_sql),
1154 global_id,
1155 details: CatalogTypeDetails {
1156 array_id: None,
1157 typ: plan.typ.inner,
1158 pg_metadata: None,
1159 },
1160 resolved_ids,
1161 };
1162 let op = catalog::Op::CreateItem {
1163 id: item_id,
1164 name: plan.name,
1165 item: CatalogItem::Type(typ),
1166 owner_id: *session.current_role_id(),
1167 };
1168 match self.catalog_transact(Some(session), vec![op]).await {
1169 Ok(()) => Ok(ExecuteResponse::CreatedType),
1170 Err(err) => Err(err),
1171 }
1172 }
1173
1174 #[instrument]
1175 pub(super) async fn sequence_comment_on(
1176 &mut self,
1177 session: &Session,
1178 plan: plan::CommentPlan,
1179 ) -> Result<ExecuteResponse, AdapterError> {
1180 let op = catalog::Op::Comment {
1181 object_id: plan.object_id,
1182 sub_component: plan.sub_component,
1183 comment: plan.comment,
1184 };
1185 self.catalog_transact(Some(session), vec![op]).await?;
1186 Ok(ExecuteResponse::Comment)
1187 }
1188
    /// Sequences a `DROP` of one or more objects.
    ///
    /// `drop_ids` is the full set of objects that will be dropped (including
    /// cascaded dependents); `referenced_ids` are the objects the user named
    /// directly. Objects dropped only via cascade produce a
    /// `CascadeDroppedObject` notice. After the catalog transaction commits,
    /// session notices are emitted for a dropped active database/cluster and
    /// for any in-use indexes that were dropped.
    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        // Any dropped object the user did not name directly was dropped via
        // CASCADE; collect human-readable descriptions for the notice.
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        // Collect the global IDs of dropped items so their cached expressions
        // can be invalidated after the drop commits. This must be computed
        // before the catalog transaction, while the entries still exist.
        let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
            .iter()
            .filter_map(|id| match id {
                ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
                _ => None,
            })
            .flatten()
            .collect();

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        if !expr_cache_invalidate_ids.is_empty() {
            // Fire-and-forget: the returned future is intentionally dropped.
            let _fut = self.catalog().update_expression_cache(
                Default::default(),
                Default::default(),
                expr_cache_invalidate_ids,
            );
        }

        fail::fail_point!("after_sequencer_drop_replica");

        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }
1270
    /// Validates that none of the roles in `dropped_roles` still own objects
    /// or hold/grant privileges anywhere in the catalog.
    ///
    /// Walks every item, database, schema, cluster, replica, the system
    /// privileges, and all default privileges. Any dependency found is
    /// collected per role name; if any exist, returns
    /// `AdapterError::DependentObject` listing them all.
    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        // Records, for each dropped role that appears as grantee or grantor of
        // a privilege on `object_id`, a human-readable dependency description.
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        // role name -> descriptions of objects that still depend on it.
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        // Catalog items: ownership and privileges.
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        // Databases and their schemas: ownership and privileges.
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        // Clusters and their replicas: ownership and privileges (replicas
        // carry ownership only, no privilege map is checked here).
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
            }
        }
        // System-wide privileges.
        privilege_check(
            self.catalog().system_privileges(),
            dropped_roles,
            &mut dependent_objects,
            &SystemObjectId::System,
            &catalog,
        );
        // Default privileges: both those created by a dropped role and those
        // granted to a dropped role.
        for (default_privilege_object, default_privilege_acl_items) in
            self.catalog.default_privileges()
        {
            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!(
                        "default privileges on {}S created by {}",
                        default_privilege_object.object_type, role_name
                    ));
            }
            for default_privilege_acl_item in default_privilege_acl_items {
                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "default privileges on {}S granted to {}",
                            default_privilege_object.object_type, role_name
                        ));
                }
            }
        }

        if !dependent_objects.is_empty() {
            Err(AdapterError::DependentObject(dependent_objects))
        } else {
            Ok(())
        }
    }
1434
    /// Sequences a `DROP OWNED BY` statement: revokes the planned privileges
    /// and default privileges and drops the planned objects, all in a single
    /// catalog transaction.
    ///
    /// When RBAC is enabled and the session is not a superuser, revokes whose
    /// grantor is outside the current role's membership are skipped with a
    /// `CannotRevoke` notice instead of being applied.
    #[instrument]
    pub(super) async fn sequence_drop_owned(
        &mut self,
        session: &Session,
        plan: plan::DropOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Reserved/system roles can never be targeted.
        for role_id in &plan.role_ids {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let mut privilege_revokes = plan.privilege_revokes;

        // Filter out revokes the current role is not allowed to perform,
        // noticing each one rather than erroring.
        let session_catalog = self.catalog().for_session(session);
        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
            && !session.is_superuser()
        {
            let role_membership =
                session_catalog.collect_role_membership(session.current_role_id());
            let invalid_revokes: BTreeSet<_> = privilege_revokes
                .extract_if(.., |(_, privilege)| {
                    !role_membership.contains(&privilege.grantor)
                })
                .map(|(object_id, _)| object_id)
                .collect();
            for invalid_revoke in invalid_revokes {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
                session.add_notice(AdapterNotice::CannotRevoke { object_description });
            }
        }

        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
            catalog::Op::UpdatePrivilege {
                target_id: object_id,
                privilege,
                variant: UpdatePrivilegeVariant::Revoke,
            }
        });
        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant: UpdatePrivilegeVariant::Revoke,
            },
        );
        let DropOps {
            ops: drop_ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(session, plan.drop_ids)?;

        // Revokes are ordered before the drops in the transaction.
        let ops = privilege_revoke_ops
            .chain(default_privilege_revoke_ops)
            .chain(drop_ops.into_iter())
            .collect();

        self.catalog_transact(Some(session), ops).await?;

        if dropped_active_db {
            session.add_notice(AdapterNotice::DroppedActiveDatabase {
                name: session.vars().database().to_string(),
            });
        }
        if dropped_active_cluster {
            session.add_notice(AdapterNotice::DroppedActiveCluster {
                name: session.vars().cluster().to_string(),
            });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
        }
        Ok(ExecuteResponse::DroppedOwned)
    }
1511
    /// Shared drop sequencing used by `DROP <objects>` and `DROP OWNED BY`.
    ///
    /// Validates the drop set (reserved roles, role ownership, managed-cluster
    /// replicas), computes the role-membership and default-privilege revokes
    /// implied by the drops, and assembles the full list of catalog ops.
    /// Also reports whether the session's active database/cluster is being
    /// dropped and which in-use indexes are going away, so callers can emit
    /// notices after the transaction commits.
    fn sequence_drop_common(
        &self,
        session: &Session,
        ids: Vec<ObjectId>,
    ) -> Result<DropOps, AdapterError> {
        let mut dropped_active_db = false;
        let mut dropped_active_cluster = false;
        let mut dropped_in_use_indexes = Vec::new();
        let mut dropped_roles = BTreeMap::new();
        let mut dropped_databases = BTreeSet::new();
        let mut dropped_schemas = BTreeSet::new();
        // (group role, member role, grantor) triples to revoke.
        let mut role_revokes = BTreeSet::new();
        let mut default_privilege_revokes = BTreeSet::new();

        // Clusters being dropped outright; used below to allow dropping their
        // replicas even when the cluster is managed.
        let mut clusters_to_drop = BTreeSet::new();

        let ids_set = ids.iter().collect::<BTreeSet<_>>();
        // First pass: classify each id and collect per-kind bookkeeping.
        for id in &ids {
            match id {
                ObjectId::Database(id) => {
                    let name = self.catalog().get_database(id).name();
                    if name == session.vars().database() {
                        dropped_active_db = true;
                    }
                    dropped_databases.insert(id);
                }
                ObjectId::Schema((_, spec)) => {
                    if let SchemaSpecifier::Id(id) = spec {
                        dropped_schemas.insert(id);
                    }
                }
                ObjectId::Cluster(id) => {
                    clusters_to_drop.insert(*id);
                    if let Some(active_id) = self
                        .catalog()
                        .active_cluster(session)
                        .ok()
                        .map(|cluster| cluster.id())
                    {
                        if id == &active_id {
                            dropped_active_cluster = true;
                        }
                    }
                }
                ObjectId::Role(id) => {
                    let role = self.catalog().get_role(id);
                    let name = role.name();
                    dropped_roles.insert(*id, name);
                    // Revoke the dropped role's own memberships.
                    for (group_id, grantor_id) in &role.membership.map {
                        role_revokes.insert((*group_id, *id, *grantor_id));
                    }
                }
                ObjectId::Item(id) => {
                    // For indexes, find compute dependants that are NOT also
                    // being dropped, to warn about dropping an in-use index.
                    if let Some(index) = self.catalog().get_entry(id).index() {
                        let humanizer = self.catalog().for_session(session);
                        let dependants = self
                            .controller
                            .compute
                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
                            .ok()
                            .into_iter()
                            .flatten()
                            .filter(|dependant_id| {
                                // Transient collections are not user-visible.
                                if dependant_id.is_transient() {
                                    return false;
                                }
                                let Some(dependent_id) = humanizer
                                    .try_get_item_by_global_id(dependant_id)
                                    .map(|item| item.id())
                                else {
                                    return false;
                                };
                                !ids_set.contains(&ObjectId::Item(dependent_id))
                            })
                            .flat_map(|dependant_id| {
                                humanizer.humanize_id(dependant_id)
                            })
                            .collect_vec();
                        if !dependants.is_empty() {
                            dropped_in_use_indexes.push(DroppedInUseIndex {
                                index_name: humanizer
                                    .humanize_id(index.global_id())
                                    .unwrap_or_else(|| id.to_string()),
                                dependant_objects: dependants,
                            });
                        }
                    }
                }
                _ => {}
            }
        }

        // Second pass: a replica of a managed cluster may only be dropped if
        // its whole cluster is also being dropped or the replica is internal.
        for id in &ids {
            match id {
                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                    if !clusters_to_drop.contains(cluster_id) {
                        let cluster = self.catalog.get_cluster(*cluster_id);
                        if cluster.is_managed() {
                            let replica =
                                cluster.replica(*replica_id).expect("Catalog out of sync");
                            if !replica.config.location.internal() {
                                coord_bail!("cannot drop replica of managed cluster");
                            }
                        }
                    }
                }
                _ => {}
            }
        }

        for role_id in dropped_roles.keys() {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }
        self.validate_dropped_role_ownership(session, &dropped_roles)?;
        // Revoke memberships OF the dropped roles held by surviving roles.
        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
        for role in self.catalog().user_roles() {
            for dropped_role_id in
                dropped_role_ids.intersection(&role.membership.map.keys().collect())
            {
                role_revokes.insert((
                    **dropped_role_id,
                    role.id(),
                    *role
                        .membership
                        .map
                        .get(*dropped_role_id)
                        .expect("included in keys above"),
                ));
            }
        }

        // Default privileges scoped to a dropped database or schema must be
        // revoked along with the drop.
        for (default_privilege_object, default_privilege_acls) in
            self.catalog().default_privileges()
        {
            if matches!(
                &default_privilege_object.database_id,
                Some(database_id) if dropped_databases.contains(database_id),
            ) || matches!(
                &default_privilege_object.schema_id,
                Some(schema_id) if dropped_schemas.contains(schema_id),
            ) {
                for default_privilege_acl in default_privilege_acls {
                    default_privilege_revokes.insert((
                        default_privilege_object.clone(),
                        default_privilege_acl.clone(),
                    ));
                }
            }
        }

        // Op order: role revokes, then default-privilege revokes, then one
        // DropObjects op covering every id.
        let ops = role_revokes
            .into_iter()
            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            })
            .chain(default_privilege_revokes.into_iter().map(
                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                    privilege_object,
                    privilege_acl_item,
                    variant: UpdatePrivilegeVariant::Revoke,
                },
            ))
            .chain(iter::once(catalog::Op::DropObjects(
                ids.into_iter()
                    .map(DropObjectInfo::manual_drop_from_object_id)
                    .collect(),
            )))
            .collect();

        Ok(DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        })
    }
1713
1714 pub(super) fn sequence_explain_schema(
1715 &self,
1716 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1717 ) -> Result<ExecuteResponse, AdapterError> {
1718 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1719 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1720 })?;
1721
1722 let json_string = json_string(&json_value);
1723 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1724 Ok(Self::send_immediate_rows(row))
1725 }
1726
1727 pub(super) fn sequence_show_all_variables(
1728 &self,
1729 session: &Session,
1730 ) -> Result<ExecuteResponse, AdapterError> {
1731 let mut rows = viewable_variables(self.catalog().state(), session)
1732 .map(|v| (v.name(), v.value(), v.description()))
1733 .collect::<Vec<_>>();
1734 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1735
1736 let rows: Vec<_> = rows
1738 .into_iter()
1739 .map(|(name, val, desc)| {
1740 Row::pack_slice(&[
1741 Datum::String(name),
1742 Datum::String(&val),
1743 Datum::String(desc),
1744 ])
1745 })
1746 .collect();
1747 Ok(Self::send_immediate_rows(rows))
1748 }
1749
    /// Implements `SHOW <variable>`.
    ///
    /// `SHOW schema` is special-cased to report the first resolvable schema on
    /// the search path (or NULL, with an optional notice, when none resolves).
    /// For other variables, looks up the session variable and falls back to
    /// the system configuration; emits a notice when the shown database or
    /// cluster does not actually exist.
    pub(super) fn sequence_show_variable(
        &self,
        session: &Session,
        plan: plan::ShowVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        if &plan.name == SCHEMA_ALIAS {
            let schemas = self.catalog.resolve_search_path(session);
            let schema = schemas.first();
            return match schema {
                Some((database_spec, schema_spec)) => {
                    let schema_name = &self
                        .catalog
                        .get_schema(database_spec, schema_spec, session.conn_id())
                        .name()
                        .schema;
                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
                    Ok(Self::send_immediate_rows(row))
                }
                None => {
                    // No schema on the search path resolves; warn (if enabled)
                    // and return NULL.
                    if session.vars().current_object_missing_warnings() {
                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
                            search_path: session
                                .vars()
                                .search_path()
                                .into_iter()
                                .map(|schema| schema.to_string())
                                .collect(),
                        });
                    }
                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
                }
            };
        }

        // Prefer the session variable; fall back to the system configuration.
        let variable = session
            .vars()
            .get(self.catalog().system_config(), &plan.name)
            .or_else(|_| self.catalog().system_config().get(&plan.name))?;

        // The variable must be visible to this user, or we error out.
        variable.visible(session.user(), self.catalog().system_config())?;

        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
        // Warn when the shown database/cluster name does not resolve.
        if variable.name() == vars::DATABASE.name()
            && matches!(
                self.catalog().resolve_database(&variable.value()),
                Err(CatalogError::UnknownDatabase(_))
            )
            && session.vars().current_object_missing_warnings()
        {
            let name = variable.value();
            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
        } else if variable.name() == vars::CLUSTER.name()
            && matches!(
                self.catalog().resolve_cluster(&variable.value()),
                Err(CatalogError::UnknownCluster(_))
            )
            && session.vars().current_object_missing_warnings()
        {
            let name = variable.value();
            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
        }
        Ok(Self::send_immediate_rows(row))
    }
1815
1816 #[instrument]
1817 pub(super) async fn sequence_inspect_shard(
1818 &self,
1819 session: &Session,
1820 plan: plan::InspectShardPlan,
1821 ) -> Result<ExecuteResponse, AdapterError> {
1822 if !session.user().is_internal() {
1825 return Err(AdapterError::Unauthorized(
1826 rbac::UnauthorizedError::MzSystem {
1827 action: "inspect".into(),
1828 },
1829 ));
1830 }
1831 let state = self
1832 .controller
1833 .storage
1834 .inspect_persist_state(plan.id)
1835 .await?;
1836 let jsonb = Jsonb::from_serde_json(state)?;
1837 Ok(Self::send_immediate_rows(jsonb.into_row()))
1838 }
1839
    /// Implements `SET <name> [= <value>]` (and `SET LOCAL ...` via
    /// `plan.local`).
    ///
    /// Setting the isolation level or cluster is disallowed once the current
    /// transaction has performed operations. After a successful set, emits
    /// deprecation/usage notices for certain variables, and warns when the
    /// newly set database/cluster does not exist or when an unimplemented
    /// isolation level was requested.
    #[instrument]
    pub(super) fn sequence_set_variable(
        &self,
        session: &mut Session,
        plan: plan::SetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (name, local) = (plan.name, plan.local);
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }

        let vars = session.vars_mut();
        // `SET x = DEFAULT` arrives as `VariableValue::Default` and is
        // handled as a reset below.
        let values = match plan.value {
            plan::VariableValue::Default => None,
            plan::VariableValue::Values(values) => Some(values),
        };

        match values {
            Some(values) => {
                vars.set(
                    self.catalog().system_config(),
                    &name,
                    VarInput::SqlSet(&values),
                    local,
                )?;

                // Re-borrow immutably; `vars` below shadows the mutable borrow
                // (note: `vars::...` paths still refer to the module).
                let vars = session.vars();

                // Usage notices for deprecated variable/cluster names.
                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if name == vars::CLUSTER.name()
                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
                {
                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
                }

                // Warn when the newly set database/cluster does not resolve,
                // or when an unsupported isolation level was requested.
                if name.as_str() == vars::DATABASE.name()
                    && matches!(
                        self.catalog().resolve_database(vars.database()),
                        Err(CatalogError::UnknownDatabase(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.database().to_string();
                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
                } else if name.as_str() == vars::CLUSTER.name()
                    && matches!(
                        self.catalog().resolve_cluster(vars.cluster()),
                        Err(CatalogError::UnknownCluster(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.cluster().to_string();
                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
                    let v = values.into_first().to_lowercase();
                    if v == IsolationLevel::ReadUncommitted.as_str()
                        || v == IsolationLevel::ReadCommitted.as_str()
                        || v == IsolationLevel::RepeatableRead.as_str()
                    {
                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
                            isolation_level: v,
                        });
                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
                        session.add_notice(AdapterNotice::StrongSessionSerializable);
                    }
                }
            }
            None => vars.reset(self.catalog().system_config(), &name, local)?,
        }

        Ok(ExecuteResponse::SetVariable { name, reset: false })
    }
1919
1920 pub(super) fn sequence_reset_variable(
1921 &self,
1922 session: &mut Session,
1923 plan: plan::ResetVariablePlan,
1924 ) -> Result<ExecuteResponse, AdapterError> {
1925 let name = plan.name;
1926 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1927 self.validate_set_isolation_level(session)?;
1928 }
1929 if &name == vars::CLUSTER.name() {
1930 self.validate_set_cluster(session)?;
1931 }
1932 session
1933 .vars_mut()
1934 .reset(self.catalog().system_config(), &name, false)?;
1935 Ok(ExecuteResponse::SetVariable { name, reset: true })
1936 }
1937
1938 pub(super) fn sequence_set_transaction(
1939 &self,
1940 session: &mut Session,
1941 plan: plan::SetTransactionPlan,
1942 ) -> Result<ExecuteResponse, AdapterError> {
1943 for mode in plan.modes {
1945 match mode {
1946 TransactionMode::AccessMode(_) => {
1947 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1948 }
1949 TransactionMode::IsolationLevel(isolation_level) => {
1950 self.validate_set_isolation_level(session)?;
1951
1952 session.vars_mut().set(
1953 self.catalog().system_config(),
1954 TRANSACTION_ISOLATION_VAR_NAME,
1955 VarInput::Flat(&isolation_level.to_ast_string_stable()),
1956 plan.local,
1957 )?
1958 }
1959 }
1960 }
1961 Ok(ExecuteResponse::SetVariable {
1962 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1963 reset: false,
1964 })
1965 }
1966
1967 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1968 if session.transaction().contains_ops() {
1969 Err(AdapterError::InvalidSetIsolationLevel)
1970 } else {
1971 Ok(())
1972 }
1973 }
1974
1975 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1976 if session.transaction().contains_ops() {
1977 Err(AdapterError::InvalidSetCluster)
1978 } else {
1979 Ok(())
1980 }
1981 }
1982
    /// Sequences `COMMIT` or `ROLLBACK`.
    ///
    /// A `COMMIT` of a failed transaction is downgraded to a rollback. After
    /// clearing the transaction (via `sequence_end_transaction_inner`), the
    /// retained ops determine how the response is delivered:
    /// - non-empty writes are handed to the group-commit machinery
    ///   (`submit_write`), which retires `ctx` later;
    /// - linearized peeks under strict serializable are queued on the
    ///   strict-serializable reads channel, also retiring `ctx` later;
    /// - single-statement transactions are re-dispatched via the internal
    ///   command channel;
    /// - everything else retires `ctx` immediately.
    #[instrument]
    pub(super) async fn sequence_end_transaction(
        &mut self,
        mut ctx: ExecuteContext,
        mut action: EndTransactionAction,
    ) {
        // Committing a failed transaction actually rolls it back.
        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
            (&action, ctx.session().transaction())
        {
            action = EndTransactionAction::Rollback;
        }
        let response = match action {
            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
                params: BTreeMap::new(),
            }),
            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
                params: BTreeMap::new(),
            }),
        };

        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;

        let (response, action) = match result {
            // No-op writes fall through to the immediate-retire path.
            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
                (response, action)
            }
            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
                // Every written collection must be covered by the held write
                // locks; a mismatch is a programming error.
                let validated_locks = match write_lock_guards {
                    None => None,
                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
                        Ok(locks) => Some(locks),
                        Err(missing) => {
                            tracing::error!(?missing, "programming error, missing write locks");
                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
                        }
                    },
                };

                // Group the row batches by collection before submitting.
                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
                for WriteOp { id, rows } in writes {
                    let total_rows = collected_writes.entry(id).or_default();
                    total_rows.push(rows);
                }

                // `ctx` moves into the pending write; it is retired when the
                // write commits.
                self.submit_write(PendingWriteTxn::User {
                    span: Span::current(),
                    writes: collected_writes,
                    write_locks: validated_locks,
                    pending_txn: PendingTxn {
                        ctx,
                        response,
                        action,
                    },
                });
                return;
            }
            // Linearized reads under strict serializable must wait on the
            // dedicated reads channel before retiring.
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrictSerializable =>
            {
                let conn_id = ctx.session().conn_id().clone();
                let pending_read_txn = PendingReadTxn {
                    txn: PendingRead::Read {
                        txn: PendingTxn {
                            ctx,
                            response,
                            action,
                        },
                    },
                    timestamp_context: determination.timestamp_context,
                    created: Instant::now(),
                    num_requeues: 0,
                    otel_ctx: OpenTelemetryContext::obtain(),
                };
                self.strict_serializable_reads_tx
                    .send((conn_id, pending_read_txn))
                    .expect("sending to strict_serializable_reads_tx cannot fail");
                return;
            }
            // Under strong session serializable, record the read timestamp in
            // the session's timestamp oracle and retire immediately.
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrongSessionSerializable =>
            {
                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
                    ctx.session_mut()
                        .ensure_timestamp_oracle(timeline.clone())
                        .apply_write(*ts);
                }
                (response, action)
            }
            // Single-statement transactions are executed via the internal
            // command loop; `ctx` moves into the message.
            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
                self.internal_cmd_tx
                    .send(Message::ExecuteSingleStatementTransaction {
                        ctx,
                        otel_ctx: OpenTelemetryContext::obtain(),
                        stmt,
                        params,
                    })
                    .expect("must send");
                return;
            }
            Ok((_, _)) => (response, action),
            Err(err) => (Err(err), EndTransactionAction::Rollback),
        };
        // Fold any SET LOCAL / transaction-scoped variable changes into the
        // response before retiring.
        let changed = ctx.session_mut().vars_mut().end_transaction(action);
        let response = response.map(|mut r| {
            r.extend_params(changed);
            ExecuteResponse::from(r)
        });

        ctx.retire(response);
    }
2112
    /// Clears the session's transaction and, on commit, validates and commits
    /// its accumulated operations.
    ///
    /// For write ops: verifies each written item still exists in the catalog
    /// and drops empty-row writes. For DDL ops: checks the catalog revision
    /// hasn't moved (otherwise `DDLTransactionRace`), then commits the ops and
    /// runs their deferred side effects. Returns the (possibly adjusted) ops
    /// and any held write locks; `(None, None)` for rollbacks or op-less
    /// transactions.
    #[instrument]
    async fn sequence_end_transaction_inner(
        &mut self,
        ctx: &mut ExecuteContext,
        action: EndTransactionAction,
    ) -> Result<(Option<TransactionOps>, Option<WriteLocks>), AdapterError> {
        let txn = self.clear_transaction(ctx.session_mut()).await;

        if let EndTransactionAction::Commit = action {
            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
                match &mut ops {
                    TransactionOps::Writes(writes) => {
                        // Every write target must still exist; a concurrent
                        // drop surfaces as UnknownItem.
                        for WriteOp { id, .. } in &mut writes.iter() {
                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
                                AdapterError::Catalog(mz_catalog::memory::error::Error {
                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
                                })
                            })?;
                        }

                        // Drop writes with no rows so they don't reach the
                        // write path.
                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
                    }
                    TransactionOps::DDL {
                        ops,
                        state: _,
                        side_effects,
                        revision,
                        snapshot: _,
                    } => {
                        // Fail if the catalog changed since the DDL ops were
                        // planned.
                        if *revision != self.catalog().transient_revision() {
                            return Err(AdapterError::DDLTransactionRace);
                        }
                        // Take the ops/side effects out of the (borrowed)
                        // transaction state so they can be moved into the
                        // transact call.
                        let ops = std::mem::take(ops);
                        let side_effects = std::mem::take(side_effects);
                        self.catalog_transact_with_side_effects(
                            Some(ctx),
                            ops,
                            move |a, mut ctx| {
                                Box::pin(async move {
                                    for side_effect in side_effects {
                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
                                    }
                                })
                            },
                        )
                        .await?;
                    }
                    _ => (),
                }
                return Ok((Some(ops), write_lock_guards));
            }
        }

        Ok((None, None))
    }
2172
2173 pub(super) async fn sequence_side_effecting_func(
2174 &mut self,
2175 ctx: ExecuteContext,
2176 plan: SideEffectingFunc,
2177 ) {
2178 match plan {
2179 SideEffectingFunc::PgCancelBackend { connection_id } => {
2180 if ctx.session().conn_id().unhandled() == connection_id {
2181 ctx.retire(Err(AdapterError::Canceled));
2185 return;
2186 }
2187
2188 let res = if let Some((id_handle, _conn_meta)) =
2189 self.active_conns.get_key_value(&connection_id)
2190 {
2191 self.handle_privileged_cancel(id_handle.clone()).await;
2193 Datum::True
2194 } else {
2195 Datum::False
2196 };
2197 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2198 }
2199 }
2200 }
2201
2202 pub(crate) async fn execute_side_effecting_func(
2211 &mut self,
2212 plan: SideEffectingFunc,
2213 conn_id: ConnectionId,
2214 current_role: RoleId,
2215 ) -> Result<ExecuteResponse, AdapterError> {
2216 match plan {
2217 SideEffectingFunc::PgCancelBackend { connection_id } => {
2218 if conn_id.unhandled() == connection_id {
2219 return Err(AdapterError::Canceled);
2223 }
2224
2225 if let Some((_id_handle, conn_meta)) =
2228 self.active_conns.get_key_value(&connection_id)
2229 {
2230 let target_role = *conn_meta.authenticated_role_id();
2231 let role_membership = self
2232 .catalog()
2233 .state()
2234 .collect_role_membership(¤t_role);
2235 if !role_membership.contains(&target_role) {
2236 let target_role_name = self
2237 .catalog()
2238 .try_get_role(&target_role)
2239 .map(|role| role.name().to_string())
2240 .unwrap_or_else(|| target_role.to_string());
2241 return Err(AdapterError::Unauthorized(
2242 rbac::UnauthorizedError::RoleMembership {
2243 role_names: vec![target_role_name],
2244 },
2245 ));
2246 }
2247
2248 let id_handle = self
2250 .active_conns
2251 .get_key_value(&connection_id)
2252 .map(|(id, _)| id.clone())
2253 .expect("checked above");
2254 self.handle_privileged_cancel(id_handle).await;
2255 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2256 } else {
2257 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2259 }
2260 }
2261 }
2262 }
2263
2264 pub(crate) async fn determine_real_time_recent_timestamp(
2268 &self,
2269 source_ids: impl Iterator<Item = GlobalId>,
2270 real_time_recency_timeout: Duration,
2271 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2272 let item_ids = source_ids
2273 .map(|gid| {
2274 self.catalog
2275 .try_resolve_item_id(&gid)
2276 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2277 })
2278 .collect::<Result<Vec<_>, _>>()?;
2279
2280 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2286 if to_visit.is_empty() {
2289 return Ok(None);
2290 }
2291
2292 let mut timestamp_objects = BTreeSet::new();
2293
2294 while let Some(id) = to_visit.pop_front() {
2295 timestamp_objects.insert(id);
2296 to_visit.extend(
2297 self.catalog()
2298 .get_entry(&id)
2299 .uses()
2300 .into_iter()
2301 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2302 );
2303 }
2304 let timestamp_objects = timestamp_objects
2305 .into_iter()
2306 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2307 .collect();
2308
2309 let r = self
2310 .controller
2311 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2312 .await?;
2313
2314 Ok(Some(r))
2315 }
2316
2317 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2320 &self,
2321 session: &Session,
2322 source_ids: impl Iterator<Item = GlobalId>,
2323 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2324 let vars = session.vars();
2325
2326 if vars.real_time_recency()
2327 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2328 && !session.contains_read_timestamp()
2329 {
2330 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2331 .await
2332 } else {
2333 Ok(None)
2334 }
2335 }
2336
2337 #[instrument]
2338 pub(super) async fn sequence_explain_plan(
2339 &mut self,
2340 ctx: ExecuteContext,
2341 plan: plan::ExplainPlanPlan,
2342 target_cluster: TargetCluster,
2343 ) {
2344 match &plan.explainee {
2345 plan::Explainee::Statement(stmt) => match stmt {
2346 plan::ExplaineeStatement::CreateView { .. } => {
2347 self.explain_create_view(ctx, plan).await;
2348 }
2349 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2350 self.explain_create_materialized_view(ctx, plan).await;
2351 }
2352 plan::ExplaineeStatement::CreateIndex { .. } => {
2353 self.explain_create_index(ctx, plan).await;
2354 }
2355 plan::ExplaineeStatement::Select { .. } => {
2356 self.explain_peek(ctx, plan, target_cluster).await;
2357 }
2358 plan::ExplaineeStatement::Subscribe { .. } => {
2359 self.explain_subscribe(ctx, plan, target_cluster).await;
2360 }
2361 },
2362 plan::Explainee::View(_) => {
2363 let result = self.explain_view(&ctx, plan);
2364 ctx.retire(result);
2365 }
2366 plan::Explainee::MaterializedView(_) => {
2367 let result = self.explain_materialized_view(&ctx, plan);
2368 ctx.retire(result);
2369 }
2370 plan::Explainee::Index(_) => {
2371 let result = self.explain_index(&ctx, plan);
2372 ctx.retire(result);
2373 }
2374 plan::Explainee::ReplanView(_) => {
2375 self.explain_replan_view(ctx, plan).await;
2376 }
2377 plan::Explainee::ReplanMaterializedView(_) => {
2378 self.explain_replan_materialized_view(ctx, plan).await;
2379 }
2380 plan::Explainee::ReplanIndex(_) => {
2381 self.explain_replan_index(ctx, plan).await;
2382 }
2383 };
2384 }
2385
    /// Sequences an `EXPLAIN FILTER PUSHDOWN` statement.
    ///
    /// Only successfully-planned `SELECT` statements and existing
    /// materialized views are supported; anything else retires with an
    /// `Unsupported` error.
    pub(super) async fn sequence_explain_pushdown(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::ExplainPushdownPlan,
        target_cluster: TargetCluster,
    ) {
        match plan.explainee {
            // `broken: true` statements deliberately do not match this arm
            // and fall through to the `Unsupported` case below.
            Explainee::Statement(ExplaineeStatement::Select {
                broken: false,
                plan,
                desc: _,
            }) => {
                // Validate the peek as usual, but under
                // `ExplainContext::Pushdown` so the staged pipeline produces
                // pushdown information instead of executing the query.
                let stage = return_if_err!(
                    self.peek_validate(
                        ctx.session(),
                        plan,
                        target_cluster,
                        None,
                        ExplainContext::Pushdown,
                        Some(ctx.session().vars().max_query_result_size()),
                    ),
                    ctx
                );
                self.sequence_staged(ctx, Span::current(), stage).await;
            }
            Explainee::MaterializedView(item_id) => {
                self.explain_pushdown_materialized_view(ctx, item_id).await;
            }
            _ => {
                ctx.retire(Err(AdapterError::Unsupported(
                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
                )));
            }
        };
    }
2421
2422 async fn execute_explain_pushdown_with_read_holds(
2424 &self,
2425 ctx: ExecuteContext,
2426 as_of: Antichain<Timestamp>,
2427 mz_now: ResultSpec<'static>,
2428 read_holds: Option<ReadHolds>,
2429 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2430 ) {
2431 let fut = self
2432 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2433 .await;
2434 task::spawn(|| "render explain pushdown", async move {
2435 let _read_holds = read_holds;
2437 let res = fut.await;
2438 ctx.retire(res);
2439 });
2440 }
2441
    /// Builds (but does not run) the future that computes an
    /// `EXPLAIN FILTER PUSHDOWN` response for the given imports at `as_of`.
    ///
    /// Thin wrapper around `super::explain_pushdown_future_inner` that
    /// supplies the coordinator's catalog and storage-collections handles so
    /// the returned future does not borrow `self`.
    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
        &self,
        session: &Session,
        as_of: Antichain<Timestamp>,
        mz_now: ResultSpec<'static>,
        imports: I,
    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
        super::explain_pushdown_future_inner(
            session,
            &self.catalog,
            &self.controller.storage_collections,
            as_of,
            mz_now,
            imports,
        )
        .await
    }
2461
    /// Sequences an `INSERT` statement.
    ///
    /// Constant `VALUES` with no `RETURNING` clause are written directly off
    /// the main task; anything else is converted to a read-then-write plan
    /// so the selection can be evaluated first.
    #[instrument]
    pub(super) async fn sequence_insert(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::InsertPlan,
    ) {
        // Writes are refused inside read-only transactions.
        if !ctx.session_mut().transaction().allows_writes() {
            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
            return;
        }

        // Constant values only need lowering, not a full optimization pass;
        // otherwise run the view optimizer over the selection.
        let optimized_mir = if let Some(..) = &plan.values.as_const() {
            let expr = return_if_err!(
                plan.values
                    .clone()
                    .lower(self.catalog().system_config(), None),
                ctx
            );
            OptimizedMirRelationExpr(expr)
        } else {
            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
        };

        match optimized_mir.into_inner() {
            // Fast path: constant rows and no RETURNING — insert directly,
            // off the main coordinator task.
            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
                let catalog = self.owned_catalog();
                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
                    let result =
                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
                    ctx.retire(result);
                });
            }
            // Slow path: route through read-then-write so the selection is
            // evaluated against a consistent read.
            _ => {
                // The finishing below projects all columns of the target
                // table, so we need its arity; the table may have been
                // dropped since planning.
                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
                    Some(table) => {
                        let desc = table.relation_desc_latest().expect("table has a desc");
                        desc.arity()
                    }
                    None => {
                        ctx.retire(Err(AdapterError::Catalog(
                            mz_catalog::memory::error::Error {
                                kind: ErrorKind::Sql(CatalogError::UnknownItem(
                                    plan.id.to_string(),
                                )),
                            },
                        )));
                        return;
                    }
                };

                // No ordering, no limit: take every row as-is.
                let finishing = RowSetFinishing {
                    order_by: vec![],
                    limit: None,
                    offset: 0,
                    project: (0..desc_arity).collect(),
                };

                // An INSERT has no assignments; the mutation kind carries
                // the insert semantics.
                let read_then_write_plan = plan::ReadThenWritePlan {
                    id: plan.id,
                    selection: plan.values,
                    finishing,
                    assignments: BTreeMap::new(),
                    kind: MutationKind::Insert,
                    returning: plan.returning,
                };

                self.sequence_read_then_write(ctx, read_then_write_plan)
                    .await;
            }
        }
    }
2564
    /// Sequences an `INSERT`/`UPDATE`/`DELETE` that must first read the
    /// target relation (and any relations referenced by the selection) and
    /// then write the computed diffs back.
    ///
    /// Acquires write locks on all involved objects (deferring the plan if
    /// they cannot all be granted immediately), issues a peek at the
    /// freshest table write timestamp, and then — off the main task — turns
    /// the peek results into retractions/insertions and sends them as diffs.
    #[instrument]
    pub(super) async fn sequence_read_then_write(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::ReadThenWritePlan,
    ) {
        // Every object read by the selection, plus the write target itself.
        let mut source_ids: BTreeSet<_> = plan
            .selection
            .depends_on()
            .into_iter()
            .map(|gid| self.catalog().resolve_item_id(&gid))
            .collect();
        source_ids.insert(plan.id);

        if ctx.session().transaction().write_locks().is_none() {
            // Try to grab every lock without blocking.
            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());

            for id in &source_ids {
                if let Some(lock) = self.try_grant_object_write_lock(*id) {
                    write_locks.insert_lock(*id, lock);
                }
            }

            // All-or-nothing: if any lock is missing, defer the whole plan
            // until it can be granted rather than holding a partial set.
            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
                Ok(locks) => locks,
                Err(missing) => {
                    let role_metadata = ctx.session().role_metadata().clone();
                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
                    let plan = DeferredPlan {
                        ctx,
                        plan: Plan::ReadThenWrite(plan),
                        validity: PlanValidity::new(
                            self.catalog.transient_revision(),
                            source_ids.clone(),
                            None,
                            None,
                            role_metadata,
                        ),
                        requires_locks: source_ids,
                    };
                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
                }
            };

            ctx.session_mut()
                .try_grant_write_locks(write_locks)
                .expect("session has already been granted write locks");
        }

        let plan::ReadThenWritePlan {
            id,
            kind,
            selection,
            mut assignments,
            finishing,
            mut returning,
        } = plan;

        // Fetch the target's relation description; the target may have been
        // dropped since planning.
        let desc = match self.catalog().try_get_entry(&id) {
            Some(table) => {
                table
                    .relation_desc_latest()
                    .expect("table has a desc")
                    .into_owned()
            }
            None => {
                ctx.retire(Err(AdapterError::Catalog(
                    mz_catalog::memory::error::Error {
                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
                    },
                )));
                return;
            }
        };

        // mz_now() is rejected anywhere in a write statement.
        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
            || assignments.values().any(|e| e.contains_temporal())
            || returning.iter().any(|e| e.contains_temporal());
        if contains_temporal {
            ctx.retire(Err(AdapterError::Unsupported(
                "calls to mz_now in write statements",
            )));
            return;
        }

        // Verify that every object the selection reads is a valid dependency
        // for a read-then-write.
        for gid in selection.depends_on() {
            let item_id = self.catalog().resolve_item_id(&gid);
            if let Err(err) = validate_read_then_write_dependencies(self.catalog(), &item_id) {
                ctx.retire(Err(err));
                return;
            }
        }

        // Issue the read as a peek on a fresh context; the original
        // context's transmitter (`tx`) is kept so we can respond to the
        // client only after the write completes.
        let (peek_tx, peek_rx) = oneshot::channel();
        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
        let (tx, _, session, extra) = ctx.into_parts();
        let peek_ctx = ExecuteContext::from_parts(
            peek_client_tx,
            self.internal_cmd_tx.clone(),
            session,
            Default::default(),
        );

        self.sequence_peek(
            peek_ctx,
            plan::SelectPlan {
                select: None,
                source: selection,
                when: QueryWhen::FreshestTableWrite,
                finishing,
                copy_to: None,
            },
            TargetCluster::Active,
            None,
        )
        .await;

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
        let catalog = self.owned_catalog();
        let max_result_size = self.catalog().system_config().max_result_size();

        // Everything below runs off the main coordinator task: wait for the
        // peek, convert its rows into diffs, then send the write.
        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
            // Wait for the peek to complete and recover the session.
            let (peek_response, session) = match peek_rx.await {
                Ok(Response {
                    result: Ok(resp),
                    session,
                    otel_ctx,
                }) => {
                    otel_ctx.attach_as_parent();
                    (resp, session)
                }
                Ok(Response {
                    result: Err(e),
                    session,
                    otel_ctx,
                }) => {
                    // The peek failed: reconstruct the caller's context and
                    // retire with the error.
                    let ctx =
                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
                    otel_ctx.attach_as_parent();
                    ctx.retire(Err(e));
                    return;
                }
                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
            };
            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
            let mut timeout_dur = *ctx.session().vars().statement_timeout();

            // A zero statement_timeout means "no timeout".
            if timeout_dur == Duration::ZERO {
                timeout_dur = Duration::MAX;
            }

            // Prepare assignment/RETURNING expressions for one-shot
            // evaluation; no logical time is available at this point.
            let style = ExprPrepOneShot {
                logical_time: EvalTime::NotAvailable, session: ctx.session(),
                catalog_state: catalog.state(),
            };
            for expr in assignments.values_mut().chain(returning.iter_mut()) {
                return_if_err!(style.prep_scalar_expr(expr), ctx);
            }

            // Converts a batch of peeked rows into (row, diff) updates,
            // applying UPDATE assignments where present. Returns the updates
            // together with their total byte size; constraint checks are
            // applied to insertions (positive diffs) only.
            let make_diffs = move |mut rows: Box<dyn RowIterator>|
                -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
                let arena = RowArena::new();
                let mut diffs = Vec::new();
                let mut datum_vec = mz_repr::DatumVec::new();

                while let Some(row) = rows.next() {
                    if !assignments.is_empty() {
                        assert!(
                            matches!(kind, MutationKind::Update),
                            "only updates support assignments"
                        );
                        // Evaluate all assignment expressions against the
                        // old row first, then apply them, so assignments
                        // cannot observe each other's updates.
                        let mut datums = datum_vec.borrow_with(row);
                        let mut updates = vec![];
                        for (idx, expr) in &assignments {
                            let updated = match expr.eval(&datums, &arena) {
                                Ok(updated) => updated,
                                Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
                            };
                            updates.push((*idx, updated));
                        }
                        for (idx, new_value) in updates {
                            datums[idx] = new_value;
                        }
                        let updated = Row::pack_slice(&datums);
                        diffs.push((updated, Diff::ONE));
                    }
                    match kind {
                        // Updates retract the old row (the new row was
                        // pushed above); deletes just retract.
                        MutationKind::Update | MutationKind::Delete => {
                            diffs.push((row.to_owned(), Diff::MINUS_ONE))
                        }
                        MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
                    }
                }

                // Total up the result size and enforce per-column
                // constraints on insertions.
                let mut byte_size: u64 = 0;
                for (row, diff) in &diffs {
                    byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
                    if diff.is_positive() {
                        for (idx, datum) in row.iter().enumerate() {
                            desc.constraints_met(idx, &datum)?;
                        }
                    }
                }
                Ok((diffs, byte_size))
            };

            // Drain the peek response into diffs, enforcing the statement
            // timeout and the maximum result size as we go.
            let diffs = match peek_response {
                ExecuteResponse::SendingRowsStreaming {
                    rows: mut rows_stream,
                    ..
                } => {
                    let mut byte_size: u64 = 0;
                    let mut diffs = Vec::new();
                    let result = loop {
                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
                            Ok(Some(res)) => match res {
                                PeekResponseUnary::Rows(new_rows) => {
                                    match make_diffs(new_rows) {
                                        Ok((mut new_diffs, new_byte_size)) => {
                                            byte_size = byte_size.saturating_add(new_byte_size);
                                            if byte_size > max_result_size {
                                                break Err(AdapterError::ResultSize(format!(
                                                    "result exceeds max size of {max_result_size}"
                                                )));
                                            }
                                            diffs.append(&mut new_diffs)
                                        }
                                        Err(e) => break Err(e),
                                    };
                                }
                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
                                PeekResponseUnary::Error(e) => {
                                    break Err(AdapterError::Unstructured(anyhow!(e)));
                                }
                                PeekResponseUnary::DependencyDropped(dep) => {
                                    break Err(dep.to_concurrent_dependency_drop());
                                }
                            },
                            Ok(None) => break Ok(diffs),
                            Err(_) => {
                                // Statement timeout: cancel the outstanding
                                // peeks before giving up.
                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
                                    conn_id: ctx.session().conn_id().clone(),
                                });
                                if let Err(e) = result {
                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                                }
                                break Err(AdapterError::StatementTimeout);
                            }
                        }
                    };

                    result
                }
                ExecuteResponse::SendingRowsImmediate { rows } => {
                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
                }
                resp => Err(AdapterError::Unstructured(anyhow!(
                    "unexpected peek response: {resp:?}"
                ))),
            };

            // Evaluate RETURNING expressions over the inserted rows
            // (positive diffs only); retractions produce no RETURNING rows.
            let mut returning_rows = Vec::new();
            let mut diff_err: Option<AdapterError> = None;
            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
                let arena = RowArena::new();
                for (row, diff) in diffs {
                    if !diff.is_positive() {
                        continue;
                    }
                    let mut returning_row = Row::with_capacity(returning.len());
                    let mut packer = returning_row.packer();
                    for expr in &returning {
                        let datums: Vec<_> = row.iter().collect();
                        match expr.eval(&datums, &arena) {
                            Ok(datum) => {
                                packer.push(datum);
                            }
                            Err(err) => {
                                diff_err = Some(err.into());
                                break;
                            }
                        }
                    }
                    // The positive diff is the row's multiplicity.
                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
                    let diff = match NonZeroUsize::try_from(diff) {
                        Ok(diff) => diff,
                        Err(err) => {
                            diff_err = Some(err.into());
                            break;
                        }
                    };
                    returning_rows.push((returning_row, diff));
                    if diff_err.is_some() {
                        break;
                    }
                }
            }
            let diffs = if let Some(err) = diff_err {
                Err(err)
            } else {
                diffs
            };

            // If the transaction carries a timestamp context, the read must
            // be linearized before the write may proceed: hand the context
            // to the strict-serializable pending-read machinery and wait for
            // it to come back.
            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
            if let Some(timestamp_context) = timestamp_context {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let conn_id = ctx.session().conn_id().clone();
                let pending_read_txn = PendingReadTxn {
                    txn: PendingRead::ReadThenWrite { ctx, tx },
                    timestamp_context,
                    created: Instant::now(),
                    num_requeues: 0,
                    otel_ctx: OpenTelemetryContext::obtain(),
                };
                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
                if let Err(e) = result {
                    warn!(
                        "strict_serializable_reads_tx dropped before we could send: {:?}",
                        e
                    );
                    return;
                }
                let result = rx.await;
                ctx = match result {
                    Ok(Some(ctx)) => ctx,
                    Ok(None) => {
                        // NOTE(review): `None` appears to mean the context
                        // was already retired by the linearization machinery
                        // — nothing left to do here; confirm against
                        // `PendingRead::ReadThenWrite` handling.
                        return;
                    }
                    Err(e) => {
                        warn!(
                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
                            e
                        );
                        return;
                    }
                };
            }

            // Finally, send the computed diffs as the write and respond.
            match diffs {
                Ok(diffs) => {
                    let result = Self::send_diffs(
                        ctx.session_mut(),
                        plan::SendDiffsPlan {
                            id,
                            updates: diffs,
                            kind,
                            returning: returning_rows,
                            max_result_size,
                        },
                    );
                    ctx.retire(result);
                }
                Err(e) => {
                    ctx.retire(Err(e));
                }
            }
        });
    }
2974
2975 #[instrument]
2976 pub(super) async fn sequence_alter_item_rename(
2977 &mut self,
2978 ctx: &mut ExecuteContext,
2979 plan: plan::AlterItemRenamePlan,
2980 ) -> Result<ExecuteResponse, AdapterError> {
2981 let op = catalog::Op::RenameItem {
2982 id: plan.id,
2983 current_full_name: plan.current_full_name,
2984 to_name: plan.to_name,
2985 };
2986 match self
2987 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
2988 .await
2989 {
2990 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
2991 Err(err) => Err(err),
2992 }
2993 }
2994
    /// Sequences `ALTER ... SET (RETAIN HISTORY ...)`, committing a single
    /// catalog op that updates the object's history retention window.
    #[instrument]
    pub(super) async fn sequence_alter_retain_history(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterRetainHistoryPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // NOTE(review): `value` appears to carry the option value as written
        // and `window` the resolved retention window — confirm against
        // `catalog::Op::AlterRetainHistory`.
        let ops = vec![catalog::Op::AlterRetainHistory {
            id: plan.id,
            value: plan.value,
            window: plan.window,
        }];
        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;
        Ok(ExecuteResponse::AlteredObject(plan.object_type))
    }
3010
    /// Sequences `ALTER SOURCE ... SET (TIMESTAMP INTERVAL ...)`, committing
    /// a single catalog op that updates the source's timestamp interval.
    #[instrument]
    pub(super) async fn sequence_alter_source_timestamp_interval(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterSourceTimestampIntervalPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // NOTE(review): `value` appears to carry the option value as written
        // and `interval` the resolved duration — confirm against
        // `catalog::Op::AlterSourceTimestampInterval`.
        let ops = vec![catalog::Op::AlterSourceTimestampInterval {
            id: plan.id,
            value: plan.value,
            interval: plan.interval,
        }];
        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;
        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
    }
3026
3027 #[instrument]
3028 pub(super) async fn sequence_alter_schema_rename(
3029 &mut self,
3030 ctx: &mut ExecuteContext,
3031 plan: plan::AlterSchemaRenamePlan,
3032 ) -> Result<ExecuteResponse, AdapterError> {
3033 let (database_spec, schema_spec) = plan.cur_schema_spec;
3034 let op = catalog::Op::RenameSchema {
3035 database_spec,
3036 schema_spec,
3037 new_name: plan.new_schema_name,
3038 check_reserved_names: true,
3039 };
3040 match self
3041 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3042 .await
3043 {
3044 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3045 Err(err) => Err(err),
3046 }
3047 }
3048
3049 #[instrument]
3050 pub(super) async fn sequence_alter_schema_swap(
3051 &mut self,
3052 ctx: &mut ExecuteContext,
3053 plan: plan::AlterSchemaSwapPlan,
3054 ) -> Result<ExecuteResponse, AdapterError> {
3055 let plan::AlterSchemaSwapPlan {
3056 schema_a_spec: (schema_a_db, schema_a),
3057 schema_a_name,
3058 schema_b_spec: (schema_b_db, schema_b),
3059 schema_b_name,
3060 name_temp,
3061 } = plan;
3062
3063 let op_a = catalog::Op::RenameSchema {
3064 database_spec: schema_a_db,
3065 schema_spec: schema_a,
3066 new_name: name_temp,
3067 check_reserved_names: false,
3068 };
3069 let op_b = catalog::Op::RenameSchema {
3070 database_spec: schema_b_db,
3071 schema_spec: schema_b,
3072 new_name: schema_a_name,
3073 check_reserved_names: false,
3074 };
3075 let op_c = catalog::Op::RenameSchema {
3076 database_spec: schema_a_db,
3077 schema_spec: schema_a,
3078 new_name: schema_b_name,
3079 check_reserved_names: false,
3080 };
3081
3082 match self
3083 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3084 Box::pin(async {})
3085 })
3086 .await
3087 {
3088 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3089 Err(err) => Err(err),
3090 }
3091 }
3092
    /// Sequences `ALTER ROLE`, updating either the role's attributes or one
    /// of its variable defaults, and emits any applicable notices.
    #[instrument]
    pub(super) async fn sequence_alter_role(
        &mut self,
        session: &Session,
        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let role = catalog.get_role(&id);

        // Notices are collected here and only delivered after the catalog
        // transaction succeeds.
        let mut notices = vec![];

        // Start from the role's current attributes and variables and apply
        // the requested changes on top.
        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
        let mut vars = role.vars().clone();

        // Set when the plan explicitly clears the role's password.
        let mut nopassword = false;

        match option {
            PlannedAlterRoleOption::Attributes(attrs) => {
                self.validate_role_attributes(&attrs.clone().into())?;

                if let Some(inherit) = attrs.inherit {
                    attributes.inherit = inherit;
                }

                if let Some(password) = attrs.password {
                    attributes.password = Some(password);
                    // A new password gets the currently configured SCRAM
                    // iteration count.
                    attributes.scram_iterations =
                        Some(self.catalog().system_config().scram_iterations())
                }

                if let Some(superuser) = attrs.superuser {
                    attributes.superuser = Some(superuser);
                }

                if let Some(login) = attrs.login {
                    attributes.login = Some(login);
                }

                if attrs.nopassword.unwrap_or(false) {
                    nopassword = true;
                }

                if let Some(notice) = self.should_emit_rbac_notice(session) {
                    notices.push(notice);
                }
            }
            PlannedAlterRoleOption::Variable(variable) => {
                // The variable must exist and be visible to the current
                // user.
                let session_var = session.vars().inspect(variable.name())?;
                session_var.visible(session.user(), catalog.system_vars())?;

                // Emit usage notices for legacy variable spellings.
                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if let PlannedRoleVariable::Set {
                    name,
                    value: VariableValue::Values(vals),
                } = &variable
                {
                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
                        notices.push(AdapterNotice::IntrospectionClusterUsage);
                    }
                }

                let var_name = match variable {
                    PlannedRoleVariable::Set { name, value } => {
                        match &value {
                            // `SET ... TO DEFAULT` clears the role-level
                            // default.
                            VariableValue::Default => {
                                vars.remove(&name);
                            }
                            VariableValue::Values(vals) => {
                                let var = match &vals[..] {
                                    // A single value is stored flat; multiple
                                    // values as a SQL set.
                                    [val] => OwnedVarInput::Flat(val.clone()),
                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
                                };
                                // Reject values the variable would not
                                // accept.
                                session_var.check(var.borrow())?;

                                vars.insert(name.clone(), var);
                            }
                        };
                        name
                    }
                    PlannedRoleVariable::Reset { name } => {
                        vars.remove(&name);
                        name
                    }
                };

                // Notify the user that the role's default was updated.
                notices.push(AdapterNotice::VarDefaultUpdated {
                    role: Some(name.clone()),
                    var_name: Some(var_name),
                });
            }
        }

        let op = catalog::Op::AlterRole {
            id,
            name,
            attributes,
            nopassword,
            vars: RoleVars { map: vars },
        };
        let response = self
            .catalog_transact(Some(session), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredRole)?;

        // Only deliver notices once the transaction has committed.
        session.add_notices(notices);

        Ok(response)
    }
3216
    /// First phase of `ALTER SINK`: acquires a read hold on the sink's
    /// upstream collection and installs a watch set that fires
    /// `WatchSetResponse::AlterSinkReady` once the sink's frontier passes
    /// the chosen read timestamp; the alter is then completed by
    /// [`Self::sequence_alter_sink_finish`].
    #[instrument]
    pub(super) async fn sequence_alter_sink_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterSinkPlan,
    ) {
        // Hold back compaction of the sink's upstream collection while the
        // alter is in flight.
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from_iter([plan.sink.from]),
            compute_ids: BTreeMap::new(),
        };
        let read_hold = self.acquire_read_holds(&id_bundle);

        // No valid read time means the collection cannot be read at all.
        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
            return;
        };

        let otel_ctx = OpenTelemetryContext::obtain();
        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);

        // Capture what the plan depends on (sink item, upstream item,
        // cluster, roles) so the finish phase can verify nothing was dropped
        // in the meantime.
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([plan.item_id, from_item_id]),
            Some(plan.in_cluster),
            None,
            ctx.session().role_metadata().clone(),
        );

        info!(
            "preparing alter sink for {}: frontiers={:?} export={:?}",
            plan.global_id,
            self.controller
                .storage_collections
                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
            self.controller.storage.export(plan.global_id)
        );

        // Resume via `sequence_alter_sink_finish` once the sink's frontier
        // reaches `read_ts`; the context and read hold travel along inside
        // the watch-set payload.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([plan.global_id]),
            read_ts,
            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
                ctx: Some(ctx),
                otel_ctx,
                plan,
                plan_validity,
                read_hold,
            }),
        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
    }
3273
    /// Second phase of `ALTER SINK`: runs once the watch set installed by
    /// [`Self::sequence_alter_sink_prepare`] fires. Rewrites the sink's
    /// `CREATE SINK` statement to point at the new upstream relation,
    /// commits the catalog update, and alters the running storage export.
    #[instrument]
    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
        ctx.otel_ctx.attach_as_parent();

        let plan::AlterSinkPlan {
            item_id,
            global_id,
            sink: sink_plan,
            with_snapshot,
            in_cluster,
        } = ctx.plan.clone();

        // Re-check that everything the plan depends on still exists; time
        // has passed since the prepare phase.
        match ctx.plan_validity.check(self.catalog()) {
            Ok(()) => {}
            Err(err) => {
                ctx.retire(Err(err));
                return;
            }
        }

        let entry = self.catalog().get_entry(&item_id);
        let CatalogItem::Sink(old_sink) = entry.item() else {
            panic!("invalid item kind for `AlterSinkPlan`");
        };

        // Detect a concurrent ALTER SINK via the version counter: our plan
        // must be exactly one version ahead of the catalog's sink.
        if sink_plan.version != old_sink.version + 1 {
            ctx.retire(Err(AdapterError::ChangedPlan(
                "sink was altered concurrently".into(),
            )));
            return;
        }

        info!(
            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
            self.controller
                .storage_collections
                .collections_frontiers(vec![global_id, sink_plan.from]),
            self.controller.storage.export(global_id),
        );

        // The watch set fired because the export progressed past the read
        // timestamp; assert the write frontier is strictly beyond the as-of
        // before cutting over.
        let write_frontier = &self
            .controller
            .storage
            .export(global_id)
            .expect("sink known to exist")
            .write_frontier;
        let as_of = ctx.read_hold.least_valid_read();
        assert!(
            write_frontier.iter().all(|t| as_of.less_than(t)),
            "{:?} should be strictly less than {:?}",
            &*as_of,
            &**write_frontier
        );

        // Rewrite the stored `CREATE SINK` statement: replace the version
        // option and repoint `FROM` at the new upstream item.
        let create_sql = &old_sink.create_sql;
        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
            unreachable!("invalid statement kind for sink");
        };

        // Drop any existing VERSION option and add the new one.
        stmt.with_options
            .retain(|o| o.name != CreateSinkOptionName::Version);
        stmt.with_options.push(CreateSinkOption {
            name: CreateSinkOptionName::Version,
            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
                sink_plan.version.to_string(),
            ))),
        });

        let conn_catalog = self.catalog().for_system_session();
        let (mut stmt, resolved_ids) =
            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");

        // Point the statement's FROM clause at the new upstream entry.
        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
        stmt.from = ResolvedItemName::Item {
            id: from_entry.id(),
            qualifiers: from_entry.name.qualifiers.clone(),
            full_name,
            print_id: true,
            version: from_entry.version,
        };

        // Assemble the new in-memory sink definition from the rewritten
        // statement plus the plan's connection/envelope details.
        let new_sink = Sink {
            create_sql: stmt.to_ast_string_stable(),
            global_id,
            from: sink_plan.from,
            connection: sink_plan.connection.clone(),
            envelope: sink_plan.envelope,
            version: sink_plan.version,
            with_snapshot,
            resolved_ids: resolved_ids.clone(),
            cluster_id: in_cluster,
            commit_interval: sink_plan.commit_interval,
        };

        let ops = vec![catalog::Op::UpdateItem {
            id: item_id,
            name: entry.name().clone(),
            to_item: CatalogItem::Sink(new_sink),
        }];

        // Commit the catalog change before touching the running export.
        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => {}
            Err(err) => {
                ctx.retire(Err(err));
                return;
            }
        }

        // Build the storage-level description and swap the running export
        // over at `as_of`.
        let storage_sink_desc = StorageSinkDesc {
            from: sink_plan.from,
            from_desc: from_entry
                .relation_desc()
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink_plan
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink_plan.envelope,
            as_of,
            with_snapshot,
            version: sink_plan.version,
            from_storage_metadata: (),
            to_storage_metadata: (),
            commit_interval: sink_plan.commit_interval,
        };

        self.controller
            .storage
            .alter_export(
                global_id,
                ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: in_cluster,
                },
            )
            .await
            .unwrap_or_terminate("cannot fail to alter source desc");

        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
    }
3437
3438 #[instrument]
3439 pub(super) async fn sequence_alter_connection(
3440 &mut self,
3441 ctx: ExecuteContext,
3442 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3443 ) {
3444 match action {
3445 AlterConnectionAction::RotateKeys => {
3446 self.sequence_rotate_keys(ctx, id).await;
3447 }
3448 AlterConnectionAction::AlterOptions {
3449 set_options,
3450 drop_options,
3451 validate,
3452 } => {
3453 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3454 .await
3455 }
3456 }
3457 }
3458
3459 #[instrument]
3460 async fn sequence_alter_connection_options(
3461 &mut self,
3462 mut ctx: ExecuteContext,
3463 id: CatalogItemId,
3464 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3465 drop_options: BTreeSet<ConnectionOptionName>,
3466 validate: bool,
3467 ) {
3468 let cur_entry = self.catalog().get_entry(&id);
3469 let cur_conn = cur_entry.connection().expect("known to be connection");
3470 let connection_gid = cur_conn.global_id();
3471
3472 let inner = || -> Result<Connection, AdapterError> {
3473 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3475 .expect("invalid create sql persisted to catalog")
3476 .into_element()
3477 .ast
3478 {
3479 Statement::CreateConnection(stmt) => stmt,
3480 _ => unreachable!("proved type is source"),
3481 };
3482
3483 let catalog = self.catalog().for_system_session();
3484
3485 let (mut create_conn_stmt, resolved_ids) =
3487 mz_sql::names::resolve(&catalog, create_conn_stmt)
3488 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3489
3490 create_conn_stmt
3492 .values
3493 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3494
3495 create_conn_stmt.values.extend(
3497 set_options
3498 .into_iter()
3499 .map(|(name, value)| ConnectionOption { name, value }),
3500 );
3501
3502 let mut catalog = self.catalog().for_system_session();
3505 catalog.mark_id_unresolvable_for_replanning(id);
3506
3507 let plan = match mz_sql::plan::plan(
3509 None,
3510 &catalog,
3511 Statement::CreateConnection(create_conn_stmt),
3512 &Params::empty(),
3513 &resolved_ids,
3514 )
3515 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3516 {
3517 Plan::CreateConnection(plan) => plan,
3518 _ => unreachable!("create source plan is only valid response"),
3519 };
3520
3521 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3523 .expect("invalid create sql persisted to catalog")
3524 .into_element()
3525 .ast
3526 {
3527 Statement::CreateConnection(stmt) => stmt,
3528 _ => unreachable!("proved type is source"),
3529 };
3530
3531 let catalog = self.catalog().for_system_session();
3532
3533 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3535 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3536
3537 Ok(Connection {
3538 create_sql: plan.connection.create_sql,
3539 global_id: cur_conn.global_id,
3540 details: plan.connection.details,
3541 resolved_ids: new_deps,
3542 })
3543 };
3544
3545 let conn = match inner() {
3546 Ok(conn) => conn,
3547 Err(e) => {
3548 return ctx.retire(Err(e));
3549 }
3550 };
3551
3552 if validate {
3553 let connection = conn
3554 .details
3555 .to_connection()
3556 .into_inline_connection(self.catalog().state());
3557
3558 let internal_cmd_tx = self.internal_cmd_tx.clone();
3559 let transient_revision = self.catalog().transient_revision();
3560 let conn_id = ctx.session().conn_id().clone();
3561 let otel_ctx = OpenTelemetryContext::obtain();
3562 let role_metadata = ctx.session().role_metadata().clone();
3563 let current_storage_parameters = self.controller.storage.config().clone();
3564
3565 task::spawn(
3566 || format!("validate_alter_connection:{conn_id}"),
3567 async move {
3568 let resolved_ids = conn.resolved_ids.clone();
3569 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3570 let result = match std::panic::AssertUnwindSafe(
3571 connection.validate(id, ¤t_storage_parameters),
3572 )
3573 .ore_catch_unwind()
3574 .await
3575 {
3576 Ok(Ok(())) => Ok(conn),
3577 Ok(Err(err)) => Err(err.into()),
3578 Err(_panic) => {
3579 tracing::error!("alter connection validation panicked");
3580 Err(AdapterError::Internal(
3581 "connection validation panicked".into(),
3582 ))
3583 }
3584 };
3585
3586 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3588 AlterConnectionValidationReady {
3589 ctx,
3590 result,
3591 connection_id: id,
3592 connection_gid,
3593 plan_validity: PlanValidity::new(
3594 transient_revision,
3595 dependency_ids.clone(),
3596 None,
3597 None,
3598 role_metadata,
3599 ),
3600 otel_ctx,
3601 resolved_ids,
3602 },
3603 ));
3604 if let Err(e) = result {
3605 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3606 }
3607 },
3608 );
3609 } else {
3610 let result = self
3611 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3612 .await;
3613 ctx.retire(result);
3614 }
3615 }
3616
3617 #[instrument]
3618 pub(crate) async fn sequence_alter_connection_stage_finish(
3619 &mut self,
3620 session: &Session,
3621 id: CatalogItemId,
3622 connection: Connection,
3623 ) -> Result<ExecuteResponse, AdapterError> {
3624 match self.catalog.get_entry(&id).item() {
3625 CatalogItem::Connection(curr_conn) => {
3626 curr_conn
3627 .details
3628 .to_connection()
3629 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3630 .map_err(StorageError::from)?;
3631 }
3632 _ => unreachable!("known to be a connection"),
3633 };
3634
3635 let ops = vec![catalog::Op::UpdateItem {
3636 id,
3637 name: self.catalog.get_entry(&id).name().clone(),
3638 to_item: CatalogItem::Connection(connection.clone()),
3639 }];
3640
3641 self.catalog_transact(Some(session), ops).await?;
3642
3643 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3650 }
3651
    /// Sequences `ALTER SOURCE`: either adds new subsource exports to an
    /// existing ingestion (merging TEXT/EXCLUDE COLUMNS options into the
    /// persisted `CREATE SOURCE` statement and replanning it) or refreshes the
    /// recorded upstream references.
    #[instrument]
    pub(super) async fn sequence_alter_source(
        &mut self,
        session: &Session,
        plan::AlterSourcePlan {
            item_id,
            ingestion_id,
            action,
        }: plan::AlterSourcePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let cur_entry = self.catalog().get_entry(&item_id);
        let cur_source = cur_entry.source().expect("known to be source");

        // Helper: parse a persisted `CREATE SOURCE` statement and name-resolve
        // it, yielding the resolved statement plus its dependency IDs.
        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateSource(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = coord.catalog().for_system_session();

            mz_sql::names::resolve(&catalog, create_source_stmt)
                .map_err(|e| AdapterError::internal(err_cx, e))
        };

        match action {
            plan::AlterSourceAction::AddSubsourceExports {
                subsources,
                options,
            } => {
                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                    text_columns: mut new_text_columns,
                    exclude_columns: mut new_exclude_columns,
                    ..
                } = options.try_into()?;

                let (mut create_source_stmt, resolved_ids) =
                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

                // The upstream references currently exported by this source's
                // extant subsources; options for tables outside this set are
                // stale and are dropped below.
                let catalog = self.catalog();
                let curr_references: BTreeSet<_> = catalog
                    .get_entry(&item_id)
                    .used_by()
                    .into_iter()
                    .filter_map(|subsource| {
                        catalog
                            .get_entry(subsource)
                            .subsource_details()
                            .map(|(_id, reference, _details)| reference)
                    })
                    .collect();

                let purification_err =
                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

                // Fold the newly requested TEXT COLUMNS / EXCLUDE COLUMNS into
                // the statement's existing options. Each connection flavor
                // stores these under its own option enum, hence the
                // per-variant handling.
                match &mut create_source_stmt.connection {
                    CreateSourceConnection::Postgres {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::PgConfigOptionExtracted {
                            mut text_columns, ..
                        } = curr_options.clone().try_into()?;

                        // Drop the old TEXT COLUMNS option; it is rebuilt from
                        // the merged set below.
                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));

                        // Keep only text columns whose (3-part) table is still
                        // referenced by a subsource.
                        text_columns.retain(|column_qualified_reference| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                4,
                                "all TEXT COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(3);
                            curr_references.contains(&table)
                        });

                        new_text_columns.extend(text_columns);

                        if !new_text_columns.is_empty() {
                            // Sort for a deterministic statement rendering.
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(PgConfigOption {
                                name: PgConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::MySql {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::MySqlConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Drop the old TEXT/EXCLUDE COLUMNS options; both are
                        // rebuilt from the merged sets below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                MySqlConfigOptionName::TextColumns
                                    | MySqlConfigOptionName::ExcludeColumns
                            )
                        });

                        // MySQL table references are 2 parts, so a
                        // column-qualified reference has 3.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::SqlServer {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::SqlServerConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                SqlServerConfigOptionName::TextColumns
                                    | SqlServerConfigOptionName::ExcludeColumns
                            )
                        });

                        // NOTE(review): unlike the MySQL arm, this matches via
                        // `ends_with` rather than `contains`, presumably
                        // because SQL Server references may carry additional
                        // leading qualification — confirm.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.iter().any(|r| r.0.ends_with(&table.0))
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    _ => return Err(purification_err()),
                };

                // Replan the modified statement; the source's own entry must
                // not resolve during replanning.
                let mut catalog = self.catalog().for_system_session();
                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

                let planned = mz_sql::plan::plan(
                    None,
                    &catalog,
                    Statement::CreateSource(create_source_stmt),
                    &Params::empty(),
                    &resolved_ids,
                )
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
                let plan = match planned {
                    Plan::CreateSource(plan) => plan,
                    _ => unreachable!("create source plan is only valid response"),
                };

                // Carry over identity and retention settings from the current
                // source.
                let source = Source::new(
                    plan,
                    cur_source.global_id,
                    resolved_ids,
                    cur_source.custom_logical_compaction_window,
                    cur_source.is_retained_metrics_object,
                );

                let desc = match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                        desc.clone().into_inline_connection(self.catalog().state())
                    }
                    _ => unreachable!("already verified of type ingestion"),
                };

                // Have the storage controller confirm the new ingestion
                // description is a legal alteration before committing.
                self.controller
                    .storage
                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

                let mut ops = vec![catalog::Op::UpdateItem {
                    id: item_id,
                    name: self.catalog.get_entry(&item_id).name().clone(),
                    to_item: CatalogItem::Source(source),
                }];

                // Create the new subsources themselves and append their ops to
                // the same catalog transaction.
                let CreateSourceInner {
                    ops: new_ops,
                    sources: _,
                    if_not_exists_ids,
                } = self.create_source_inner(session, subsources).await?;

                ops.extend(new_ops.into_iter());

                assert!(
                    if_not_exists_ids.is_empty(),
                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
                );

                self.catalog_transact(Some(session), ops).await?;
            }
            plan::AlterSourceAction::RefreshReferences { references } => {
                self.catalog_transact(
                    Some(session),
                    vec![catalog::Op::UpdateSourceReferences {
                        source_id: item_id,
                        references: references.into(),
                    }],
                )
                .await?;
            }
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
    }
3984
3985 #[instrument]
3986 pub(super) async fn sequence_alter_system_set(
3987 &mut self,
3988 session: &Session,
3989 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
3990 ) -> Result<ExecuteResponse, AdapterError> {
3991 self.is_user_allowed_to_alter_system(session, Some(&name))?;
3992 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
3994 self.validate_alter_system_network_policy(session, &value)?;
3995 }
3996
3997 let op = match value {
3998 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
3999 name: name.clone(),
4000 value: OwnedVarInput::SqlSet(values),
4001 },
4002 plan::VariableValue::Default => {
4003 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4004 }
4005 };
4006 self.catalog_transact(Some(session), vec![op]).await?;
4007
4008 session.add_notice(AdapterNotice::VarDefaultUpdated {
4009 role: None,
4010 var_name: Some(name),
4011 });
4012 Ok(ExecuteResponse::AlteredSystemConfiguration)
4013 }
4014
4015 #[instrument]
4016 pub(super) async fn sequence_alter_system_reset(
4017 &mut self,
4018 session: &Session,
4019 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4020 ) -> Result<ExecuteResponse, AdapterError> {
4021 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4022 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4023 self.catalog_transact(Some(session), vec![op]).await?;
4024 session.add_notice(AdapterNotice::VarDefaultUpdated {
4025 role: None,
4026 var_name: Some(name),
4027 });
4028 Ok(ExecuteResponse::AlteredSystemConfiguration)
4029 }
4030
4031 #[instrument]
4032 pub(super) async fn sequence_alter_system_reset_all(
4033 &mut self,
4034 session: &Session,
4035 _: plan::AlterSystemResetAllPlan,
4036 ) -> Result<ExecuteResponse, AdapterError> {
4037 self.is_user_allowed_to_alter_system(session, None)?;
4038 let op = catalog::Op::ResetAllSystemConfiguration;
4039 self.catalog_transact(Some(session), vec![op]).await?;
4040 session.add_notice(AdapterNotice::VarDefaultUpdated {
4041 role: None,
4042 var_name: None,
4043 });
4044 Ok(ExecuteResponse::AlteredSystemConfiguration)
4045 }
4046
4047 fn is_user_allowed_to_alter_system(
4049 &self,
4050 session: &Session,
4051 var_name: Option<&str>,
4052 ) -> Result<(), AdapterError> {
4053 match (session.user().kind(), var_name) {
4054 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4056 (UserKind::Superuser, Some(name))
4058 if session.user().is_internal()
4059 || self.catalog().system_config().user_modifiable(name) =>
4060 {
4061 let var = self.catalog().system_config().get(name)?;
4064 var.visible(session.user(), self.catalog().system_config())?;
4065 Ok(())
4066 }
4067 (UserKind::Regular, Some(name))
4070 if self.catalog().system_config().user_modifiable(name) =>
4071 {
4072 Err(AdapterError::Unauthorized(
4073 rbac::UnauthorizedError::Superuser {
4074 action: format!("toggle the '{name}' system configuration parameter"),
4075 },
4076 ))
4077 }
4078 _ => Err(AdapterError::Unauthorized(
4079 rbac::UnauthorizedError::MzSystem {
4080 action: "alter system".into(),
4081 },
4082 )),
4083 }
4084 }
4085
4086 fn validate_alter_system_network_policy(
4087 &self,
4088 session: &Session,
4089 policy_value: &plan::VariableValue,
4090 ) -> Result<(), AdapterError> {
4091 let policy_name = match &policy_value {
4092 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4094 plan::VariableValue::Values(values) if values.len() == 1 => {
4095 values.iter().next().cloned()
4096 }
4097 plan::VariableValue::Values(values) => {
4098 tracing::warn!(?values, "can't set multiple network policies at once");
4099 None
4100 }
4101 };
4102 let maybe_network_policy = policy_name
4103 .as_ref()
4104 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4105 let Some(network_policy) = maybe_network_policy else {
4106 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4107 VarError::InvalidParameterValue {
4108 name: NETWORK_POLICY.name(),
4109 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4110 reason: "no network policy with such name exists".to_string(),
4111 },
4112 )));
4113 };
4114 self.validate_alter_network_policy(session, &network_policy.rules)
4115 }
4116
4117 fn validate_alter_network_policy(
4122 &self,
4123 session: &Session,
4124 policy_rules: &Vec<NetworkPolicyRule>,
4125 ) -> Result<(), AdapterError> {
4126 if session.user().is_internal() {
4129 return Ok(());
4130 }
4131 if let Some(ip) = session.meta().client_ip() {
4132 validate_ip_with_policy_rules(ip, policy_rules)
4133 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4134 } else {
4135 return Err(AdapterError::NetworkPolicyDenied(
4138 NetworkPolicyError::MissingIp,
4139 ));
4140 }
4141 Ok(())
4142 }
4143
4144 #[instrument]
4146 pub(super) fn sequence_execute(
4147 &self,
4148 session: &mut Session,
4149 plan: plan::ExecutePlan,
4150 ) -> Result<String, AdapterError> {
4151 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4153 let ps = session
4154 .get_prepared_statement_unverified(&plan.name)
4155 .expect("known to exist");
4156 let stmt = ps.stmt().cloned();
4157 let desc = ps.desc().clone();
4158 let state_revision = ps.state_revision;
4159 let logging = Arc::clone(ps.logging());
4160 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4161 }
4162
4163 #[instrument]
4164 pub(super) async fn sequence_grant_privileges(
4165 &mut self,
4166 session: &Session,
4167 plan::GrantPrivilegesPlan {
4168 update_privileges,
4169 grantees,
4170 }: plan::GrantPrivilegesPlan,
4171 ) -> Result<ExecuteResponse, AdapterError> {
4172 self.sequence_update_privileges(
4173 session,
4174 update_privileges,
4175 grantees,
4176 UpdatePrivilegeVariant::Grant,
4177 )
4178 .await
4179 }
4180
4181 #[instrument]
4182 pub(super) async fn sequence_revoke_privileges(
4183 &mut self,
4184 session: &Session,
4185 plan::RevokePrivilegesPlan {
4186 update_privileges,
4187 revokees,
4188 }: plan::RevokePrivilegesPlan,
4189 ) -> Result<ExecuteResponse, AdapterError> {
4190 self.sequence_update_privileges(
4191 session,
4192 update_privileges,
4193 revokees,
4194 UpdatePrivilegeVariant::Revoke,
4195 )
4196 .await
4197 }
4198
    /// Shared implementation for GRANT and REVOKE: computes the catalog ops
    /// needed to apply `acl_mode` for each (target, grantee) pair, collecting
    /// warnings for privilege bits that don't apply to the target's type.
    #[instrument]
    async fn sequence_update_privileges(
        &mut self,
        session: &Session,
        update_privileges: Vec<UpdatePrivilege>,
        grantees: Vec<RoleId>,
        variant: UpdatePrivilegeVariant,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
        let mut warnings = Vec::new();
        let catalog = self.catalog().for_session(session);

        for UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        } in update_privileges
        {
            let actual_object_type = catalog.get_system_object_type(&target_id);
            // Privilege bits not applicable to this relation type produce a
            // notice rather than an error.
            if actual_object_type.is_relation() {
                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
                if !non_applicable_privileges.is_empty() {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
                        non_applicable_privileges,
                        object_description,
                    })
                }
            }

            if let SystemObjectId::Object(object_id) = &target_id {
                self.catalog()
                    .ensure_not_reserved_object(object_id, session.conn_id())?;
            }

            let privileges = self
                .catalog()
                .get_privileges(&target_id, session.conn_id())
                .ok_or(AdapterError::Unsupported(
                    "GRANTs/REVOKEs on an object type with no privileges",
                ))?;

            for grantee in &grantees {
                self.catalog().ensure_not_system_role(grantee)?;
                self.catalog().ensure_not_predefined_role(grantee)?;
                // Borrow the existing privilege when present; otherwise
                // fabricate an empty one so the comparisons below are uniform.
                let existing_privilege = privileges
                    .get_acl_item(grantee, &grantor)
                    .map(Cow::Borrowed)
                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));

                // Only emit an op when it would actually change something: a
                // grant that adds at least one new bit, or a revoke that
                // removes at least one currently-held bit.
                match variant {
                    UpdatePrivilegeVariant::Grant
                        if !existing_privilege.acl_mode.contains(acl_mode) =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    UpdatePrivilegeVariant::Revoke
                        if !existing_privilege
                            .acl_mode
                            .intersection(acl_mode)
                            .is_empty() =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    // No-op grants/revokes are skipped entirely.
                    _ => {}
                }
            }
        }

        if ops.is_empty() {
            session.add_notices(warnings);
            return Ok(variant.into());
        }

        let res = self
            .catalog_transact(Some(session), ops)
            .await
            .map(|_| match variant {
                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
            });
        // Only surface the warnings when the transaction succeeded.
        if res.is_ok() {
            session.add_notices(warnings);
        }
        res
    }
4308
    /// Sequences `ALTER DEFAULT PRIVILEGES`, adding or removing a default
    /// privilege entry for every (object spec, ACL item) combination in the
    /// plan.
    #[instrument]
    pub(super) async fn sequence_alter_default_privileges(
        &mut self,
        session: &Session,
        plan::AlterDefaultPrivilegesPlan {
            privilege_objects,
            privilege_acl_items,
            is_grant,
        }: plan::AlterDefaultPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
        // GRANT adds entries, REVOKE removes them; same op shape either way.
        let variant = if is_grant {
            UpdatePrivilegeVariant::Grant
        } else {
            UpdatePrivilegeVariant::Revoke
        };
        for privilege_object in &privilege_objects {
            // Default privileges may not target system/predefined roles or
            // reserved databases/schemas.
            self.catalog()
                .ensure_not_system_role(&privilege_object.role_id)?;
            self.catalog()
                .ensure_not_predefined_role(&privilege_object.role_id)?;
            if let Some(database_id) = privilege_object.database_id {
                self.catalog()
                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
            }
            if let Some(schema_id) = privilege_object.schema_id {
                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
                let schema_spec: SchemaSpecifier = schema_id.into();

                self.catalog().ensure_not_reserved_object(
                    &(database_spec, schema_spec).into(),
                    session.conn_id(),
                )?;
            }
            for privilege_acl_item in &privilege_acl_items {
                // Grantees likewise may not be system or predefined roles.
                self.catalog()
                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
                self.catalog()
                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
                ops.push(catalog::Op::UpdateDefaultPrivilege {
                    privilege_object: privilege_object.clone(),
                    privilege_acl_item: privilege_acl_item.clone(),
                    variant,
                })
            }
        }

        self.catalog_transact(Some(session), ops).await?;
        Ok(ExecuteResponse::AlteredDefaultPrivileges)
    }
4359
    /// Sequences `GRANT <role> TO <member>`, adding role-membership edges.
    /// Grants that already exist produce a notice instead of an op.
    #[instrument]
    pub(super) async fn sequence_grant_role(
        &mut self,
        session: &Session,
        plan::GrantRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::GrantRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // NOTE(review): the reserved/grantable checks run only on
                    // this already-a-member path; presumably the `GrantRole`
                    // op enforces them for new grants — confirm.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::GrantRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        // Nothing to change: every requested grant already existed.
        if ops.is_empty() {
            return Ok(ExecuteResponse::GrantedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::GrantedRole)
    }
4404
    /// Sequences `REVOKE <role> FROM <member>`, removing role-membership
    /// edges. Revokes of memberships that don't exist produce a notice
    /// instead of an op.
    #[instrument]
    pub(super) async fn sequence_revoke_role(
        &mut self,
        session: &Session,
        plan::RevokeRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::RevokeRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if !member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // NOTE(review): as with GRANT, these checks run only on
                    // the no-op path; presumably the `RevokeRole` op enforces
                    // them otherwise — confirm.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::RevokeRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        // Nothing to change: none of the memberships existed.
        if ops.is_empty() {
            return Ok(ExecuteResponse::RevokedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::RevokedRole)
    }
4449
    /// Sequences `ALTER ... OWNER TO`, updating the object's owner and
    /// cascading the change to objects whose ownership must track it: indexes
    /// on the item, the item's progress collection, and a cluster's replicas.
    #[instrument]
    pub(super) async fn sequence_alter_owner(
        &mut self,
        session: &Session,
        plan::AlterOwnerPlan {
            id,
            object_type,
            new_owner,
        }: plan::AlterOwnerPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = vec![catalog::Op::UpdateOwner {
            id: id.clone(),
            new_owner,
        }];

        match &id {
            ObjectId::Item(global_id) => {
                let entry = self.catalog().get_entry(global_id);

                // Directly altering an index's owner is a no-op with a
                // notice; index ownership follows its parent relation (see
                // the cascade below).
                if entry.is_index() {
                    let name = self
                        .catalog()
                        .resolve_full_name(entry.name(), Some(session.conn_id()))
                        .to_string();
                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
                    return Ok(ExecuteResponse::AlteredObject(object_type));
                }

                // Indexes on this item change owner along with it.
                let dependent_index_ops = entry
                    .used_by()
                    .into_iter()
                    .filter(|id| self.catalog().get_entry(id).is_index())
                    .map(|id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(*id),
                        new_owner,
                    });
                ops.extend(dependent_index_ops);

                // So does the item's progress collection, if it has one.
                let dependent_subsources =
                    entry
                        .progress_id()
                        .into_iter()
                        .map(|item_id| catalog::Op::UpdateOwner {
                            id: ObjectId::Item(item_id),
                            new_owner,
                        });
                ops.extend(dependent_subsources);
            }
            ObjectId::Cluster(cluster_id) => {
                let cluster = self.catalog().get_cluster(*cluster_id);
                // A cluster's replicas change owner along with the cluster.
                let managed_cluster_replica_ops =
                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
                        new_owner,
                    });
                ops.extend(managed_cluster_replica_ops);
            }
            _ => {}
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::AlteredObject(object_type))
    }
4518
4519 #[instrument]
4520 pub(super) async fn sequence_reassign_owned(
4521 &mut self,
4522 session: &Session,
4523 plan::ReassignOwnedPlan {
4524 old_roles,
4525 new_role,
4526 reassign_ids,
4527 }: plan::ReassignOwnedPlan,
4528 ) -> Result<ExecuteResponse, AdapterError> {
4529 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4530 self.catalog().ensure_not_reserved_role(role_id)?;
4531 }
4532
4533 let ops = reassign_ids
4534 .into_iter()
4535 .map(|id| catalog::Op::UpdateOwner {
4536 id,
4537 new_owner: new_role,
4538 })
4539 .collect();
4540
4541 self.catalog_transact(Some(session), ops)
4542 .await
4543 .map(|_| ExecuteResponse::ReassignOwned)
4544 }
4545
4546 #[instrument]
4547 pub(crate) async fn handle_deferred_statement(&mut self) {
4548 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4552 return;
4553 };
4554 match ps {
4555 crate::coord::PlanStatement::Statement { stmt, params } => {
4556 self.handle_execute_inner(stmt, params, ctx).await;
4557 }
4558 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4559 self.sequence_plan(ctx, plan, resolved_ids).await;
4560 }
4561 }
4562 }
4563
4564 #[instrument]
4565 #[allow(clippy::unused_async)]
4567 pub(super) async fn sequence_alter_table(
4568 &mut self,
4569 ctx: &mut ExecuteContext,
4570 plan: plan::AlterTablePlan,
4571 ) -> Result<ExecuteResponse, AdapterError> {
4572 let plan::AlterTablePlan {
4573 relation_id,
4574 column_name,
4575 column_type,
4576 raw_sql_type,
4577 } = plan;
4578
4579 let (_, new_global_id) = self.allocate_user_id().await?;
4581 let ops = vec![catalog::Op::AlterAddColumn {
4582 id: relation_id,
4583 new_global_id,
4584 name: column_name,
4585 typ: column_type,
4586 sql: raw_sql_type,
4587 }];
4588
4589 self.catalog_transact_with_context(None, Some(ctx), ops)
4590 .await?;
4591
4592 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4593 }
4594
    /// Prepares applying a materialized view replacement: installs a watch
    /// set that fires once the target MV's write frontier catches up to the
    /// replacement's, at which point the `..._finish` step swaps the
    /// definitions.
    #[instrument]
    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: AlterMaterializedViewApplyReplacementPlan,
    ) {
        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();

        // The finish step re-checks this validity to guard against concurrent
        // catalog changes while we wait on the frontier.
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([id, replacement_id]),
            None,
            None,
            ctx.session().role_metadata().clone(),
        );

        let target = self.catalog.get_entry(&id);
        let target_gid = target.latest_global_id();

        let replacement = self.catalog.get_entry(&replacement_id);
        let replacement_gid = replacement.latest_global_id();

        let target_upper = self
            .controller
            .storage_collections
            .collection_frontiers(target_gid)
            .expect("target MV exists")
            .write_frontier;
        let replacement_upper = self
            .controller
            .compute
            .collection_frontiers(replacement_gid, replacement.cluster_id())
            .expect("replacement MV exists")
            .write_frontier;

        info!(
            %id, %replacement_id, ?target_upper, ?replacement_upper,
            "preparing materialized view replacement application",
        );

        // An empty replacement frontier means the replacement is sealed, so
        // the swap can never be applied.
        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
                name: target.name().item.clone(),
            }));
            return;
        };

        // Wait until the target has written everything up to the
        // replacement's frontier; `step_back` converts the exclusive upper
        // into an inclusive timestamp.
        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);

        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([target_gid]),
            replacement_upper_ts,
            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
                ctx: Some(ctx),
                otel_ctx: OpenTelemetryContext::obtain(),
                plan,
                plan_validity,
            }),
        )
        .expect("target collection exists");
    }
4681
    /// Finishes a materialized view replacement once the watch set installed
    /// by the prepare step fires: re-validates the plan and commits the
    /// catalog swap.
    #[instrument]
    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
        &mut self,
        mut ctx: AlterMaterializedViewReadyContext,
    ) {
        ctx.otel_ctx.attach_as_parent();

        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;

        // The catalog may have changed while waiting on the frontier; bail if
        // the original plan is no longer valid.
        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
            ctx.retire(Err(err));
            return;
        }

        info!(
            %id, %replacement_id,
            "finishing materialized view replacement application",
        );

        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
                ObjectType::MaterializedView,
            ))),
            Err(err) => ctx.retire(Err(err)),
        }
    }
4717
4718 pub(super) async fn statistics_oracle(
4719 &self,
4720 session: &Session,
4721 source_ids: &BTreeSet<GlobalId>,
4722 query_as_of: &Antichain<Timestamp>,
4723 is_oneshot: bool,
4724 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4725 super::statistics_oracle(
4726 session,
4727 source_ids,
4728 query_as_of,
4729 is_oneshot,
4730 self.catalog().system_config(),
4731 self.controller.storage_collections.as_ref(),
4732 )
4733 .await
4734 }
4735}
4736
4737impl Coordinator {
4738 pub(crate) fn emit_raw_optimizer_notices_to_user(
4746 &self,
4747 ctx: &ExecuteContext,
4748 notices: &[RawOptimizerNotice],
4749 ) {
4750 emit_optimizer_notices(&*self.catalog, ctx.session(), notices);
4751 }
4752
4753 async fn persist_dataflow_metainfo(
4763 &mut self,
4764 df_meta: DataflowMetainfo<Arc<OptimizerNotice>>,
4765 export_id: GlobalId,
4766 ) -> Option<BuiltinTableAppendNotify> {
4767 if self.catalog().state().system_config().enable_mz_notices()
4770 && !df_meta.optimizer_notices.is_empty()
4771 {
4772 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4773 self.catalog().state().pack_optimizer_notices(
4774 &mut builtin_table_updates,
4775 df_meta.optimizer_notices.iter(),
4776 Diff::ONE,
4777 );
4778
4779 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4781
4782 Some(
4783 self.builtin_table_update()
4784 .execute(builtin_table_updates)
4785 .await
4786 .0,
4787 )
4788 } else {
4789 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4791
4792 None
4793 }
4794 }
4795}