1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::future::OreFutureExt;
34use mz_ore::task::{self, JoinHandle, spawn};
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{assert_none, instrument};
37use mz_repr::adt::jsonb::Jsonb;
38use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
39use mz_repr::explain::ExprHumanizer;
40use mz_repr::explain::json::json_string;
41use mz_repr::role_id::RoleId;
42use mz_repr::{
43 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
44 RowIterator, Timestamp,
45};
46use mz_sql::ast::{
47 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
48 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
49 SqlServerConfigOptionName,
50};
51use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
52use mz_sql::catalog::{
53 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
54 CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, CatalogTypeDetails,
55 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
56};
57use mz_sql::names::{
58 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
59 SchemaSpecifier, SystemObjectId,
60};
61use mz_sql::plan::{
62 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
63 StatementContext,
64};
65use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
66use mz_storage_types::sinks::StorageSinkDesc;
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use mz_transform::notice::{OptimizerNotice, RawOptimizerNotice};
93use smallvec::SmallVec;
94use timely::progress::Antichain;
95use tokio::sync::{oneshot, watch};
96use tracing::{Instrument, Span, info, warn};
97
98use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
99use crate::command::{ExecuteResponse, Response};
100use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
101use crate::coord::read_then_write::validate_read_then_write_dependencies;
102use crate::coord::sequencer::emit_optimizer_notices;
103use crate::coord::{
104 AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
105 Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
106 ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
107 PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
108 WatchSetResponse, validate_ip_with_policy_rules,
109};
110use crate::error::AdapterError;
111use crate::notice::{AdapterNotice, DroppedInUseIndex};
112use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
113use crate::optimize::{self, Optimize};
114use crate::session::{
115 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
116 WriteLocks, WriteOp,
117};
118use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
119use crate::{PeekResponseUnary, ReadHolds};
120
121type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError>>;
123
124mod cluster;
125mod copy_from;
126mod create_index;
127mod create_materialized_view;
128mod create_view;
129mod explain_timestamp;
130mod peek;
131mod secret;
132mod subscribe;
133
134macro_rules! return_if_err {
137 ($expr:expr, $ctx:expr) => {
138 match $expr {
139 Ok(v) => v,
140 Err(e) => return $ctx.retire(Err(e.into())),
141 }
142 };
143}
144
145pub(super) use return_if_err;
146
147struct DropOps {
148 ops: Vec<catalog::Op>,
149 dropped_active_db: bool,
150 dropped_active_cluster: bool,
151 dropped_in_use_indexes: Vec<DroppedInUseIndex>,
152}
153
154struct CreateSourceInner {
156 ops: Vec<catalog::Op>,
157 sources: Vec<(CatalogItemId, Source)>,
158 if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
159}
160
161impl Coordinator {
162 pub(crate) async fn sequence_staged<S>(
167 &mut self,
168 mut ctx: S::Ctx,
169 parent_span: Span,
170 mut stage: S,
171 ) where
172 S: Staged + 'static,
173 S::Ctx: Send + 'static,
174 {
175 return_if_err!(stage.validity().check(self.catalog()), ctx);
176 loop {
177 let mut cancel_enabled = stage.cancel_enabled();
178 if let Some(session) = ctx.session() {
179 if cancel_enabled {
180 if let Some((_prev_tx, prev_rx)) = self
183 .staged_cancellation
184 .insert(session.conn_id().clone(), watch::channel(false))
185 {
186 let was_canceled = *prev_rx.borrow();
187 if was_canceled {
188 ctx.retire(Err(AdapterError::Canceled));
189 return;
190 }
191 }
192 } else {
193 self.staged_cancellation.remove(session.conn_id());
196 }
197 } else {
198 cancel_enabled = false
199 };
200 let next = stage
201 .stage(self, &mut ctx)
202 .instrument(parent_span.clone())
203 .await;
204 let res = return_if_err!(next, ctx);
205 stage = match res {
206 StageResult::Handle(handle) => {
207 let internal_cmd_tx = self.internal_cmd_tx.clone();
208 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
209 let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
210 });
211 return;
212 }
213 StageResult::HandleRetire(handle) => {
214 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
215 ctx.retire(Ok(resp));
216 });
217 return;
218 }
219 StageResult::Response(resp) => {
220 ctx.retire(Ok(resp));
221 return;
222 }
223 StageResult::Immediate(stage) => *stage,
224 }
225 }
226 }
227
228 fn handle_spawn<C, T, F>(
229 &self,
230 ctx: C,
231 handle: JoinHandle<Result<T, AdapterError>>,
232 cancel_enabled: bool,
233 f: F,
234 ) where
235 C: StagedContext + Send + 'static,
236 T: Send + 'static,
237 F: FnOnce(C, T) + Send + 'static,
238 {
239 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
240 .session()
241 .and_then(|session| self.staged_cancellation.get(session.conn_id()))
242 {
243 let mut rx = rx.clone();
244 Box::pin(async move {
245 let _ = rx.wait_for(|v| *v).await;
247 ()
248 })
249 } else {
250 Box::pin(future::pending())
251 };
252 spawn(|| "sequence_staged", async move {
253 tokio::select! {
254 res = handle => {
255 let next = return_if_err!(res, ctx);
256 f(ctx, next);
257 }
258 _ = rx, if cancel_enabled => {
259 ctx.retire(Err(AdapterError::Canceled));
260 }
261 }
262 });
263 }
264
265 async fn create_source_inner(
266 &self,
267 session: &Session,
268 plans: Vec<plan::CreateSourcePlanBundle>,
269 ) -> Result<CreateSourceInner, AdapterError> {
270 let mut ops = vec![];
271 let mut sources = vec![];
272
273 let if_not_exists_ids = plans
274 .iter()
275 .filter_map(
276 |plan::CreateSourcePlanBundle {
277 item_id,
278 global_id: _,
279 plan,
280 resolved_ids: _,
281 available_source_references: _,
282 }| {
283 if plan.if_not_exists {
284 Some((*item_id, plan.name.clone()))
285 } else {
286 None
287 }
288 },
289 )
290 .collect::<BTreeMap<_, _>>();
291
292 for plan::CreateSourcePlanBundle {
293 item_id,
294 global_id,
295 mut plan,
296 resolved_ids,
297 available_source_references,
298 } in plans
299 {
300 let name = plan.name.clone();
301
302 if let mz_sql::plan::DataSourceDesc::Webhook {
304 validate_using: Some(validate),
305 ..
306 } = &mut plan.source.data_source
307 {
308 if let Err(reason) = validate.reduce_expression().await {
309 self.metrics
310 .webhook_validation_reduce_failures
311 .with_label_values(&[reason])
312 .inc();
313 return Err(AdapterError::Internal(format!(
314 "failed to reduce check expression, {reason}"
315 )));
316 }
317 }
318
319 let mut reference_ops = vec![];
322 if let Some(references) = &available_source_references {
323 reference_ops.push(catalog::Op::UpdateSourceReferences {
324 source_id: item_id,
325 references: references.clone().into(),
326 });
327 }
328
329 let source = Source::new(plan, global_id, resolved_ids, None, false);
330 ops.push(catalog::Op::CreateItem {
331 id: item_id,
332 name,
333 item: CatalogItem::Source(source.clone()),
334 owner_id: *session.current_role_id(),
335 });
336 sources.push((item_id, source));
337 ops.extend(reference_ops);
339 }
340
341 Ok(CreateSourceInner {
342 ops,
343 sources,
344 if_not_exists_ids,
345 })
346 }
347
348 pub(crate) fn plan_subsource(
356 &self,
357 session: &Session,
358 params: &mz_sql::plan::Params,
359 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
360 item_id: CatalogItemId,
361 global_id: GlobalId,
362 ) -> Result<CreateSourcePlanBundle, AdapterError> {
363 let catalog = self.catalog().for_session(session);
364 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
365
366 let (plan, _sql_impl_ids) = self.plan_statement(
367 session,
368 Statement::CreateSubsource(subsource_stmt),
369 params,
370 &resolved_ids,
371 )?;
372 let plan = match plan {
373 Plan::CreateSource(plan) => plan,
374 _ => unreachable!(),
375 };
376 Ok(CreateSourcePlanBundle {
377 item_id,
378 global_id,
379 plan,
380 resolved_ids,
381 available_source_references: None,
382 })
383 }
384
385 pub(crate) async fn plan_purified_alter_source_add_subsource(
387 &mut self,
388 session: &Session,
389 params: Params,
390 source_name: ResolvedItemName,
391 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
392 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
393 ) -> Result<(Plan, ResolvedIds), AdapterError> {
394 let mut subsource_plans = Vec::with_capacity(subsources.len());
395
396 let conn_catalog = self.catalog().for_system_session();
398 let pcx = plan::PlanContext::zero();
399 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
400
401 let entry = self.catalog().get_entry(source_name.item_id());
402 let source = entry.source().ok_or_else(|| {
403 AdapterError::internal(
404 "plan alter source",
405 format!("expected Source found {entry:?}"),
406 )
407 })?;
408
409 let item_id = entry.id();
410 let ingestion_id = source.global_id();
411 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
412
413 let ids = self
414 .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
415 .await?;
416 for (subsource_stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
417 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
418 subsource_plans.push(s);
419 }
420
421 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
422 subsources: subsource_plans,
423 options,
424 };
425
426 Ok((
427 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
428 item_id,
429 ingestion_id,
430 action,
431 }),
432 ResolvedIds::empty(),
433 ))
434 }
435
436 pub(crate) fn plan_purified_alter_source_refresh_references(
438 &self,
439 _session: &Session,
440 _params: Params,
441 source_name: ResolvedItemName,
442 available_source_references: plan::SourceReferences,
443 ) -> Result<(Plan, ResolvedIds), AdapterError> {
444 let entry = self.catalog().get_entry(source_name.item_id());
445 let source = entry.source().ok_or_else(|| {
446 AdapterError::internal(
447 "plan alter source",
448 format!("expected Source found {entry:?}"),
449 )
450 })?;
451 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
452 references: available_source_references,
453 };
454
455 Ok((
456 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
457 item_id: entry.id(),
458 ingestion_id: source.global_id(),
459 action,
460 }),
461 ResolvedIds::empty(),
462 ))
463 }
464
465 pub(crate) async fn plan_purified_create_source(
469 &mut self,
470 ctx: &ExecuteContext,
471 params: Params,
472 progress_stmt: Option<CreateSubsourceStatement<Aug>>,
473 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
474 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
475 available_source_references: plan::SourceReferences,
476 ) -> Result<(Plan, ResolvedIds), AdapterError> {
477 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
478
479 if let Some(progress_stmt) = progress_stmt {
481 assert_none!(progress_stmt.of_source);
486 let (item_id, global_id) = self.allocate_user_id().await?;
487 let progress_plan =
488 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
489 let progress_full_name = self
490 .catalog()
491 .resolve_full_name(&progress_plan.plan.name, None);
492 let progress_subsource = ResolvedItemName::Item {
493 id: progress_plan.item_id,
494 qualifiers: progress_plan.plan.name.qualifiers.clone(),
495 full_name: progress_full_name,
496 print_id: true,
497 version: RelationVersionSelector::Latest,
498 };
499
500 create_source_plans.push(progress_plan);
501
502 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
503 }
504
505 let catalog = self.catalog().for_session(ctx.session());
506 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
507
508 let propagated_with_options: Vec<_> = source_stmt
509 .with_options
510 .iter()
511 .filter_map(|opt| match opt.name {
512 CreateSourceOptionName::TimestampInterval => None,
513 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
514 name: CreateSubsourceOptionName::RetainHistory,
515 value: opt.value.clone(),
516 }),
517 })
518 .collect();
519
520 let source_plan = match self.plan_statement(
522 ctx.session(),
523 Statement::CreateSource(source_stmt),
524 ¶ms,
525 &resolved_ids,
526 )? {
527 (Plan::CreateSource(plan), _sql_impl_ids) => plan,
528 (p, _) => unreachable!("s must be CreateSourcePlan but got {:?}", p),
529 };
530
531 let (item_id, global_id) = self.allocate_user_id().await?;
532
533 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
534 let of_source = ResolvedItemName::Item {
535 id: item_id,
536 qualifiers: source_plan.name.qualifiers.clone(),
537 full_name: source_full_name,
538 print_id: true,
539 version: RelationVersionSelector::Latest,
540 };
541
542 let conn_catalog = self.catalog().for_system_session();
544 let pcx = plan::PlanContext::zero();
545 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
546
547 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
548
549 for subsource_stmt in subsource_stmts.iter_mut() {
550 subsource_stmt
551 .with_options
552 .extend(propagated_with_options.iter().cloned())
553 }
554
555 create_source_plans.push(CreateSourcePlanBundle {
556 item_id,
557 global_id,
558 plan: source_plan,
559 resolved_ids: resolved_ids.clone(),
560 available_source_references: Some(available_source_references),
561 });
562
563 let ids = self
565 .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
566 .await?;
567 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
568 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
569 create_source_plans.push(plan);
570 }
571
572 Ok((
573 Plan::CreateSources(create_source_plans),
574 ResolvedIds::empty(),
575 ))
576 }
577
578 #[instrument]
579 pub(super) async fn sequence_create_source(
580 &mut self,
581 ctx: &mut ExecuteContext,
582 plans: Vec<plan::CreateSourcePlanBundle>,
583 ) -> Result<ExecuteResponse, AdapterError> {
584 let CreateSourceInner {
585 ops,
586 sources,
587 if_not_exists_ids,
588 } = self.create_source_inner(ctx.session(), plans).await?;
589
590 let transact_result = self
591 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
592 .await;
593
594 for (item_id, source) in &sources {
596 if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
597 if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
598 ctx.session()
599 .add_notice(AdapterNotice::WebhookSourceCreated { url });
600 }
601 }
602 }
603
604 match transact_result {
605 Ok(()) => Ok(ExecuteResponse::CreatedSource),
606 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
607 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
608 })) if if_not_exists_ids.contains_key(&id) => {
609 ctx.session()
610 .add_notice(AdapterNotice::ObjectAlreadyExists {
611 name: if_not_exists_ids[&id].item.clone(),
612 ty: "source",
613 });
614 Ok(ExecuteResponse::CreatedSource)
615 }
616 Err(err) => Err(err),
617 }
618 }
619
620 #[instrument]
621 pub(super) async fn sequence_create_connection(
622 &mut self,
623 mut ctx: ExecuteContext,
624 plan: plan::CreateConnectionPlan,
625 resolved_ids: ResolvedIds,
626 ) {
627 let (connection_id, connection_gid) = match self.allocate_user_id().await {
628 Ok(item_id) => item_id,
629 Err(err) => return ctx.retire(Err(err)),
630 };
631
632 match &plan.connection.details {
633 ConnectionDetails::Ssh { key_1, key_2, .. } => {
634 let key_1 = match key_1.as_key_pair() {
635 Some(key_1) => key_1.clone(),
636 None => {
637 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
638 "the PUBLIC KEY 1 option cannot be explicitly specified"
639 ))));
640 }
641 };
642
643 let key_2 = match key_2.as_key_pair() {
644 Some(key_2) => key_2.clone(),
645 None => {
646 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
647 "the PUBLIC KEY 2 option cannot be explicitly specified"
648 ))));
649 }
650 };
651
652 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
653 let secret = key_set.to_bytes();
654 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
655 return ctx.retire(Err(err.into()));
656 }
657 self.caching_secrets_reader.invalidate(connection_id);
658 }
659 _ => (),
660 };
661
662 if plan.validate {
663 let internal_cmd_tx = self.internal_cmd_tx.clone();
664 let transient_revision = self.catalog().transient_revision();
665 let conn_id = ctx.session().conn_id().clone();
666 let otel_ctx = OpenTelemetryContext::obtain();
667 let role_metadata = ctx.session().role_metadata().clone();
668
669 let connection = plan
670 .connection
671 .details
672 .to_connection()
673 .into_inline_connection(self.catalog().state());
674
675 let current_storage_parameters = self.controller.storage.config().clone();
676 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
677 let result = match std::panic::AssertUnwindSafe(
678 connection.validate(connection_id, ¤t_storage_parameters),
679 )
680 .ore_catch_unwind()
681 .await
682 {
683 Ok(Ok(())) => Ok(plan),
684 Ok(Err(err)) => Err(err.into()),
685 Err(_panic) => {
686 tracing::error!("connection validation panicked");
687 Err(AdapterError::Internal(
688 "connection validation panicked".into(),
689 ))
690 }
691 };
692
693 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
695 CreateConnectionValidationReady {
696 ctx,
697 result,
698 connection_id,
699 connection_gid,
700 plan_validity: PlanValidity::new(
701 transient_revision,
702 resolved_ids.items().copied().collect(),
703 None,
704 None,
705 role_metadata,
706 ),
707 otel_ctx,
708 resolved_ids: resolved_ids.clone(),
709 },
710 ));
711 if let Err(e) = result {
712 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
713 }
714 });
715 } else {
716 let result = self
717 .sequence_create_connection_stage_finish(
718 &mut ctx,
719 connection_id,
720 connection_gid,
721 plan,
722 resolved_ids,
723 )
724 .await;
725 ctx.retire(result);
726 }
727 }
728
729 #[instrument]
730 pub(crate) async fn sequence_create_connection_stage_finish(
731 &mut self,
732 ctx: &mut ExecuteContext,
733 connection_id: CatalogItemId,
734 connection_gid: GlobalId,
735 plan: plan::CreateConnectionPlan,
736 resolved_ids: ResolvedIds,
737 ) -> Result<ExecuteResponse, AdapterError> {
738 let ops = vec![catalog::Op::CreateItem {
739 id: connection_id,
740 name: plan.name.clone(),
741 item: CatalogItem::Connection(Connection {
742 create_sql: plan.connection.create_sql,
743 global_id: connection_gid,
744 details: plan.connection.details.clone(),
745 resolved_ids,
746 }),
747 owner_id: *ctx.session().current_role_id(),
748 }];
749
750 let conn_id = ctx.session().conn_id().clone();
753 let transact_result = self
754 .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
755 .await;
756
757 match transact_result {
758 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
759 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
760 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
761 })) if plan.if_not_exists => {
762 if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
765 if let Err(e) = self.secrets_controller.delete(connection_id).await {
766 tracing::warn!(
767 "Dropping SSH secret for existing connection has encountered an error: {}",
768 e
769 );
770 } else {
771 self.caching_secrets_reader.invalidate(connection_id);
772 }
773 }
774 ctx.session()
775 .add_notice(AdapterNotice::ObjectAlreadyExists {
776 name: plan.name.item,
777 ty: "connection",
778 });
779 Ok(ExecuteResponse::CreatedConnection)
780 }
781 Err(err) => {
782 if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
785 if let Err(e) = self.secrets_controller.delete(connection_id).await {
786 tracing::warn!(
787 "Dropping SSH secret for failed connection has encountered an error: {}",
788 e
789 );
790 } else {
791 self.caching_secrets_reader.invalidate(connection_id);
792 }
793 }
794 Err(err)
795 }
796 }
797 }
798
799 #[instrument]
800 pub(super) async fn sequence_create_database(
801 &mut self,
802 session: &Session,
803 plan: plan::CreateDatabasePlan,
804 ) -> Result<ExecuteResponse, AdapterError> {
805 let ops = vec![catalog::Op::CreateDatabase {
806 name: plan.name.clone(),
807 owner_id: *session.current_role_id(),
808 }];
809 match self.catalog_transact(Some(session), ops).await {
810 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
811 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
812 kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
813 })) if plan.if_not_exists => {
814 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
815 Ok(ExecuteResponse::CreatedDatabase)
816 }
817 Err(err) => Err(err),
818 }
819 }
820
821 #[instrument]
822 pub(super) async fn sequence_create_schema(
823 &mut self,
824 session: &Session,
825 plan: plan::CreateSchemaPlan,
826 ) -> Result<ExecuteResponse, AdapterError> {
827 let op = catalog::Op::CreateSchema {
828 database_id: plan.database_spec,
829 schema_name: plan.schema_name.clone(),
830 owner_id: *session.current_role_id(),
831 };
832 match self.catalog_transact(Some(session), vec![op]).await {
833 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
834 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
835 kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
836 })) if plan.if_not_exists => {
837 session.add_notice(AdapterNotice::SchemaAlreadyExists {
838 name: plan.schema_name,
839 });
840 Ok(ExecuteResponse::CreatedSchema)
841 }
842 Err(err) => Err(err),
843 }
844 }
845
846 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
848 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
849 if attributes.superuser.is_some() || attributes.password.is_some() {
850 return Err(AdapterError::UnavailableFeature {
851 feature: "SUPERUSER and PASSWORD attributes".to_string(),
852 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
853 });
854 }
855 }
856 Ok(())
857 }
858
859 #[instrument]
860 pub(super) async fn sequence_create_role(
861 &mut self,
862 conn_id: Option<&ConnectionId>,
863 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
864 ) -> Result<ExecuteResponse, AdapterError> {
865 self.validate_role_attributes(&attributes.clone())?;
866 let op = catalog::Op::CreateRole { name, attributes };
867 self.catalog_transact_with_context(conn_id, None, vec![op])
868 .await
869 .map(|_| ExecuteResponse::CreatedRole)
870 }
871
872 #[instrument]
873 pub(super) async fn sequence_create_network_policy(
874 &mut self,
875 session: &Session,
876 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
877 ) -> Result<ExecuteResponse, AdapterError> {
878 let op = catalog::Op::CreateNetworkPolicy {
879 rules,
880 name,
881 owner_id: *session.current_role_id(),
882 };
883 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
884 .await
885 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
886 }
887
888 #[instrument]
889 pub(super) async fn sequence_alter_network_policy(
890 &mut self,
891 session: &Session,
892 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
893 ) -> Result<ExecuteResponse, AdapterError> {
894 let current_network_policy_name =
896 self.catalog().system_config().default_network_policy_name();
897 if current_network_policy_name == name {
899 self.validate_alter_network_policy(session, &rules)?;
900 }
901
902 let op = catalog::Op::AlterNetworkPolicy {
903 id,
904 rules,
905 name,
906 owner_id: *session.current_role_id(),
907 };
908 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
909 .await
910 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
911 }
912
913 #[instrument]
914 pub(super) async fn sequence_create_table(
915 &mut self,
916 ctx: &mut ExecuteContext,
917 plan: plan::CreateTablePlan,
918 resolved_ids: ResolvedIds,
919 ) -> Result<ExecuteResponse, AdapterError> {
920 let plan::CreateTablePlan {
921 name,
922 table,
923 if_not_exists,
924 } = plan;
925
926 let conn_id = if table.temporary {
927 Some(ctx.session().conn_id())
928 } else {
929 None
930 };
931 let (table_id, global_id) = self.allocate_user_id().await?;
932 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
933
934 let data_source = match table.data_source {
935 plan::TableDataSource::TableWrites { defaults } => {
936 TableDataSource::TableWrites { defaults }
937 }
938 plan::TableDataSource::DataSource {
939 desc: data_source_plan,
940 timeline,
941 } => match data_source_plan {
942 plan::DataSourceDesc::IngestionExport {
943 ingestion_id,
944 external_reference,
945 details,
946 data_config,
947 } => TableDataSource::DataSource {
948 desc: DataSourceDesc::IngestionExport {
949 ingestion_id,
950 external_reference,
951 details,
952 data_config,
953 },
954 timeline,
955 },
956 plan::DataSourceDesc::Webhook {
957 validate_using,
958 body_format,
959 headers,
960 cluster_id,
961 } => TableDataSource::DataSource {
962 desc: DataSourceDesc::Webhook {
963 validate_using,
964 body_format,
965 headers,
966 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
967 },
968 timeline,
969 },
970 o => {
971 unreachable!("CREATE TABLE data source got {:?}", o)
972 }
973 },
974 };
975
976 let is_webhook = if let TableDataSource::DataSource {
977 desc: DataSourceDesc::Webhook { .. },
978 timeline: _,
979 } = &data_source
980 {
981 true
982 } else {
983 false
984 };
985
986 let table = Table {
987 create_sql: Some(table.create_sql),
988 desc: table.desc,
989 collections,
990 conn_id: conn_id.cloned(),
991 resolved_ids,
992 custom_logical_compaction_window: table.compaction_window,
993 is_retained_metrics_object: false,
994 data_source,
995 };
996 let ops = vec![catalog::Op::CreateItem {
997 id: table_id,
998 name: name.clone(),
999 item: CatalogItem::Table(table.clone()),
1000 owner_id: *ctx.session().current_role_id(),
1001 }];
1002
1003 let catalog_result = self
1004 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1005 .await;
1006
1007 if is_webhook {
1008 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1011 ctx.session()
1012 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1013 }
1014 }
1015
1016 match catalog_result {
1017 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1018 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1019 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1020 })) if if_not_exists => {
1021 ctx.session_mut()
1022 .add_notice(AdapterNotice::ObjectAlreadyExists {
1023 name: name.item,
1024 ty: "table",
1025 });
1026 Ok(ExecuteResponse::CreatedTable)
1027 }
1028 Err(err) => Err(err),
1029 }
1030 }
1031
1032 #[instrument]
1033 pub(super) async fn sequence_create_sink(
1034 &mut self,
1035 ctx: ExecuteContext,
1036 plan: plan::CreateSinkPlan,
1037 resolved_ids: ResolvedIds,
1038 ) {
1039 let plan::CreateSinkPlan {
1040 name,
1041 sink,
1042 with_snapshot,
1043 if_not_exists,
1044 in_cluster,
1045 } = plan;
1046
1047 let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
1049
1050 let catalog_sink = Sink {
1051 create_sql: sink.create_sql,
1052 global_id,
1053 from: sink.from,
1054 connection: sink.connection,
1055 envelope: sink.envelope,
1056 version: sink.version,
1057 with_snapshot,
1058 resolved_ids,
1059 cluster_id: in_cluster,
1060 commit_interval: sink.commit_interval,
1061 };
1062
1063 let ops = vec![catalog::Op::CreateItem {
1064 id: item_id,
1065 name: name.clone(),
1066 item: CatalogItem::Sink(catalog_sink.clone()),
1067 owner_id: *ctx.session().current_role_id(),
1068 }];
1069
1070 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1071
1072 match result {
1073 Ok(()) => {}
1074 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1075 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1076 })) if if_not_exists => {
1077 ctx.session()
1078 .add_notice(AdapterNotice::ObjectAlreadyExists {
1079 name: name.item,
1080 ty: "sink",
1081 });
1082 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1083 return;
1084 }
1085 Err(e) => {
1086 ctx.retire(Err(e));
1087 return;
1088 }
1089 };
1090
1091 self.create_storage_export(global_id, &catalog_sink)
1092 .await
1093 .unwrap_or_terminate("cannot fail to create exports");
1094
1095 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1096 .await;
1097
1098 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1099 }
1100
1101 pub(super) fn validate_system_column_references(
1124 &self,
1125 uses_ambiguous_columns: bool,
1126 depends_on: &BTreeSet<GlobalId>,
1127 ) -> Result<(), AdapterError> {
1128 if uses_ambiguous_columns
1129 && depends_on
1130 .iter()
1131 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1132 {
1133 Err(AdapterError::AmbiguousSystemColumnReference)
1134 } else {
1135 Ok(())
1136 }
1137 }
1138
1139 #[instrument]
1140 pub(super) async fn sequence_create_type(
1141 &mut self,
1142 session: &Session,
1143 plan: plan::CreateTypePlan,
1144 resolved_ids: ResolvedIds,
1145 ) -> Result<ExecuteResponse, AdapterError> {
1146 let (item_id, global_id) = self.allocate_user_id().await?;
1147 plan.typ
1149 .inner
1150 .desc(&self.catalog().for_session(session))
1151 .map_err(AdapterError::from)?;
1152 let typ = Type {
1153 create_sql: Some(plan.typ.create_sql),
1154 global_id,
1155 details: CatalogTypeDetails {
1156 array_id: None,
1157 typ: plan.typ.inner,
1158 pg_metadata: None,
1159 },
1160 resolved_ids,
1161 };
1162 let op = catalog::Op::CreateItem {
1163 id: item_id,
1164 name: plan.name,
1165 item: CatalogItem::Type(typ),
1166 owner_id: *session.current_role_id(),
1167 };
1168 match self.catalog_transact(Some(session), vec![op]).await {
1169 Ok(()) => Ok(ExecuteResponse::CreatedType),
1170 Err(err) => Err(err),
1171 }
1172 }
1173
1174 #[instrument]
1175 pub(super) async fn sequence_comment_on(
1176 &mut self,
1177 session: &Session,
1178 plan: plan::CommentPlan,
1179 ) -> Result<ExecuteResponse, AdapterError> {
1180 let op = catalog::Op::Comment {
1181 object_id: plan.object_id,
1182 sub_component: plan.sub_component,
1183 comment: plan.comment,
1184 };
1185 self.catalog_transact(Some(session), vec![op]).await?;
1186 Ok(ExecuteResponse::Comment)
1187 }
1188
1189 #[instrument]
1190 pub(super) async fn sequence_drop_objects(
1191 &mut self,
1192 ctx: &mut ExecuteContext,
1193 plan::DropObjectsPlan {
1194 drop_ids,
1195 object_type,
1196 referenced_ids,
1197 }: plan::DropObjectsPlan,
1198 ) -> Result<ExecuteResponse, AdapterError> {
1199 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1200 let mut objects = Vec::new();
1201 for obj_id in &drop_ids {
1202 if !referenced_ids_hashset.contains(obj_id) {
1203 let object_info = ErrorMessageObjectDescription::from_id(
1204 obj_id,
1205 &self.catalog().for_session(ctx.session()),
1206 )
1207 .to_string();
1208 objects.push(object_info);
1209 }
1210 }
1211
1212 if !objects.is_empty() {
1213 ctx.session()
1214 .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1215 }
1216
1217 let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
1219 .iter()
1220 .filter_map(|id| match id {
1221 ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
1222 _ => None,
1223 })
1224 .flatten()
1225 .collect();
1226
1227 let DropOps {
1228 ops,
1229 dropped_active_db,
1230 dropped_active_cluster,
1231 dropped_in_use_indexes,
1232 } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1233
1234 self.catalog_transact_with_context(None, Some(ctx), ops)
1235 .await?;
1236
1237 if !expr_cache_invalidate_ids.is_empty() {
1239 let _fut = self.catalog().update_expression_cache(
1240 Default::default(),
1241 Default::default(),
1242 expr_cache_invalidate_ids,
1243 );
1244 }
1245
1246 fail::fail_point!("after_sequencer_drop_replica");
1247
1248 if dropped_active_db {
1249 ctx.session()
1250 .add_notice(AdapterNotice::DroppedActiveDatabase {
1251 name: ctx.session().vars().database().to_string(),
1252 });
1253 }
1254 if dropped_active_cluster {
1255 ctx.session()
1256 .add_notice(AdapterNotice::DroppedActiveCluster {
1257 name: ctx.session().vars().cluster().to_string(),
1258 });
1259 }
1260 for dropped_in_use_index in dropped_in_use_indexes {
1261 ctx.session()
1262 .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1263 self.metrics
1264 .optimization_notices
1265 .with_label_values(&["DroppedInUseIndex"])
1266 .inc_by(1);
1267 }
1268 Ok(ExecuteResponse::DroppedObject(object_type))
1269 }
1270
1271 fn validate_dropped_role_ownership(
1272 &self,
1273 session: &Session,
1274 dropped_roles: &BTreeMap<RoleId, &str>,
1275 ) -> Result<(), AdapterError> {
1276 fn privilege_check(
1277 privileges: &PrivilegeMap,
1278 dropped_roles: &BTreeMap<RoleId, &str>,
1279 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1280 object_id: &SystemObjectId,
1281 catalog: &ConnCatalog,
1282 ) {
1283 for privilege in privileges.all_values() {
1284 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1285 let grantor_name = catalog.get_role(&privilege.grantor).name();
1286 let object_description =
1287 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1288 dependent_objects
1289 .entry(role_name.to_string())
1290 .or_default()
1291 .push(format!(
1292 "privileges on {object_description} granted by {grantor_name}",
1293 ));
1294 }
1295 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1296 let grantee_name = catalog.get_role(&privilege.grantee).name();
1297 let object_description =
1298 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1299 dependent_objects
1300 .entry(role_name.to_string())
1301 .or_default()
1302 .push(format!(
1303 "privileges granted on {object_description} to {grantee_name}"
1304 ));
1305 }
1306 }
1307 }
1308
1309 let catalog = self.catalog().for_session(session);
1310 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1311 for entry in self.catalog.entries() {
1312 let id = SystemObjectId::Object(entry.id().into());
1313 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1314 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1315 dependent_objects
1316 .entry(role_name.to_string())
1317 .or_default()
1318 .push(format!("owner of {object_description}"));
1319 }
1320 privilege_check(
1321 entry.privileges(),
1322 dropped_roles,
1323 &mut dependent_objects,
1324 &id,
1325 &catalog,
1326 );
1327 }
1328 for database in self.catalog.databases() {
1329 let database_id = SystemObjectId::Object(database.id().into());
1330 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1331 let object_description =
1332 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1333 dependent_objects
1334 .entry(role_name.to_string())
1335 .or_default()
1336 .push(format!("owner of {object_description}"));
1337 }
1338 privilege_check(
1339 &database.privileges,
1340 dropped_roles,
1341 &mut dependent_objects,
1342 &database_id,
1343 &catalog,
1344 );
1345 for schema in database.schemas_by_id.values() {
1346 let schema_id = SystemObjectId::Object(
1347 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1348 );
1349 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1350 let object_description =
1351 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1352 dependent_objects
1353 .entry(role_name.to_string())
1354 .or_default()
1355 .push(format!("owner of {object_description}"));
1356 }
1357 privilege_check(
1358 &schema.privileges,
1359 dropped_roles,
1360 &mut dependent_objects,
1361 &schema_id,
1362 &catalog,
1363 );
1364 }
1365 }
1366 for cluster in self.catalog.clusters() {
1367 let cluster_id = SystemObjectId::Object(cluster.id().into());
1368 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1369 let object_description =
1370 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1371 dependent_objects
1372 .entry(role_name.to_string())
1373 .or_default()
1374 .push(format!("owner of {object_description}"));
1375 }
1376 privilege_check(
1377 &cluster.privileges,
1378 dropped_roles,
1379 &mut dependent_objects,
1380 &cluster_id,
1381 &catalog,
1382 );
1383 for replica in cluster.replicas() {
1384 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1385 let replica_id =
1386 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1387 let object_description =
1388 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1389 dependent_objects
1390 .entry(role_name.to_string())
1391 .or_default()
1392 .push(format!("owner of {object_description}"));
1393 }
1394 }
1395 }
1396 privilege_check(
1397 self.catalog().system_privileges(),
1398 dropped_roles,
1399 &mut dependent_objects,
1400 &SystemObjectId::System,
1401 &catalog,
1402 );
1403 for (default_privilege_object, default_privilege_acl_items) in
1404 self.catalog.default_privileges()
1405 {
1406 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1407 dependent_objects
1408 .entry(role_name.to_string())
1409 .or_default()
1410 .push(format!(
1411 "default privileges on {}S created by {}",
1412 default_privilege_object.object_type, role_name
1413 ));
1414 }
1415 for default_privilege_acl_item in default_privilege_acl_items {
1416 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1417 dependent_objects
1418 .entry(role_name.to_string())
1419 .or_default()
1420 .push(format!(
1421 "default privileges on {}S granted to {}",
1422 default_privilege_object.object_type, role_name
1423 ));
1424 }
1425 }
1426 }
1427
1428 if !dependent_objects.is_empty() {
1429 Err(AdapterError::DependentObject(dependent_objects))
1430 } else {
1431 Ok(())
1432 }
1433 }
1434
1435 #[instrument]
1436 pub(super) async fn sequence_drop_owned(
1437 &mut self,
1438 session: &Session,
1439 plan: plan::DropOwnedPlan,
1440 ) -> Result<ExecuteResponse, AdapterError> {
1441 for role_id in &plan.role_ids {
1442 self.catalog().ensure_not_reserved_role(role_id)?;
1443 }
1444
1445 let mut privilege_revokes = plan.privilege_revokes;
1446
1447 let session_catalog = self.catalog().for_session(session);
1449 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1450 && !session.is_superuser()
1451 {
1452 let role_membership =
1454 session_catalog.collect_role_membership(session.current_role_id());
1455 let invalid_revokes: BTreeSet<_> = privilege_revokes
1456 .extract_if(.., |(_, privilege)| {
1457 !role_membership.contains(&privilege.grantor)
1458 })
1459 .map(|(object_id, _)| object_id)
1460 .collect();
1461 for invalid_revoke in invalid_revokes {
1462 let object_description =
1463 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1464 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1465 }
1466 }
1467
1468 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1469 catalog::Op::UpdatePrivilege {
1470 target_id: object_id,
1471 privilege,
1472 variant: UpdatePrivilegeVariant::Revoke,
1473 }
1474 });
1475 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1476 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1477 privilege_object,
1478 privilege_acl_item,
1479 variant: UpdatePrivilegeVariant::Revoke,
1480 },
1481 );
1482 let DropOps {
1483 ops: drop_ops,
1484 dropped_active_db,
1485 dropped_active_cluster,
1486 dropped_in_use_indexes,
1487 } = self.sequence_drop_common(session, plan.drop_ids)?;
1488
1489 let ops = privilege_revoke_ops
1490 .chain(default_privilege_revoke_ops)
1491 .chain(drop_ops.into_iter())
1492 .collect();
1493
1494 self.catalog_transact(Some(session), ops).await?;
1495
1496 if dropped_active_db {
1497 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1498 name: session.vars().database().to_string(),
1499 });
1500 }
1501 if dropped_active_cluster {
1502 session.add_notice(AdapterNotice::DroppedActiveCluster {
1503 name: session.vars().cluster().to_string(),
1504 });
1505 }
1506 for dropped_in_use_index in dropped_in_use_indexes {
1507 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1508 }
1509 Ok(ExecuteResponse::DroppedOwned)
1510 }
1511
1512 fn sequence_drop_common(
1513 &self,
1514 session: &Session,
1515 ids: Vec<ObjectId>,
1516 ) -> Result<DropOps, AdapterError> {
1517 let mut dropped_active_db = false;
1518 let mut dropped_active_cluster = false;
1519 let mut dropped_in_use_indexes = Vec::new();
1520 let mut dropped_roles = BTreeMap::new();
1521 let mut dropped_databases = BTreeSet::new();
1522 let mut dropped_schemas = BTreeSet::new();
1523 let mut role_revokes = BTreeSet::new();
1527 let mut default_privilege_revokes = BTreeSet::new();
1530
1531 let mut clusters_to_drop = BTreeSet::new();
1533
1534 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1535 for id in &ids {
1536 match id {
1537 ObjectId::Database(id) => {
1538 let name = self.catalog().get_database(id).name();
1539 if name == session.vars().database() {
1540 dropped_active_db = true;
1541 }
1542 dropped_databases.insert(id);
1543 }
1544 ObjectId::Schema((_, spec)) => {
1545 if let SchemaSpecifier::Id(id) = spec {
1546 dropped_schemas.insert(id);
1547 }
1548 }
1549 ObjectId::Cluster(id) => {
1550 clusters_to_drop.insert(*id);
1551 if let Some(active_id) = self
1552 .catalog()
1553 .active_cluster(session)
1554 .ok()
1555 .map(|cluster| cluster.id())
1556 {
1557 if id == &active_id {
1558 dropped_active_cluster = true;
1559 }
1560 }
1561 }
1562 ObjectId::Role(id) => {
1563 let role = self.catalog().get_role(id);
1564 let name = role.name();
1565 dropped_roles.insert(*id, name);
1566 for (group_id, grantor_id) in &role.membership.map {
1568 role_revokes.insert((*group_id, *id, *grantor_id));
1569 }
1570 }
1571 ObjectId::Item(id) => {
1572 if let Some(index) = self.catalog().get_entry(id).index() {
1573 let humanizer = self.catalog().for_session(session);
1574 let dependants = self
1575 .controller
1576 .compute
1577 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1578 .ok()
1579 .into_iter()
1580 .flatten()
1581 .filter(|dependant_id| {
1582 if dependant_id.is_transient() {
1589 return false;
1590 }
1591 let Some(dependent_id) = humanizer
1593 .try_get_item_by_global_id(dependant_id)
1594 .map(|item| item.id())
1595 else {
1596 return false;
1597 };
1598 !ids_set.contains(&ObjectId::Item(dependent_id))
1601 })
1602 .flat_map(|dependant_id| {
1603 humanizer.humanize_id(dependant_id)
1607 })
1608 .collect_vec();
1609 if !dependants.is_empty() {
1610 dropped_in_use_indexes.push(DroppedInUseIndex {
1611 index_name: humanizer
1612 .humanize_id(index.global_id())
1613 .unwrap_or_else(|| id.to_string()),
1614 dependant_objects: dependants,
1615 });
1616 }
1617 }
1618 }
1619 _ => {}
1620 }
1621 }
1622
1623 for id in &ids {
1624 match id {
1625 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1629 if !clusters_to_drop.contains(cluster_id) {
1630 let cluster = self.catalog.get_cluster(*cluster_id);
1631 if cluster.is_managed() {
1632 let replica =
1633 cluster.replica(*replica_id).expect("Catalog out of sync");
1634 if !replica.config.location.internal() {
1635 coord_bail!("cannot drop replica of managed cluster");
1636 }
1637 }
1638 }
1639 }
1640 _ => {}
1641 }
1642 }
1643
1644 for role_id in dropped_roles.keys() {
1645 self.catalog().ensure_not_reserved_role(role_id)?;
1646 }
1647 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1648 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1650 for role in self.catalog().user_roles() {
1651 for dropped_role_id in
1652 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1653 {
1654 role_revokes.insert((
1655 **dropped_role_id,
1656 role.id(),
1657 *role
1658 .membership
1659 .map
1660 .get(*dropped_role_id)
1661 .expect("included in keys above"),
1662 ));
1663 }
1664 }
1665
1666 for (default_privilege_object, default_privilege_acls) in
1667 self.catalog().default_privileges()
1668 {
1669 if matches!(
1670 &default_privilege_object.database_id,
1671 Some(database_id) if dropped_databases.contains(database_id),
1672 ) || matches!(
1673 &default_privilege_object.schema_id,
1674 Some(schema_id) if dropped_schemas.contains(schema_id),
1675 ) {
1676 for default_privilege_acl in default_privilege_acls {
1677 default_privilege_revokes.insert((
1678 default_privilege_object.clone(),
1679 default_privilege_acl.clone(),
1680 ));
1681 }
1682 }
1683 }
1684
1685 let ops = role_revokes
1686 .into_iter()
1687 .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1688 role_id,
1689 member_id,
1690 grantor_id,
1691 })
1692 .chain(default_privilege_revokes.into_iter().map(
1693 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1694 privilege_object,
1695 privilege_acl_item,
1696 variant: UpdatePrivilegeVariant::Revoke,
1697 },
1698 ))
1699 .chain(iter::once(catalog::Op::DropObjects(
1700 ids.into_iter()
1701 .map(DropObjectInfo::manual_drop_from_object_id)
1702 .collect(),
1703 )))
1704 .collect();
1705
1706 Ok(DropOps {
1707 ops,
1708 dropped_active_db,
1709 dropped_active_cluster,
1710 dropped_in_use_indexes,
1711 })
1712 }
1713
1714 pub(super) fn sequence_explain_schema(
1715 &self,
1716 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1717 ) -> Result<ExecuteResponse, AdapterError> {
1718 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1719 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1720 })?;
1721
1722 let json_string = json_string(&json_value);
1723 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1724 Ok(Self::send_immediate_rows(row))
1725 }
1726
1727 pub(super) fn sequence_show_all_variables(
1728 &self,
1729 session: &Session,
1730 ) -> Result<ExecuteResponse, AdapterError> {
1731 let mut rows = viewable_variables(self.catalog().state(), session)
1732 .map(|v| (v.name(), v.value(), v.description()))
1733 .collect::<Vec<_>>();
1734 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1735
1736 let rows: Vec<_> = rows
1738 .into_iter()
1739 .map(|(name, val, desc)| {
1740 Row::pack_slice(&[
1741 Datum::String(name),
1742 Datum::String(&val),
1743 Datum::String(desc),
1744 ])
1745 })
1746 .collect();
1747 Ok(Self::send_immediate_rows(rows))
1748 }
1749
1750 pub(super) fn sequence_show_variable(
1751 &self,
1752 session: &Session,
1753 plan: plan::ShowVariablePlan,
1754 ) -> Result<ExecuteResponse, AdapterError> {
1755 if &plan.name == SCHEMA_ALIAS {
1756 let schemas = self.catalog.resolve_search_path(session);
1757 let schema = schemas.first();
1758 return match schema {
1759 Some((database_spec, schema_spec)) => {
1760 let schema_name = &self
1761 .catalog
1762 .get_schema(database_spec, schema_spec, session.conn_id())
1763 .name()
1764 .schema;
1765 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1766 Ok(Self::send_immediate_rows(row))
1767 }
1768 None => {
1769 if session.vars().current_object_missing_warnings() {
1770 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1771 search_path: session
1772 .vars()
1773 .search_path()
1774 .into_iter()
1775 .map(|schema| schema.to_string())
1776 .collect(),
1777 });
1778 }
1779 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1780 }
1781 };
1782 }
1783
1784 let variable = session
1785 .vars()
1786 .get(self.catalog().system_config(), &plan.name)
1787 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1788
1789 variable.visible(session.user(), self.catalog().system_config())?;
1792
1793 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1794 if variable.name() == vars::DATABASE.name()
1795 && matches!(
1796 self.catalog().resolve_database(&variable.value()),
1797 Err(CatalogError::UnknownDatabase(_))
1798 )
1799 && session.vars().current_object_missing_warnings()
1800 {
1801 let name = variable.value();
1802 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1803 } else if variable.name() == vars::CLUSTER.name()
1804 && matches!(
1805 self.catalog().resolve_cluster(&variable.value()),
1806 Err(CatalogError::UnknownCluster(_))
1807 )
1808 && session.vars().current_object_missing_warnings()
1809 {
1810 let name = variable.value();
1811 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1812 }
1813 Ok(Self::send_immediate_rows(row))
1814 }
1815
1816 #[instrument]
1817 pub(super) async fn sequence_inspect_shard(
1818 &self,
1819 session: &Session,
1820 plan: plan::InspectShardPlan,
1821 ) -> Result<ExecuteResponse, AdapterError> {
1822 if !session.user().is_internal() {
1825 return Err(AdapterError::Unauthorized(
1826 rbac::UnauthorizedError::MzSystem {
1827 action: "inspect".into(),
1828 },
1829 ));
1830 }
1831 let state = self
1832 .controller
1833 .storage
1834 .inspect_persist_state(plan.id)
1835 .await?;
1836 let jsonb = Jsonb::from_serde_json(state)?;
1837 Ok(Self::send_immediate_rows(jsonb.into_row()))
1838 }
1839
1840 #[instrument]
1841 pub(super) fn sequence_set_variable(
1842 &self,
1843 session: &mut Session,
1844 plan: plan::SetVariablePlan,
1845 ) -> Result<ExecuteResponse, AdapterError> {
1846 let (name, local) = (plan.name, plan.local);
1847 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1848 self.validate_set_isolation_level(session)?;
1849 }
1850 if &name == vars::CLUSTER.name() {
1851 self.validate_set_cluster(session)?;
1852 }
1853
1854 let vars = session.vars_mut();
1855 let values = match plan.value {
1856 plan::VariableValue::Default => None,
1857 plan::VariableValue::Values(values) => Some(values),
1858 };
1859
1860 match values {
1861 Some(values) => {
1862 vars.set(
1863 self.catalog().system_config(),
1864 &name,
1865 VarInput::SqlSet(&values),
1866 local,
1867 )?;
1868
1869 let vars = session.vars();
1870
1871 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1874 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1875 } else if name == vars::CLUSTER.name()
1876 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1877 {
1878 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1879 }
1880
1881 if name.as_str() == vars::DATABASE.name()
1883 && matches!(
1884 self.catalog().resolve_database(vars.database()),
1885 Err(CatalogError::UnknownDatabase(_))
1886 )
1887 && session.vars().current_object_missing_warnings()
1888 {
1889 let name = vars.database().to_string();
1890 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1891 } else if name.as_str() == vars::CLUSTER.name()
1892 && matches!(
1893 self.catalog().resolve_cluster(vars.cluster()),
1894 Err(CatalogError::UnknownCluster(_))
1895 )
1896 && session.vars().current_object_missing_warnings()
1897 {
1898 let name = vars.cluster().to_string();
1899 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1900 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1901 let v = values.into_first().to_lowercase();
1902 if v == IsolationLevel::ReadUncommitted.as_str()
1903 || v == IsolationLevel::ReadCommitted.as_str()
1904 || v == IsolationLevel::RepeatableRead.as_str()
1905 {
1906 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1907 isolation_level: v,
1908 });
1909 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1910 session.add_notice(AdapterNotice::StrongSessionSerializable);
1911 }
1912 }
1913 }
1914 None => vars.reset(self.catalog().system_config(), &name, local)?,
1915 }
1916
1917 Ok(ExecuteResponse::SetVariable { name, reset: false })
1918 }
1919
1920 pub(super) fn sequence_reset_variable(
1921 &self,
1922 session: &mut Session,
1923 plan: plan::ResetVariablePlan,
1924 ) -> Result<ExecuteResponse, AdapterError> {
1925 let name = plan.name;
1926 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1927 self.validate_set_isolation_level(session)?;
1928 }
1929 if &name == vars::CLUSTER.name() {
1930 self.validate_set_cluster(session)?;
1931 }
1932 session
1933 .vars_mut()
1934 .reset(self.catalog().system_config(), &name, false)?;
1935 Ok(ExecuteResponse::SetVariable { name, reset: true })
1936 }
1937
1938 pub(super) fn sequence_set_transaction(
1939 &self,
1940 session: &mut Session,
1941 plan: plan::SetTransactionPlan,
1942 ) -> Result<ExecuteResponse, AdapterError> {
1943 for mode in plan.modes {
1945 match mode {
1946 TransactionMode::AccessMode(_) => {
1947 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1948 }
1949 TransactionMode::IsolationLevel(isolation_level) => {
1950 self.validate_set_isolation_level(session)?;
1951
1952 session.vars_mut().set(
1953 self.catalog().system_config(),
1954 TRANSACTION_ISOLATION_VAR_NAME,
1955 VarInput::Flat(&isolation_level.to_ast_string_stable()),
1956 plan.local,
1957 )?
1958 }
1959 }
1960 }
1961 Ok(ExecuteResponse::SetVariable {
1962 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1963 reset: false,
1964 })
1965 }
1966
1967 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1968 if session.transaction().contains_ops() {
1969 Err(AdapterError::InvalidSetIsolationLevel)
1970 } else {
1971 Ok(())
1972 }
1973 }
1974
1975 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1976 if session.transaction().contains_ops() {
1977 Err(AdapterError::InvalidSetCluster)
1978 } else {
1979 Ok(())
1980 }
1981 }
1982
1983 #[instrument]
1984 pub(super) async fn sequence_end_transaction(
1985 &mut self,
1986 mut ctx: ExecuteContext,
1987 mut action: EndTransactionAction,
1988 ) {
1989 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
1991 (&action, ctx.session().transaction())
1992 {
1993 action = EndTransactionAction::Rollback;
1994 }
1995 let response = match action {
1996 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
1997 params: BTreeMap::new(),
1998 }),
1999 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2000 params: BTreeMap::new(),
2001 }),
2002 };
2003
2004 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2005
2006 let (response, action) = match result {
2007 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2008 (response, action)
2009 }
2010 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2011 let validated_locks = match write_lock_guards {
2015 None => None,
2016 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2017 Ok(locks) => Some(locks),
2018 Err(missing) => {
2019 tracing::error!(?missing, "programming error, missing write locks");
2020 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2021 }
2022 },
2023 };
2024
2025 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2026 for WriteOp { id, rows } in writes {
2027 let total_rows = collected_writes.entry(id).or_default();
2028 total_rows.push(rows);
2029 }
2030
2031 self.submit_write(PendingWriteTxn::User {
2032 span: Span::current(),
2033 writes: collected_writes,
2034 write_locks: validated_locks,
2035 pending_txn: PendingTxn {
2036 ctx,
2037 response,
2038 action,
2039 },
2040 });
2041 return;
2042 }
2043 Ok((
2044 Some(TransactionOps::Peeks {
2045 determination,
2046 requires_linearization: RequireLinearization::Required,
2047 ..
2048 }),
2049 _,
2050 )) if ctx.session().vars().transaction_isolation()
2051 == &IsolationLevel::StrictSerializable =>
2052 {
2053 let conn_id = ctx.session().conn_id().clone();
2054 let pending_read_txn = PendingReadTxn {
2055 txn: PendingRead::Read {
2056 txn: PendingTxn {
2057 ctx,
2058 response,
2059 action,
2060 },
2061 },
2062 timestamp_context: determination.timestamp_context,
2063 created: Instant::now(),
2064 num_requeues: 0,
2065 otel_ctx: OpenTelemetryContext::obtain(),
2066 };
2067 self.strict_serializable_reads_tx
2068 .send((conn_id, pending_read_txn))
2069 .expect("sending to strict_serializable_reads_tx cannot fail");
2070 return;
2071 }
2072 Ok((
2073 Some(TransactionOps::Peeks {
2074 determination,
2075 requires_linearization: RequireLinearization::Required,
2076 ..
2077 }),
2078 _,
2079 )) if ctx.session().vars().transaction_isolation()
2080 == &IsolationLevel::StrongSessionSerializable =>
2081 {
2082 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2083 ctx.session_mut()
2084 .ensure_timestamp_oracle(timeline.clone())
2085 .apply_write(*ts);
2086 }
2087 (response, action)
2088 }
2089 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2090 self.internal_cmd_tx
2091 .send(Message::ExecuteSingleStatementTransaction {
2092 ctx,
2093 otel_ctx: OpenTelemetryContext::obtain(),
2094 stmt,
2095 params,
2096 })
2097 .expect("must send");
2098 return;
2099 }
2100 Ok((_, _)) => (response, action),
2101 Err(err) => (Err(err), EndTransactionAction::Rollback),
2102 };
2103 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2104 let response = response.map(|mut r| {
2106 r.extend_params(changed);
2107 ExecuteResponse::from(r)
2108 });
2109
2110 ctx.retire(response);
2111 }
2112
2113 #[instrument]
2114 async fn sequence_end_transaction_inner(
2115 &mut self,
2116 ctx: &mut ExecuteContext,
2117 action: EndTransactionAction,
2118 ) -> Result<(Option<TransactionOps>, Option<WriteLocks>), AdapterError> {
2119 let txn = self.clear_transaction(ctx.session_mut()).await;
2120
2121 if let EndTransactionAction::Commit = action {
2122 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2123 match &mut ops {
2124 TransactionOps::Writes(writes) => {
2125 for WriteOp { id, .. } in &mut writes.iter() {
2126 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2128 AdapterError::Catalog(mz_catalog::memory::error::Error {
2129 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2130 })
2131 })?;
2132 }
2133
2134 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2136 }
2137 TransactionOps::DDL {
2138 ops,
2139 state: _,
2140 side_effects,
2141 revision,
2142 snapshot: _,
2143 } => {
2144 if *revision != self.catalog().transient_revision() {
2146 return Err(AdapterError::DDLTransactionRace);
2147 }
2148 let ops = std::mem::take(ops);
2150 let side_effects = std::mem::take(side_effects);
2151 self.catalog_transact_with_side_effects(
2152 Some(ctx),
2153 ops,
2154 move |a, mut ctx| {
2155 Box::pin(async move {
2156 for side_effect in side_effects {
2157 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2158 }
2159 })
2160 },
2161 )
2162 .await?;
2163 }
2164 _ => (),
2165 }
2166 return Ok((Some(ops), write_lock_guards));
2167 }
2168 }
2169
2170 Ok((None, None))
2171 }
2172
2173 pub(super) async fn sequence_side_effecting_func(
2174 &mut self,
2175 ctx: ExecuteContext,
2176 plan: SideEffectingFunc,
2177 ) {
2178 match plan {
2179 SideEffectingFunc::PgCancelBackend { connection_id } => {
2180 if ctx.session().conn_id().unhandled() == connection_id {
2181 ctx.retire(Err(AdapterError::Canceled));
2185 return;
2186 }
2187
2188 let res = if let Some((id_handle, _conn_meta)) =
2189 self.active_conns.get_key_value(&connection_id)
2190 {
2191 self.handle_privileged_cancel(id_handle.clone()).await;
2193 Datum::True
2194 } else {
2195 Datum::False
2196 };
2197 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2198 }
2199 }
2200 }
2201
2202 pub(crate) async fn execute_side_effecting_func(
2211 &mut self,
2212 plan: SideEffectingFunc,
2213 conn_id: ConnectionId,
2214 current_role: RoleId,
2215 ) -> Result<ExecuteResponse, AdapterError> {
2216 match plan {
2217 SideEffectingFunc::PgCancelBackend { connection_id } => {
2218 if conn_id.unhandled() == connection_id {
2219 return Err(AdapterError::Canceled);
2223 }
2224
2225 if let Some((_id_handle, conn_meta)) =
2228 self.active_conns.get_key_value(&connection_id)
2229 {
2230 let target_role = *conn_meta.authenticated_role_id();
2231 let role_membership = self
2232 .catalog()
2233 .state()
2234 .collect_role_membership(¤t_role);
2235 if !role_membership.contains(&target_role) {
2236 let target_role_name = self
2237 .catalog()
2238 .try_get_role(&target_role)
2239 .map(|role| role.name().to_string())
2240 .unwrap_or_else(|| target_role.to_string());
2241 return Err(AdapterError::Unauthorized(
2242 rbac::UnauthorizedError::RoleMembership {
2243 role_names: vec![target_role_name],
2244 },
2245 ));
2246 }
2247
2248 let id_handle = self
2250 .active_conns
2251 .get_key_value(&connection_id)
2252 .map(|(id, _)| id.clone())
2253 .expect("checked above");
2254 self.handle_privileged_cancel(id_handle).await;
2255 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2256 } else {
2257 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2259 }
2260 }
2261 }
2262 }
2263
2264 pub(crate) async fn determine_real_time_recent_timestamp(
2268 &self,
2269 source_ids: impl Iterator<Item = GlobalId>,
2270 real_time_recency_timeout: Duration,
2271 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2272 let item_ids = source_ids
2273 .map(|gid| {
2274 self.catalog
2275 .try_resolve_item_id(&gid)
2276 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2277 })
2278 .collect::<Result<Vec<_>, _>>()?;
2279
2280 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2286 if to_visit.is_empty() {
2289 return Ok(None);
2290 }
2291
2292 let mut timestamp_objects = BTreeSet::new();
2293
2294 while let Some(id) = to_visit.pop_front() {
2295 timestamp_objects.insert(id);
2296 to_visit.extend(
2297 self.catalog()
2298 .get_entry(&id)
2299 .uses()
2300 .into_iter()
2301 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2302 );
2303 }
2304 let timestamp_objects = timestamp_objects
2305 .into_iter()
2306 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2307 .collect();
2308
2309 let r = self
2310 .controller
2311 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2312 .await?;
2313
2314 Ok(Some(r))
2315 }
2316
2317 pub(crate) async fn await_real_time_recent_timestamp<F>(
2318 catalog: Arc<Catalog>,
2319 fut: F,
2320 ) -> Result<Timestamp, AdapterError>
2321 where
2322 F: Future<Output = Result<Timestamp, StorageError>>,
2323 {
2324 fut.await
2325 .map_err(|error| Self::real_time_recent_timestamp_error(&catalog, error))
2326 }
2327
2328 fn real_time_recent_timestamp_error(catalog: &Catalog, error: StorageError) -> AdapterError {
2329 let rtr_name = |id: &GlobalId| {
2330 catalog
2331 .try_get_entry_by_global_id(id)
2332 .map(|e| e.name().item.clone())
2333 .unwrap_or_else(|| id.to_string())
2334 };
2335
2336 match error {
2337 StorageError::RtrTimeout(id) => AdapterError::RtrTimeout(rtr_name(&id)),
2338 StorageError::RtrDropFailure(id) => AdapterError::RtrDropFailure(rtr_name(&id)),
2339 error => error.into(),
2340 }
2341 }
2342
2343 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2346 &self,
2347 session: &Session,
2348 source_ids: impl Iterator<Item = GlobalId>,
2349 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2350 let vars = session.vars();
2351
2352 if vars.real_time_recency()
2353 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2354 && !session.contains_read_timestamp()
2355 {
2356 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2357 .await
2358 } else {
2359 Ok(None)
2360 }
2361 }
2362
2363 #[instrument]
2364 pub(super) async fn sequence_explain_plan(
2365 &mut self,
2366 ctx: ExecuteContext,
2367 plan: plan::ExplainPlanPlan,
2368 target_cluster: TargetCluster,
2369 ) {
2370 match &plan.explainee {
2371 plan::Explainee::Statement(stmt) => match stmt {
2372 plan::ExplaineeStatement::CreateView { .. } => {
2373 self.explain_create_view(ctx, plan).await;
2374 }
2375 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2376 self.explain_create_materialized_view(ctx, plan).await;
2377 }
2378 plan::ExplaineeStatement::CreateIndex { .. } => {
2379 self.explain_create_index(ctx, plan).await;
2380 }
2381 plan::ExplaineeStatement::Select { .. } => {
2382 self.explain_peek(ctx, plan, target_cluster).await;
2383 }
2384 plan::ExplaineeStatement::Subscribe { .. } => {
2385 self.explain_subscribe(ctx, plan, target_cluster).await;
2386 }
2387 },
2388 plan::Explainee::View(_) => {
2389 let result = self.explain_view(&ctx, plan);
2390 ctx.retire(result);
2391 }
2392 plan::Explainee::MaterializedView(_) => {
2393 let result = self.explain_materialized_view(&ctx, plan);
2394 ctx.retire(result);
2395 }
2396 plan::Explainee::Index(_) => {
2397 let result = self.explain_index(&ctx, plan);
2398 ctx.retire(result);
2399 }
2400 plan::Explainee::ReplanView(_) => {
2401 self.explain_replan_view(ctx, plan).await;
2402 }
2403 plan::Explainee::ReplanMaterializedView(_) => {
2404 self.explain_replan_materialized_view(ctx, plan).await;
2405 }
2406 plan::Explainee::ReplanIndex(_) => {
2407 self.explain_replan_index(ctx, plan).await;
2408 }
2409 };
2410 }
2411
2412 pub(super) async fn sequence_explain_pushdown(
2413 &mut self,
2414 ctx: ExecuteContext,
2415 plan: plan::ExplainPushdownPlan,
2416 target_cluster: TargetCluster,
2417 ) {
2418 match plan.explainee {
2419 Explainee::Statement(ExplaineeStatement::Select {
2420 broken: false,
2421 plan,
2422 desc: _,
2423 }) => {
2424 let stage = return_if_err!(
2425 self.peek_validate(
2426 ctx.session(),
2427 plan,
2428 target_cluster,
2429 None,
2430 ExplainContext::Pushdown,
2431 Some(ctx.session().vars().max_query_result_size()),
2432 ),
2433 ctx
2434 );
2435 self.sequence_staged(ctx, Span::current(), stage).await;
2436 }
2437 Explainee::MaterializedView(item_id) => {
2438 self.explain_pushdown_materialized_view(ctx, item_id).await;
2439 }
2440 _ => {
2441 ctx.retire(Err(AdapterError::Unsupported(
2442 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2443 )));
2444 }
2445 };
2446 }
2447
2448 async fn execute_explain_pushdown_with_read_holds(
2450 &self,
2451 ctx: ExecuteContext,
2452 as_of: Antichain<Timestamp>,
2453 mz_now: ResultSpec<'static>,
2454 read_holds: Option<ReadHolds>,
2455 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2456 ) {
2457 let fut = self
2458 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2459 .await;
2460 task::spawn(|| "render explain pushdown", async move {
2461 let _read_holds = read_holds;
2463 let res = fut.await;
2464 ctx.retire(res);
2465 });
2466 }
2467
2468 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2470 &self,
2471 session: &Session,
2472 as_of: Antichain<Timestamp>,
2473 mz_now: ResultSpec<'static>,
2474 imports: I,
2475 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2476 super::explain_pushdown_future_inner(
2478 session,
2479 &self.catalog,
2480 &self.controller.storage_collections,
2481 as_of,
2482 mz_now,
2483 imports,
2484 )
2485 .await
2486 }
2487
2488 #[instrument]
2489 pub(super) async fn sequence_insert(
2490 &mut self,
2491 mut ctx: ExecuteContext,
2492 plan: plan::InsertPlan,
2493 ) {
2494 if !ctx.session_mut().transaction().allows_writes() {
2502 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2503 return;
2504 }
2505
2506 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2520 let expr = return_if_err!(
2523 plan.values
2524 .clone()
2525 .lower(self.catalog().system_config(), None),
2526 ctx
2527 );
2528 OptimizedMirRelationExpr(expr)
2529 } else {
2530 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2532
2533 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2535
2536 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2538 };
2539
2540 match optimized_mir.into_inner() {
2541 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2542 let catalog = self.owned_catalog();
2543 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2544 let result =
2545 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2546 ctx.retire(result);
2547 });
2548 }
2549 _ => {
2551 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2552 Some(table) => {
2553 let desc = table.relation_desc_latest().expect("table has a desc");
2555 desc.arity()
2556 }
2557 None => {
2558 ctx.retire(Err(AdapterError::Catalog(
2559 mz_catalog::memory::error::Error {
2560 kind: ErrorKind::Sql(CatalogError::UnknownItem(
2561 plan.id.to_string(),
2562 )),
2563 },
2564 )));
2565 return;
2566 }
2567 };
2568
2569 let finishing = RowSetFinishing {
2570 order_by: vec![],
2571 limit: None,
2572 offset: 0,
2573 project: (0..desc_arity).collect(),
2574 };
2575
2576 let read_then_write_plan = plan::ReadThenWritePlan {
2577 id: plan.id,
2578 selection: plan.values,
2579 finishing,
2580 assignments: BTreeMap::new(),
2581 kind: MutationKind::Insert,
2582 returning: plan.returning,
2583 };
2584
2585 self.sequence_read_then_write(ctx, read_then_write_plan)
2586 .await;
2587 }
2588 }
2589 }
2590
2591 #[instrument]
2596 pub(super) async fn sequence_read_then_write(
2597 &mut self,
2598 mut ctx: ExecuteContext,
2599 plan: plan::ReadThenWritePlan,
2600 ) {
2601 let mut source_ids: BTreeSet<_> = plan
2602 .selection
2603 .depends_on()
2604 .into_iter()
2605 .map(|gid| self.catalog().resolve_item_id(&gid))
2606 .collect();
2607 source_ids.insert(plan.id);
2608
2609 if ctx.session().transaction().write_locks().is_none() {
2611 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2613
2614 for id in &source_ids {
2616 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2617 write_locks.insert_lock(*id, lock);
2618 }
2619 }
2620
2621 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2623 Ok(locks) => locks,
2624 Err(missing) => {
2625 let role_metadata = ctx.session().role_metadata().clone();
2627 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2628 let plan = DeferredPlan {
2629 ctx,
2630 plan: Plan::ReadThenWrite(plan),
2631 validity: PlanValidity::new(
2632 self.catalog.transient_revision(),
2633 source_ids.clone(),
2634 None,
2635 None,
2636 role_metadata,
2637 ),
2638 requires_locks: source_ids,
2639 };
2640 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2641 }
2642 };
2643
2644 ctx.session_mut()
2645 .try_grant_write_locks(write_locks)
2646 .expect("session has already been granted write locks");
2647 }
2648
2649 let plan::ReadThenWritePlan {
2650 id,
2651 kind,
2652 selection,
2653 mut assignments,
2654 finishing,
2655 mut returning,
2656 } = plan;
2657
2658 let desc = match self.catalog().try_get_entry(&id) {
2660 Some(table) => {
2661 table
2663 .relation_desc_latest()
2664 .expect("table has a desc")
2665 .into_owned()
2666 }
2667 None => {
2668 ctx.retire(Err(AdapterError::Catalog(
2669 mz_catalog::memory::error::Error {
2670 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2671 },
2672 )));
2673 return;
2674 }
2675 };
2676
2677 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2679 || assignments.values().any(|e| e.contains_temporal())
2680 || returning.iter().any(|e| e.contains_temporal());
2681 if contains_temporal {
2682 ctx.retire(Err(AdapterError::Unsupported(
2683 "calls to mz_now in write statements",
2684 )));
2685 return;
2686 }
2687
2688 for gid in selection.depends_on() {
2690 let item_id = self.catalog().resolve_item_id(&gid);
2691 if let Err(err) = validate_read_then_write_dependencies(self.catalog(), &item_id) {
2692 ctx.retire(Err(err));
2693 return;
2694 }
2695 }
2696
2697 let (peek_tx, peek_rx) = oneshot::channel();
2698 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2699 let (tx, _, session, extra) = ctx.into_parts();
2700 let peek_ctx = ExecuteContext::from_parts(
2712 peek_client_tx,
2713 self.internal_cmd_tx.clone(),
2714 session,
2715 Default::default(),
2716 );
2717
2718 self.sequence_peek(
2719 peek_ctx,
2720 plan::SelectPlan {
2721 select: None,
2722 source: selection,
2723 when: QueryWhen::FreshestTableWrite,
2724 finishing,
2725 copy_to: None,
2726 },
2727 TargetCluster::Active,
2728 None,
2729 )
2730 .await;
2731
2732 let internal_cmd_tx = self.internal_cmd_tx.clone();
2733 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2734 let catalog = self.owned_catalog();
2735 let max_result_size = self.catalog().system_config().max_result_size();
2736
2737 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2738 let (peek_response, session) = match peek_rx.await {
2739 Ok(Response {
2740 result: Ok(resp),
2741 session,
2742 otel_ctx,
2743 }) => {
2744 otel_ctx.attach_as_parent();
2745 (resp, session)
2746 }
2747 Ok(Response {
2748 result: Err(e),
2749 session,
2750 otel_ctx,
2751 }) => {
2752 let ctx =
2753 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2754 otel_ctx.attach_as_parent();
2755 ctx.retire(Err(e));
2756 return;
2757 }
2758 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2760 };
2761 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2762 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2763
2764 if timeout_dur == Duration::ZERO {
2766 timeout_dur = Duration::MAX;
2767 }
2768
2769 let style = ExprPrepOneShot {
2770 logical_time: EvalTime::NotAvailable, session: ctx.session(),
2772 catalog_state: catalog.state(),
2773 };
2774 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2775 return_if_err!(style.prep_scalar_expr(expr), ctx);
2776 }
2777
2778 let make_diffs = move |mut rows: Box<dyn RowIterator>|
2779 -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2780 let arena = RowArena::new();
2781 let mut diffs = Vec::new();
2782 let mut datum_vec = mz_repr::DatumVec::new();
2783
2784 while let Some(row) = rows.next() {
2785 if !assignments.is_empty() {
2786 assert!(
2787 matches!(kind, MutationKind::Update),
2788 "only updates support assignments"
2789 );
2790 let mut datums = datum_vec.borrow_with(row);
2791 let mut updates = vec![];
2792 for (idx, expr) in &assignments {
2793 let updated = match expr.eval(&datums, &arena) {
2794 Ok(updated) => updated,
2795 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2796 };
2797 updates.push((*idx, updated));
2798 }
2799 for (idx, new_value) in updates {
2800 datums[idx] = new_value;
2801 }
2802 let updated = Row::pack_slice(&datums);
2803 diffs.push((updated, Diff::ONE));
2804 }
2805 match kind {
2806 MutationKind::Update | MutationKind::Delete => {
2810 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2811 }
2812 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2813 }
2814 }
2815
2816 let mut byte_size: u64 = 0;
2819 for (row, diff) in &diffs {
2820 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2821 if diff.is_positive() {
2822 for (idx, datum) in row.iter().enumerate() {
2823 desc.constraints_met(idx, &datum)?;
2824 }
2825 }
2826 }
2827 Ok((diffs, byte_size))
2828 };
2829
2830 let diffs = match peek_response {
2831 ExecuteResponse::SendingRowsStreaming {
2832 rows: mut rows_stream,
2833 ..
2834 } => {
2835 let mut byte_size: u64 = 0;
2836 let mut diffs = Vec::new();
2837 let result = loop {
2838 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2839 Ok(Some(res)) => match res {
2840 PeekResponseUnary::Rows(new_rows) => {
2841 match make_diffs(new_rows) {
2842 Ok((mut new_diffs, new_byte_size)) => {
2843 byte_size = byte_size.saturating_add(new_byte_size);
2844 if byte_size > max_result_size {
2845 break Err(AdapterError::ResultSize(format!(
2846 "result exceeds max size of {max_result_size}"
2847 )));
2848 }
2849 diffs.append(&mut new_diffs)
2850 }
2851 Err(e) => break Err(e),
2852 };
2853 }
2854 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2855 PeekResponseUnary::Error(e) => {
2856 break Err(AdapterError::Unstructured(anyhow!(e)));
2857 }
2858 PeekResponseUnary::DependencyDropped(dep) => {
2859 break Err(dep.to_concurrent_dependency_drop());
2860 }
2861 },
2862 Ok(None) => break Ok(diffs),
2863 Err(_) => {
2864 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2869 conn_id: ctx.session().conn_id().clone(),
2870 });
2871 if let Err(e) = result {
2872 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2873 }
2874 break Err(AdapterError::StatementTimeout);
2875 }
2876 }
2877 };
2878
2879 result
2880 }
2881 ExecuteResponse::SendingRowsImmediate { rows } => {
2882 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2883 }
2884 resp => Err(AdapterError::Unstructured(anyhow!(
2885 "unexpected peek response: {resp:?}"
2886 ))),
2887 };
2888
2889 let mut returning_rows = Vec::new();
2890 let mut diff_err: Option<AdapterError> = None;
2891 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2892 let arena = RowArena::new();
2893 for (row, diff) in diffs {
2894 if !diff.is_positive() {
2895 continue;
2896 }
2897 let mut returning_row = Row::with_capacity(returning.len());
2898 let mut packer = returning_row.packer();
2899 for expr in &returning {
2900 let datums: Vec<_> = row.iter().collect();
2901 match expr.eval(&datums, &arena) {
2902 Ok(datum) => {
2903 packer.push(datum);
2904 }
2905 Err(err) => {
2906 diff_err = Some(err.into());
2907 break;
2908 }
2909 }
2910 }
2911 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2912 let diff = match NonZeroUsize::try_from(diff) {
2913 Ok(diff) => diff,
2914 Err(err) => {
2915 diff_err = Some(err.into());
2916 break;
2917 }
2918 };
2919 returning_rows.push((returning_row, diff));
2920 if diff_err.is_some() {
2921 break;
2922 }
2923 }
2924 }
2925 let diffs = if let Some(err) = diff_err {
2926 Err(err)
2927 } else {
2928 diffs
2929 };
2930
2931 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2934 if let Some(timestamp_context) = timestamp_context {
2943 let (tx, rx) = tokio::sync::oneshot::channel();
2944 let conn_id = ctx.session().conn_id().clone();
2945 let pending_read_txn = PendingReadTxn {
2946 txn: PendingRead::ReadThenWrite { ctx, tx },
2947 timestamp_context,
2948 created: Instant::now(),
2949 num_requeues: 0,
2950 otel_ctx: OpenTelemetryContext::obtain(),
2951 };
2952 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2953 if let Err(e) = result {
2955 warn!(
2956 "strict_serializable_reads_tx dropped before we could send: {:?}",
2957 e
2958 );
2959 return;
2960 }
2961 let result = rx.await;
2962 ctx = match result {
2964 Ok(Some(ctx)) => ctx,
2965 Ok(None) => {
2966 return;
2969 }
2970 Err(e) => {
2971 warn!(
2972 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
2973 e
2974 );
2975 return;
2976 }
2977 };
2978 }
2979
2980 match diffs {
2981 Ok(diffs) => {
2982 let result = Self::send_diffs(
2983 ctx.session_mut(),
2984 plan::SendDiffsPlan {
2985 id,
2986 updates: diffs,
2987 kind,
2988 returning: returning_rows,
2989 max_result_size,
2990 },
2991 );
2992 ctx.retire(result);
2993 }
2994 Err(e) => {
2995 ctx.retire(Err(e));
2996 }
2997 }
2998 });
2999 }
3000
3001 #[instrument]
3002 pub(super) async fn sequence_alter_item_rename(
3003 &mut self,
3004 ctx: &mut ExecuteContext,
3005 plan: plan::AlterItemRenamePlan,
3006 ) -> Result<ExecuteResponse, AdapterError> {
3007 let op = catalog::Op::RenameItem {
3008 id: plan.id,
3009 current_full_name: plan.current_full_name,
3010 to_name: plan.to_name,
3011 };
3012 match self
3013 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3014 .await
3015 {
3016 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3017 Err(err) => Err(err),
3018 }
3019 }
3020
3021 #[instrument]
3022 pub(super) async fn sequence_alter_retain_history(
3023 &mut self,
3024 ctx: &mut ExecuteContext,
3025 plan: plan::AlterRetainHistoryPlan,
3026 ) -> Result<ExecuteResponse, AdapterError> {
3027 let ops = vec![catalog::Op::AlterRetainHistory {
3028 id: plan.id,
3029 value: plan.value,
3030 window: plan.window,
3031 }];
3032 self.catalog_transact_with_context(None, Some(ctx), ops)
3033 .await?;
3034 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3035 }
3036
3037 #[instrument]
3038 pub(super) async fn sequence_alter_source_timestamp_interval(
3039 &mut self,
3040 ctx: &mut ExecuteContext,
3041 plan: plan::AlterSourceTimestampIntervalPlan,
3042 ) -> Result<ExecuteResponse, AdapterError> {
3043 let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3044 id: plan.id,
3045 value: plan.value,
3046 interval: plan.interval,
3047 }];
3048 self.catalog_transact_with_context(None, Some(ctx), ops)
3049 .await?;
3050 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3051 }
3052
3053 #[instrument]
3054 pub(super) async fn sequence_alter_schema_rename(
3055 &mut self,
3056 ctx: &mut ExecuteContext,
3057 plan: plan::AlterSchemaRenamePlan,
3058 ) -> Result<ExecuteResponse, AdapterError> {
3059 let (database_spec, schema_spec) = plan.cur_schema_spec;
3060 let op = catalog::Op::RenameSchema {
3061 database_spec,
3062 schema_spec,
3063 new_name: plan.new_schema_name,
3064 check_reserved_names: true,
3065 };
3066 match self
3067 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3068 .await
3069 {
3070 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3071 Err(err) => Err(err),
3072 }
3073 }
3074
3075 #[instrument]
3076 pub(super) async fn sequence_alter_schema_swap(
3077 &mut self,
3078 ctx: &mut ExecuteContext,
3079 plan: plan::AlterSchemaSwapPlan,
3080 ) -> Result<ExecuteResponse, AdapterError> {
3081 let plan::AlterSchemaSwapPlan {
3082 schema_a_spec: (schema_a_db, schema_a),
3083 schema_a_name,
3084 schema_b_spec: (schema_b_db, schema_b),
3085 schema_b_name,
3086 name_temp,
3087 } = plan;
3088
3089 let op_a = catalog::Op::RenameSchema {
3090 database_spec: schema_a_db,
3091 schema_spec: schema_a,
3092 new_name: name_temp,
3093 check_reserved_names: false,
3094 };
3095 let op_b = catalog::Op::RenameSchema {
3096 database_spec: schema_b_db,
3097 schema_spec: schema_b,
3098 new_name: schema_a_name,
3099 check_reserved_names: false,
3100 };
3101 let op_c = catalog::Op::RenameSchema {
3102 database_spec: schema_a_db,
3103 schema_spec: schema_a,
3104 new_name: schema_b_name,
3105 check_reserved_names: false,
3106 };
3107
3108 match self
3109 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3110 Box::pin(async {})
3111 })
3112 .await
3113 {
3114 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3115 Err(err) => Err(err),
3116 }
3117 }
3118
3119 #[instrument]
3120 pub(super) async fn sequence_alter_role(
3121 &mut self,
3122 session: &Session,
3123 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3124 ) -> Result<ExecuteResponse, AdapterError> {
3125 let catalog = self.catalog().for_session(session);
3126 let role = catalog.get_role(&id);
3127
3128 let mut notices = vec![];
3130
3131 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3133 let mut vars = role.vars().clone();
3134
3135 let mut nopassword = false;
3138
3139 match option {
3141 PlannedAlterRoleOption::Attributes(attrs) => {
3142 self.validate_role_attributes(&attrs.clone().into())?;
3143
3144 if let Some(inherit) = attrs.inherit {
3145 attributes.inherit = inherit;
3146 }
3147
3148 if let Some(password) = attrs.password {
3149 attributes.password = Some(password);
3150 attributes.scram_iterations =
3151 Some(self.catalog().system_config().scram_iterations())
3152 }
3153
3154 if let Some(superuser) = attrs.superuser {
3155 attributes.superuser = Some(superuser);
3156 }
3157
3158 if let Some(login) = attrs.login {
3159 attributes.login = Some(login);
3160 }
3161
3162 if attrs.nopassword.unwrap_or(false) {
3163 nopassword = true;
3164 }
3165
3166 if let Some(notice) = self.should_emit_rbac_notice(session) {
3167 notices.push(notice);
3168 }
3169 }
3170 PlannedAlterRoleOption::Variable(variable) => {
3171 let session_var = session.vars().inspect(variable.name())?;
3173 session_var.visible(session.user(), catalog.system_vars())?;
3175
3176 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3179 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3180 } else if let PlannedRoleVariable::Set {
3181 name,
3182 value: VariableValue::Values(vals),
3183 } = &variable
3184 {
3185 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3186 notices.push(AdapterNotice::IntrospectionClusterUsage);
3187 }
3188 }
3189
3190 let var_name = match variable {
3191 PlannedRoleVariable::Set { name, value } => {
3192 match &value {
3194 VariableValue::Default => {
3195 vars.remove(&name);
3196 }
3197 VariableValue::Values(vals) => {
3198 let var = match &vals[..] {
3199 [val] => OwnedVarInput::Flat(val.clone()),
3200 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3201 };
3202 session_var.check(var.borrow())?;
3204
3205 vars.insert(name.clone(), var);
3206 }
3207 };
3208 name
3209 }
3210 PlannedRoleVariable::Reset { name } => {
3211 vars.remove(&name);
3213 name
3214 }
3215 };
3216
3217 notices.push(AdapterNotice::VarDefaultUpdated {
3219 role: Some(name.clone()),
3220 var_name: Some(var_name),
3221 });
3222 }
3223 }
3224
3225 let op = catalog::Op::AlterRole {
3226 id,
3227 name,
3228 attributes,
3229 nopassword,
3230 vars: RoleVars { map: vars },
3231 };
3232 let response = self
3233 .catalog_transact(Some(session), vec![op])
3234 .await
3235 .map(|_| ExecuteResponse::AlteredRole)?;
3236
3237 session.add_notices(notices);
3239
3240 Ok(response)
3241 }
3242
3243 #[instrument]
3244 pub(super) async fn sequence_alter_sink_prepare(
3245 &mut self,
3246 ctx: ExecuteContext,
3247 plan: plan::AlterSinkPlan,
3248 ) {
3249 let id_bundle = crate::CollectionIdBundle {
3251 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3252 compute_ids: BTreeMap::new(),
3253 };
3254 let read_hold = self.acquire_read_holds(&id_bundle);
3255
3256 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3257 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3258 return;
3259 };
3260
3261 let otel_ctx = OpenTelemetryContext::obtain();
3262 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3263
3264 let plan_validity = PlanValidity::new(
3265 self.catalog().transient_revision(),
3266 BTreeSet::from_iter([plan.item_id, from_item_id]),
3267 Some(plan.in_cluster),
3268 None,
3269 ctx.session().role_metadata().clone(),
3270 );
3271
3272 info!(
3273 "preparing alter sink for {}: frontiers={:?} export={:?}",
3274 plan.global_id,
3275 self.controller
3276 .storage_collections
3277 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3278 self.controller.storage.export(plan.global_id)
3279 );
3280
3281 self.install_storage_watch_set(
3287 ctx.session().conn_id().clone(),
3288 BTreeSet::from_iter([plan.global_id]),
3289 read_ts,
3290 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3291 ctx: Some(ctx),
3292 otel_ctx,
3293 plan,
3294 plan_validity,
3295 read_hold,
3296 }),
3297 ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3298 }
3299
3300 #[instrument]
3301 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3302 ctx.otel_ctx.attach_as_parent();
3303
3304 let plan::AlterSinkPlan {
3305 item_id,
3306 global_id,
3307 sink: sink_plan,
3308 with_snapshot,
3309 in_cluster,
3310 } = ctx.plan.clone();
3311
3312 match ctx.plan_validity.check(self.catalog()) {
3321 Ok(()) => {}
3322 Err(err) => {
3323 ctx.retire(Err(err));
3324 return;
3325 }
3326 }
3327
3328 let entry = self.catalog().get_entry(&item_id);
3329 let CatalogItem::Sink(old_sink) = entry.item() else {
3330 panic!("invalid item kind for `AlterSinkPlan`");
3331 };
3332
3333 if sink_plan.version != old_sink.version + 1 {
3334 ctx.retire(Err(AdapterError::ChangedPlan(
3335 "sink was altered concurrently".into(),
3336 )));
3337 return;
3338 }
3339
3340 info!(
3341 "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3342 self.controller
3343 .storage_collections
3344 .collections_frontiers(vec![global_id, sink_plan.from]),
3345 self.controller.storage.export(global_id),
3346 );
3347
3348 let write_frontier = &self
3351 .controller
3352 .storage
3353 .export(global_id)
3354 .expect("sink known to exist")
3355 .write_frontier;
3356 let as_of = ctx.read_hold.least_valid_read();
3357 assert!(
3358 write_frontier.iter().all(|t| as_of.less_than(t)),
3359 "{:?} should be strictly less than {:?}",
3360 &*as_of,
3361 &**write_frontier
3362 );
3363
3364 let create_sql = &old_sink.create_sql;
3370 let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3371 let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3372 unreachable!("invalid statement kind for sink");
3373 };
3374
3375 stmt.with_options
3377 .retain(|o| o.name != CreateSinkOptionName::Version);
3378 stmt.with_options.push(CreateSinkOption {
3379 name: CreateSinkOptionName::Version,
3380 value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3381 sink_plan.version.to_string(),
3382 ))),
3383 });
3384
3385 let conn_catalog = self.catalog().for_system_session();
3386 let (mut stmt, resolved_ids) =
3387 mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3388
3389 let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3391 let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3392 stmt.from = ResolvedItemName::Item {
3393 id: from_entry.id(),
3394 qualifiers: from_entry.name.qualifiers.clone(),
3395 full_name,
3396 print_id: true,
3397 version: from_entry.version,
3398 };
3399
3400 let new_sink = Sink {
3401 create_sql: stmt.to_ast_string_stable(),
3402 global_id,
3403 from: sink_plan.from,
3404 connection: sink_plan.connection.clone(),
3405 envelope: sink_plan.envelope,
3406 version: sink_plan.version,
3407 with_snapshot,
3408 resolved_ids: resolved_ids.clone(),
3409 cluster_id: in_cluster,
3410 commit_interval: sink_plan.commit_interval,
3411 };
3412
3413 let ops = vec![catalog::Op::UpdateItem {
3414 id: item_id,
3415 name: entry.name().clone(),
3416 to_item: CatalogItem::Sink(new_sink),
3417 }];
3418
3419 match self
3420 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3421 .await
3422 {
3423 Ok(()) => {}
3424 Err(err) => {
3425 ctx.retire(Err(err));
3426 return;
3427 }
3428 }
3429
3430 let storage_sink_desc = StorageSinkDesc {
3431 from: sink_plan.from,
3432 from_desc: from_entry
3433 .relation_desc()
3434 .expect("sinks can only be built on items with descs")
3435 .into_owned(),
3436 connection: sink_plan
3437 .connection
3438 .clone()
3439 .into_inline_connection(self.catalog().state()),
3440 envelope: sink_plan.envelope,
3441 as_of,
3442 with_snapshot,
3443 version: sink_plan.version,
3444 from_storage_metadata: (),
3445 to_storage_metadata: (),
3446 commit_interval: sink_plan.commit_interval,
3447 };
3448
3449 self.controller
3450 .storage
3451 .alter_export(
3452 global_id,
3453 ExportDescription {
3454 sink: storage_sink_desc,
3455 instance_id: in_cluster,
3456 },
3457 )
3458 .await
3459 .unwrap_or_terminate("cannot fail to alter source desc");
3460
3461 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3462 }
3463
3464 #[instrument]
3465 pub(super) async fn sequence_alter_connection(
3466 &mut self,
3467 ctx: ExecuteContext,
3468 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3469 ) {
3470 match action {
3471 AlterConnectionAction::RotateKeys => {
3472 self.sequence_rotate_keys(ctx, id).await;
3473 }
3474 AlterConnectionAction::AlterOptions {
3475 set_options,
3476 drop_options,
3477 validate,
3478 } => {
3479 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3480 .await
3481 }
3482 }
3483 }
3484
3485 #[instrument]
3486 async fn sequence_alter_connection_options(
3487 &mut self,
3488 mut ctx: ExecuteContext,
3489 id: CatalogItemId,
3490 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3491 drop_options: BTreeSet<ConnectionOptionName>,
3492 validate: bool,
3493 ) {
3494 let cur_entry = self.catalog().get_entry(&id);
3495 let cur_conn = cur_entry.connection().expect("known to be connection");
3496 let connection_gid = cur_conn.global_id();
3497
3498 let inner = || -> Result<Connection, AdapterError> {
3499 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3501 .expect("invalid create sql persisted to catalog")
3502 .into_element()
3503 .ast
3504 {
3505 Statement::CreateConnection(stmt) => stmt,
3506 _ => unreachable!("proved type is source"),
3507 };
3508
3509 let catalog = self.catalog().for_system_session();
3510
3511 let (mut create_conn_stmt, resolved_ids) =
3513 mz_sql::names::resolve(&catalog, create_conn_stmt)
3514 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3515
3516 create_conn_stmt
3518 .values
3519 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3520
3521 create_conn_stmt.values.extend(
3523 set_options
3524 .into_iter()
3525 .map(|(name, value)| ConnectionOption { name, value }),
3526 );
3527
3528 let mut catalog = self.catalog().for_system_session();
3531 catalog.mark_id_unresolvable_for_replanning(id);
3532
3533 let plan = match mz_sql::plan::plan(
3535 None,
3536 &catalog,
3537 Statement::CreateConnection(create_conn_stmt),
3538 &Params::empty(),
3539 &resolved_ids,
3540 )
3541 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3542 {
3543 (Plan::CreateConnection(plan), _sql_impl_ids) => plan,
3544 (p, _) => {
3545 unreachable!("create connection plan is only valid response, got {:?}", p)
3546 }
3547 };
3548
3549 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3551 .expect("invalid create sql persisted to catalog")
3552 .into_element()
3553 .ast
3554 {
3555 Statement::CreateConnection(stmt) => stmt,
3556 _ => unreachable!("proved type is source"),
3557 };
3558
3559 let catalog = self.catalog().for_system_session();
3560
3561 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3563 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3564
3565 Ok(Connection {
3566 create_sql: plan.connection.create_sql,
3567 global_id: cur_conn.global_id,
3568 details: plan.connection.details,
3569 resolved_ids: new_deps,
3570 })
3571 };
3572
3573 let conn = match inner() {
3574 Ok(conn) => conn,
3575 Err(e) => {
3576 return ctx.retire(Err(e));
3577 }
3578 };
3579
3580 if validate {
3581 let connection = conn
3582 .details
3583 .to_connection()
3584 .into_inline_connection(self.catalog().state());
3585
3586 let internal_cmd_tx = self.internal_cmd_tx.clone();
3587 let transient_revision = self.catalog().transient_revision();
3588 let conn_id = ctx.session().conn_id().clone();
3589 let otel_ctx = OpenTelemetryContext::obtain();
3590 let role_metadata = ctx.session().role_metadata().clone();
3591 let current_storage_parameters = self.controller.storage.config().clone();
3592
3593 task::spawn(
3594 || format!("validate_alter_connection:{conn_id}"),
3595 async move {
3596 let resolved_ids = conn.resolved_ids.clone();
3597 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3598 let result = match std::panic::AssertUnwindSafe(
3599 connection.validate(id, ¤t_storage_parameters),
3600 )
3601 .ore_catch_unwind()
3602 .await
3603 {
3604 Ok(Ok(())) => Ok(conn),
3605 Ok(Err(err)) => Err(err.into()),
3606 Err(_panic) => {
3607 tracing::error!("alter connection validation panicked");
3608 Err(AdapterError::Internal(
3609 "connection validation panicked".into(),
3610 ))
3611 }
3612 };
3613
3614 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3616 AlterConnectionValidationReady {
3617 ctx,
3618 result,
3619 connection_id: id,
3620 connection_gid,
3621 plan_validity: PlanValidity::new(
3622 transient_revision,
3623 dependency_ids.clone(),
3624 None,
3625 None,
3626 role_metadata,
3627 ),
3628 otel_ctx,
3629 resolved_ids,
3630 },
3631 ));
3632 if let Err(e) = result {
3633 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3634 }
3635 },
3636 );
3637 } else {
3638 let result = self
3639 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3640 .await;
3641 ctx.retire(result);
3642 }
3643 }
3644
3645 #[instrument]
3646 pub(crate) async fn sequence_alter_connection_stage_finish(
3647 &mut self,
3648 session: &Session,
3649 id: CatalogItemId,
3650 connection: Connection,
3651 ) -> Result<ExecuteResponse, AdapterError> {
3652 match self.catalog.get_entry(&id).item() {
3653 CatalogItem::Connection(curr_conn) => {
3654 curr_conn
3655 .details
3656 .to_connection()
3657 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3658 .map_err(StorageError::from)?;
3659 }
3660 _ => unreachable!("known to be a connection"),
3661 };
3662
3663 let ops = vec![catalog::Op::UpdateItem {
3664 id,
3665 name: self.catalog.get_entry(&id).name().clone(),
3666 to_item: CatalogItem::Connection(connection.clone()),
3667 }];
3668
3669 self.catalog_transact(Some(session), ops).await?;
3670
3671 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3678 }
3679
3680 #[instrument]
3681 pub(super) async fn sequence_alter_source(
3682 &mut self,
3683 session: &Session,
3684 plan::AlterSourcePlan {
3685 item_id,
3686 ingestion_id,
3687 action,
3688 }: plan::AlterSourcePlan,
3689 ) -> Result<ExecuteResponse, AdapterError> {
3690 let cur_entry = self.catalog().get_entry(&item_id);
3691 let cur_source = cur_entry.source().expect("known to be source");
3692
3693 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3694 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3696 .expect("invalid create sql persisted to catalog")
3697 .into_element()
3698 .ast
3699 {
3700 Statement::CreateSource(stmt) => stmt,
3701 _ => unreachable!("proved type is source"),
3702 };
3703
3704 let catalog = coord.catalog().for_system_session();
3705
3706 mz_sql::names::resolve(&catalog, create_source_stmt)
3708 .map_err(|e| AdapterError::internal(err_cx, e))
3709 };
3710
3711 match action {
3712 plan::AlterSourceAction::AddSubsourceExports {
3713 subsources,
3714 options,
3715 } => {
3716 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3717
3718 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3719 text_columns: mut new_text_columns,
3720 exclude_columns: mut new_exclude_columns,
3721 ..
3722 } = options.try_into()?;
3723
3724 let (mut create_source_stmt, resolved_ids) =
3726 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3727
3728 let catalog = self.catalog();
3730 let curr_references: BTreeSet<_> = catalog
3731 .get_entry(&item_id)
3732 .used_by()
3733 .into_iter()
3734 .filter_map(|subsource| {
3735 catalog
3736 .get_entry(subsource)
3737 .subsource_details()
3738 .map(|(_id, reference, _details)| reference)
3739 })
3740 .collect();
3741
3742 let purification_err =
3745 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3746
3747 match &mut create_source_stmt.connection {
3751 CreateSourceConnection::Postgres {
3752 options: curr_options,
3753 ..
3754 } => {
3755 let mz_sql::plan::PgConfigOptionExtracted {
3756 mut text_columns, ..
3757 } = curr_options.clone().try_into()?;
3758
3759 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3762
3763 text_columns.retain(|column_qualified_reference| {
3765 mz_ore::soft_assert_eq_or_log!(
3766 column_qualified_reference.0.len(),
3767 4,
3768 "all TEXT COLUMNS values must be column-qualified references"
3769 );
3770 let mut table = column_qualified_reference.clone();
3771 table.0.truncate(3);
3772 curr_references.contains(&table)
3773 });
3774
3775 new_text_columns.extend(text_columns);
3777
3778 if !new_text_columns.is_empty() {
3780 new_text_columns.sort();
3781 let new_text_columns = new_text_columns
3782 .into_iter()
3783 .map(WithOptionValue::UnresolvedItemName)
3784 .collect();
3785
3786 curr_options.push(PgConfigOption {
3787 name: PgConfigOptionName::TextColumns,
3788 value: Some(WithOptionValue::Sequence(new_text_columns)),
3789 });
3790 }
3791 }
3792 CreateSourceConnection::MySql {
3793 options: curr_options,
3794 ..
3795 } => {
3796 let mz_sql::plan::MySqlConfigOptionExtracted {
3797 mut text_columns,
3798 mut exclude_columns,
3799 ..
3800 } = curr_options.clone().try_into()?;
3801
3802 curr_options.retain(|o| {
3805 !matches!(
3806 o.name,
3807 MySqlConfigOptionName::TextColumns
3808 | MySqlConfigOptionName::ExcludeColumns
3809 )
3810 });
3811
3812 let column_referenced =
3814 |column_qualified_reference: &UnresolvedItemName| {
3815 mz_ore::soft_assert_eq_or_log!(
3816 column_qualified_reference.0.len(),
3817 3,
3818 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3819 );
3820 let mut table = column_qualified_reference.clone();
3821 table.0.truncate(2);
3822 curr_references.contains(&table)
3823 };
3824 text_columns.retain(column_referenced);
3825 exclude_columns.retain(column_referenced);
3826
3827 new_text_columns.extend(text_columns);
3829 new_exclude_columns.extend(exclude_columns);
3830
3831 if !new_text_columns.is_empty() {
3833 new_text_columns.sort();
3834 let new_text_columns = new_text_columns
3835 .into_iter()
3836 .map(WithOptionValue::UnresolvedItemName)
3837 .collect();
3838
3839 curr_options.push(MySqlConfigOption {
3840 name: MySqlConfigOptionName::TextColumns,
3841 value: Some(WithOptionValue::Sequence(new_text_columns)),
3842 });
3843 }
3844 if !new_exclude_columns.is_empty() {
3846 new_exclude_columns.sort();
3847 let new_exclude_columns = new_exclude_columns
3848 .into_iter()
3849 .map(WithOptionValue::UnresolvedItemName)
3850 .collect();
3851
3852 curr_options.push(MySqlConfigOption {
3853 name: MySqlConfigOptionName::ExcludeColumns,
3854 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3855 });
3856 }
3857 }
3858 CreateSourceConnection::SqlServer {
3859 options: curr_options,
3860 ..
3861 } => {
3862 let mz_sql::plan::SqlServerConfigOptionExtracted {
3863 mut text_columns,
3864 mut exclude_columns,
3865 ..
3866 } = curr_options.clone().try_into()?;
3867
3868 curr_options.retain(|o| {
3871 !matches!(
3872 o.name,
3873 SqlServerConfigOptionName::TextColumns
3874 | SqlServerConfigOptionName::ExcludeColumns
3875 )
3876 });
3877
3878 let column_referenced =
3884 |column_qualified_reference: &UnresolvedItemName| {
3885 mz_ore::soft_assert_eq_or_log!(
3886 column_qualified_reference.0.len(),
3887 3,
3888 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3889 );
3890 let mut table = column_qualified_reference.clone();
3891 table.0.truncate(2);
3892 curr_references.iter().any(|r| r.0.ends_with(&table.0))
3893 };
3894 text_columns.retain(column_referenced);
3895 exclude_columns.retain(column_referenced);
3896
3897 new_text_columns.extend(text_columns);
3899 new_exclude_columns.extend(exclude_columns);
3900
3901 if !new_text_columns.is_empty() {
3903 new_text_columns.sort();
3904 let new_text_columns = new_text_columns
3905 .into_iter()
3906 .map(WithOptionValue::UnresolvedItemName)
3907 .collect();
3908
3909 curr_options.push(SqlServerConfigOption {
3910 name: SqlServerConfigOptionName::TextColumns,
3911 value: Some(WithOptionValue::Sequence(new_text_columns)),
3912 });
3913 }
3914 if !new_exclude_columns.is_empty() {
3916 new_exclude_columns.sort();
3917 let new_exclude_columns = new_exclude_columns
3918 .into_iter()
3919 .map(WithOptionValue::UnresolvedItemName)
3920 .collect();
3921
3922 curr_options.push(SqlServerConfigOption {
3923 name: SqlServerConfigOptionName::ExcludeColumns,
3924 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3925 });
3926 }
3927 }
3928 _ => return Err(purification_err()),
3929 };
3930
3931 let mut catalog = self.catalog().for_system_session();
3932 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
3933
3934 let planned = mz_sql::plan::plan(
3936 None,
3937 &catalog,
3938 Statement::CreateSource(create_source_stmt),
3939 &Params::empty(),
3940 &resolved_ids,
3941 )
3942 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
3943 let plan = match planned {
3944 (Plan::CreateSource(plan), _sql_impl_ids) => plan,
3945 (p, _) => {
3946 unreachable!("create source plan is only valid response, got {:?}", p)
3947 }
3948 };
3949
3950 let source = Source::new(
3954 plan,
3955 cur_source.global_id,
3956 resolved_ids,
3957 cur_source.custom_logical_compaction_window,
3958 cur_source.is_retained_metrics_object,
3959 );
3960
3961 let desc = match &source.data_source {
3963 DataSourceDesc::Ingestion { desc, .. }
3964 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3965 desc.clone().into_inline_connection(self.catalog().state())
3966 }
3967 _ => unreachable!("already verified of type ingestion"),
3968 };
3969
3970 self.controller
3971 .storage
3972 .check_alter_ingestion_source_desc(ingestion_id, &desc)
3973 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
3974
3975 let mut ops = vec![catalog::Op::UpdateItem {
3978 id: item_id,
3979 name: self.catalog.get_entry(&item_id).name().clone(),
3982 to_item: CatalogItem::Source(source),
3983 }];
3984
3985 let CreateSourceInner {
3986 ops: new_ops,
3987 sources: _,
3988 if_not_exists_ids,
3989 } = self.create_source_inner(session, subsources).await?;
3990
3991 ops.extend(new_ops.into_iter());
3992
3993 assert!(
3994 if_not_exists_ids.is_empty(),
3995 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
3996 );
3997
3998 self.catalog_transact(Some(session), ops).await?;
3999 }
4000 plan::AlterSourceAction::RefreshReferences { references } => {
4001 self.catalog_transact(
4002 Some(session),
4003 vec![catalog::Op::UpdateSourceReferences {
4004 source_id: item_id,
4005 references: references.into(),
4006 }],
4007 )
4008 .await?;
4009 }
4010 }
4011
4012 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4013 }
4014
4015 #[instrument]
4016 pub(super) async fn sequence_alter_system_set(
4017 &mut self,
4018 session: &Session,
4019 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4020 ) -> Result<ExecuteResponse, AdapterError> {
4021 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4022 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4024 self.validate_alter_system_network_policy(session, &value)?;
4025 }
4026
4027 let op = match value {
4028 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4029 name: name.clone(),
4030 value: OwnedVarInput::SqlSet(values),
4031 },
4032 plan::VariableValue::Default => {
4033 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4034 }
4035 };
4036 self.catalog_transact(Some(session), vec![op]).await?;
4037
4038 session.add_notice(AdapterNotice::VarDefaultUpdated {
4039 role: None,
4040 var_name: Some(name),
4041 });
4042 Ok(ExecuteResponse::AlteredSystemConfiguration)
4043 }
4044
4045 #[instrument]
4046 pub(super) async fn sequence_alter_system_reset(
4047 &mut self,
4048 session: &Session,
4049 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4050 ) -> Result<ExecuteResponse, AdapterError> {
4051 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4052 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4053 self.catalog_transact(Some(session), vec![op]).await?;
4054 session.add_notice(AdapterNotice::VarDefaultUpdated {
4055 role: None,
4056 var_name: Some(name),
4057 });
4058 Ok(ExecuteResponse::AlteredSystemConfiguration)
4059 }
4060
4061 #[instrument]
4062 pub(super) async fn sequence_alter_system_reset_all(
4063 &mut self,
4064 session: &Session,
4065 _: plan::AlterSystemResetAllPlan,
4066 ) -> Result<ExecuteResponse, AdapterError> {
4067 self.is_user_allowed_to_alter_system(session, None)?;
4068 let op = catalog::Op::ResetAllSystemConfiguration;
4069 self.catalog_transact(Some(session), vec![op]).await?;
4070 session.add_notice(AdapterNotice::VarDefaultUpdated {
4071 role: None,
4072 var_name: None,
4073 });
4074 Ok(ExecuteResponse::AlteredSystemConfiguration)
4075 }
4076
4077 fn is_user_allowed_to_alter_system(
4079 &self,
4080 session: &Session,
4081 var_name: Option<&str>,
4082 ) -> Result<(), AdapterError> {
4083 match (session.user().kind(), var_name) {
4084 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4086 (UserKind::Superuser, Some(name))
4088 if session.user().is_internal()
4089 || self.catalog().system_config().user_modifiable(name) =>
4090 {
4091 let var = self.catalog().system_config().get(name)?;
4094 var.visible(session.user(), self.catalog().system_config())?;
4095 Ok(())
4096 }
4097 (UserKind::Regular, Some(name))
4100 if self.catalog().system_config().user_modifiable(name) =>
4101 {
4102 Err(AdapterError::Unauthorized(
4103 rbac::UnauthorizedError::Superuser {
4104 action: format!("toggle the '{name}' system configuration parameter"),
4105 },
4106 ))
4107 }
4108 _ => Err(AdapterError::Unauthorized(
4109 rbac::UnauthorizedError::MzSystem {
4110 action: "alter system".into(),
4111 },
4112 )),
4113 }
4114 }
4115
4116 fn validate_alter_system_network_policy(
4117 &self,
4118 session: &Session,
4119 policy_value: &plan::VariableValue,
4120 ) -> Result<(), AdapterError> {
4121 let policy_name = match &policy_value {
4122 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4124 plan::VariableValue::Values(values) if values.len() == 1 => {
4125 values.iter().next().cloned()
4126 }
4127 plan::VariableValue::Values(values) => {
4128 tracing::warn!(?values, "can't set multiple network policies at once");
4129 None
4130 }
4131 };
4132 let maybe_network_policy = policy_name
4133 .as_ref()
4134 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4135 let Some(network_policy) = maybe_network_policy else {
4136 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4137 VarError::InvalidParameterValue {
4138 name: NETWORK_POLICY.name(),
4139 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4140 reason: "no network policy with such name exists".to_string(),
4141 },
4142 )));
4143 };
4144 self.validate_alter_network_policy(session, &network_policy.rules)
4145 }
4146
4147 fn validate_alter_network_policy(
4152 &self,
4153 session: &Session,
4154 policy_rules: &Vec<NetworkPolicyRule>,
4155 ) -> Result<(), AdapterError> {
4156 if session.user().is_internal() {
4159 return Ok(());
4160 }
4161 if let Some(ip) = session.meta().client_ip() {
4162 validate_ip_with_policy_rules(ip, policy_rules)
4163 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4164 } else {
4165 return Err(AdapterError::NetworkPolicyDenied(
4168 NetworkPolicyError::MissingIp,
4169 ));
4170 }
4171 Ok(())
4172 }
4173
4174 #[instrument]
4176 pub(super) fn sequence_execute(
4177 &self,
4178 session: &mut Session,
4179 plan: plan::ExecutePlan,
4180 ) -> Result<String, AdapterError> {
4181 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4183 let ps = session
4184 .get_prepared_statement_unverified(&plan.name)
4185 .expect("known to exist");
4186 let stmt = ps.stmt().cloned();
4187 let desc = ps.desc().clone();
4188 let state_revision = ps.state_revision;
4189 let logging = Arc::clone(ps.logging());
4190 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4191 }
4192
4193 #[instrument]
4194 pub(super) async fn sequence_grant_privileges(
4195 &mut self,
4196 session: &Session,
4197 plan::GrantPrivilegesPlan {
4198 update_privileges,
4199 grantees,
4200 }: plan::GrantPrivilegesPlan,
4201 ) -> Result<ExecuteResponse, AdapterError> {
4202 self.sequence_update_privileges(
4203 session,
4204 update_privileges,
4205 grantees,
4206 UpdatePrivilegeVariant::Grant,
4207 )
4208 .await
4209 }
4210
4211 #[instrument]
4212 pub(super) async fn sequence_revoke_privileges(
4213 &mut self,
4214 session: &Session,
4215 plan::RevokePrivilegesPlan {
4216 update_privileges,
4217 revokees,
4218 }: plan::RevokePrivilegesPlan,
4219 ) -> Result<ExecuteResponse, AdapterError> {
4220 self.sequence_update_privileges(
4221 session,
4222 update_privileges,
4223 revokees,
4224 UpdatePrivilegeVariant::Revoke,
4225 )
4226 .await
4227 }
4228
4229 #[instrument]
4230 async fn sequence_update_privileges(
4231 &mut self,
4232 session: &Session,
4233 update_privileges: Vec<UpdatePrivilege>,
4234 grantees: Vec<RoleId>,
4235 variant: UpdatePrivilegeVariant,
4236 ) -> Result<ExecuteResponse, AdapterError> {
4237 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4238 let mut warnings = Vec::new();
4239 let catalog = self.catalog().for_session(session);
4240
4241 for UpdatePrivilege {
4242 acl_mode,
4243 target_id,
4244 grantor,
4245 } in update_privileges
4246 {
4247 let actual_object_type = catalog.get_system_object_type(&target_id);
4248 if actual_object_type.is_relation() {
4251 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4252 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4253 if !non_applicable_privileges.is_empty() {
4254 let object_description =
4255 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4256 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4257 non_applicable_privileges,
4258 object_description,
4259 })
4260 }
4261 }
4262
4263 if let SystemObjectId::Object(object_id) = &target_id {
4264 self.catalog()
4265 .ensure_not_reserved_object(object_id, session.conn_id())?;
4266 }
4267
4268 let privileges = self
4269 .catalog()
4270 .get_privileges(&target_id, session.conn_id())
4271 .ok_or(AdapterError::Unsupported(
4274 "GRANTs/REVOKEs on an object type with no privileges",
4275 ))?;
4276
4277 for grantee in &grantees {
4278 self.catalog().ensure_not_system_role(grantee)?;
4279 self.catalog().ensure_not_predefined_role(grantee)?;
4280 let existing_privilege = privileges
4281 .get_acl_item(grantee, &grantor)
4282 .map(Cow::Borrowed)
4283 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4284
4285 match variant {
4286 UpdatePrivilegeVariant::Grant
4287 if !existing_privilege.acl_mode.contains(acl_mode) =>
4288 {
4289 ops.push(catalog::Op::UpdatePrivilege {
4290 target_id: target_id.clone(),
4291 privilege: MzAclItem {
4292 grantee: *grantee,
4293 grantor,
4294 acl_mode,
4295 },
4296 variant,
4297 });
4298 }
4299 UpdatePrivilegeVariant::Revoke
4300 if !existing_privilege
4301 .acl_mode
4302 .intersection(acl_mode)
4303 .is_empty() =>
4304 {
4305 ops.push(catalog::Op::UpdatePrivilege {
4306 target_id: target_id.clone(),
4307 privilege: MzAclItem {
4308 grantee: *grantee,
4309 grantor,
4310 acl_mode,
4311 },
4312 variant,
4313 });
4314 }
4315 _ => {}
4317 }
4318 }
4319 }
4320
4321 if ops.is_empty() {
4322 session.add_notices(warnings);
4323 return Ok(variant.into());
4324 }
4325
4326 let res = self
4327 .catalog_transact(Some(session), ops)
4328 .await
4329 .map(|_| match variant {
4330 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4331 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4332 });
4333 if res.is_ok() {
4334 session.add_notices(warnings);
4335 }
4336 res
4337 }
4338
4339 #[instrument]
4340 pub(super) async fn sequence_alter_default_privileges(
4341 &mut self,
4342 session: &Session,
4343 plan::AlterDefaultPrivilegesPlan {
4344 privilege_objects,
4345 privilege_acl_items,
4346 is_grant,
4347 }: plan::AlterDefaultPrivilegesPlan,
4348 ) -> Result<ExecuteResponse, AdapterError> {
4349 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4350 let variant = if is_grant {
4351 UpdatePrivilegeVariant::Grant
4352 } else {
4353 UpdatePrivilegeVariant::Revoke
4354 };
4355 for privilege_object in &privilege_objects {
4356 self.catalog()
4357 .ensure_not_system_role(&privilege_object.role_id)?;
4358 self.catalog()
4359 .ensure_not_predefined_role(&privilege_object.role_id)?;
4360 if let Some(database_id) = privilege_object.database_id {
4361 self.catalog()
4362 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4363 }
4364 if let Some(schema_id) = privilege_object.schema_id {
4365 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4366 let schema_spec: SchemaSpecifier = schema_id.into();
4367
4368 self.catalog().ensure_not_reserved_object(
4369 &(database_spec, schema_spec).into(),
4370 session.conn_id(),
4371 )?;
4372 }
4373 for privilege_acl_item in &privilege_acl_items {
4374 self.catalog()
4375 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4376 self.catalog()
4377 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4378 ops.push(catalog::Op::UpdateDefaultPrivilege {
4379 privilege_object: privilege_object.clone(),
4380 privilege_acl_item: privilege_acl_item.clone(),
4381 variant,
4382 })
4383 }
4384 }
4385
4386 self.catalog_transact(Some(session), ops).await?;
4387 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4388 }
4389
4390 #[instrument]
4391 pub(super) async fn sequence_grant_role(
4392 &mut self,
4393 session: &Session,
4394 plan::GrantRolePlan {
4395 role_ids,
4396 member_ids,
4397 grantor_id,
4398 }: plan::GrantRolePlan,
4399 ) -> Result<ExecuteResponse, AdapterError> {
4400 let catalog = self.catalog();
4401 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4402 for role_id in role_ids {
4403 for member_id in &member_ids {
4404 let member_membership: BTreeSet<_> =
4405 catalog.get_role(member_id).membership().keys().collect();
4406 if member_membership.contains(&role_id) {
4407 let role_name = catalog.get_role(&role_id).name().to_string();
4408 let member_name = catalog.get_role(member_id).name().to_string();
4409 catalog.ensure_not_reserved_role(member_id)?;
4411 catalog.ensure_grantable_role(&role_id)?;
4412 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4413 role_name,
4414 member_name,
4415 });
4416 } else {
4417 ops.push(catalog::Op::GrantRole {
4418 role_id,
4419 member_id: *member_id,
4420 grantor_id,
4421 });
4422 }
4423 }
4424 }
4425
4426 if ops.is_empty() {
4427 return Ok(ExecuteResponse::GrantedRole);
4428 }
4429
4430 self.catalog_transact(Some(session), ops)
4431 .await
4432 .map(|_| ExecuteResponse::GrantedRole)
4433 }
4434
4435 #[instrument]
4436 pub(super) async fn sequence_revoke_role(
4437 &mut self,
4438 session: &Session,
4439 plan::RevokeRolePlan {
4440 role_ids,
4441 member_ids,
4442 grantor_id,
4443 }: plan::RevokeRolePlan,
4444 ) -> Result<ExecuteResponse, AdapterError> {
4445 let catalog = self.catalog();
4446 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4447 for role_id in role_ids {
4448 for member_id in &member_ids {
4449 let member_membership: BTreeSet<_> =
4450 catalog.get_role(member_id).membership().keys().collect();
4451 if !member_membership.contains(&role_id) {
4452 let role_name = catalog.get_role(&role_id).name().to_string();
4453 let member_name = catalog.get_role(member_id).name().to_string();
4454 catalog.ensure_not_reserved_role(member_id)?;
4456 catalog.ensure_grantable_role(&role_id)?;
4457 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4458 role_name,
4459 member_name,
4460 });
4461 } else {
4462 ops.push(catalog::Op::RevokeRole {
4463 role_id,
4464 member_id: *member_id,
4465 grantor_id,
4466 });
4467 }
4468 }
4469 }
4470
4471 if ops.is_empty() {
4472 return Ok(ExecuteResponse::RevokedRole);
4473 }
4474
4475 self.catalog_transact(Some(session), ops)
4476 .await
4477 .map(|_| ExecuteResponse::RevokedRole)
4478 }
4479
4480 #[instrument]
4481 pub(super) async fn sequence_alter_owner(
4482 &mut self,
4483 session: &Session,
4484 plan::AlterOwnerPlan {
4485 id,
4486 object_type,
4487 new_owner,
4488 }: plan::AlterOwnerPlan,
4489 ) -> Result<ExecuteResponse, AdapterError> {
4490 let mut ops = vec![catalog::Op::UpdateOwner {
4491 id: id.clone(),
4492 new_owner,
4493 }];
4494
4495 match &id {
4496 ObjectId::Item(global_id) => {
4497 let entry = self.catalog().get_entry(global_id);
4498
4499 if entry.is_index() {
4501 let name = self
4502 .catalog()
4503 .resolve_full_name(entry.name(), Some(session.conn_id()))
4504 .to_string();
4505 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4506 return Ok(ExecuteResponse::AlteredObject(object_type));
4507 }
4508
4509 let dependent_index_ops = entry
4511 .used_by()
4512 .into_iter()
4513 .filter(|id| self.catalog().get_entry(id).is_index())
4514 .map(|id| catalog::Op::UpdateOwner {
4515 id: ObjectId::Item(*id),
4516 new_owner,
4517 });
4518 ops.extend(dependent_index_ops);
4519
4520 let dependent_subsources =
4522 entry
4523 .progress_id()
4524 .into_iter()
4525 .map(|item_id| catalog::Op::UpdateOwner {
4526 id: ObjectId::Item(item_id),
4527 new_owner,
4528 });
4529 ops.extend(dependent_subsources);
4530 }
4531 ObjectId::Cluster(cluster_id) => {
4532 let cluster = self.catalog().get_cluster(*cluster_id);
4533 let managed_cluster_replica_ops =
4535 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4536 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4537 new_owner,
4538 });
4539 ops.extend(managed_cluster_replica_ops);
4540 }
4541 _ => {}
4542 }
4543
4544 self.catalog_transact(Some(session), ops)
4545 .await
4546 .map(|_| ExecuteResponse::AlteredObject(object_type))
4547 }
4548
4549 #[instrument]
4550 pub(super) async fn sequence_reassign_owned(
4551 &mut self,
4552 session: &Session,
4553 plan::ReassignOwnedPlan {
4554 old_roles,
4555 new_role,
4556 reassign_ids,
4557 }: plan::ReassignOwnedPlan,
4558 ) -> Result<ExecuteResponse, AdapterError> {
4559 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4560 self.catalog().ensure_not_reserved_role(role_id)?;
4561 }
4562
4563 let ops = reassign_ids
4564 .into_iter()
4565 .map(|id| catalog::Op::UpdateOwner {
4566 id,
4567 new_owner: new_role,
4568 })
4569 .collect();
4570
4571 self.catalog_transact(Some(session), ops)
4572 .await
4573 .map(|_| ExecuteResponse::ReassignOwned)
4574 }
4575
4576 #[instrument]
4577 pub(crate) async fn handle_deferred_statement(&mut self) {
4578 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4582 return;
4583 };
4584 match ps {
4585 crate::coord::PlanStatement::Statement { stmt, params } => {
4586 self.handle_execute_inner(stmt, params, ctx).await;
4587 }
4588 crate::coord::PlanStatement::Plan {
4589 plan,
4590 resolved_ids,
4591 sql_impl_resolved_ids,
4592 } => {
4593 self.sequence_plan(ctx, plan, resolved_ids, sql_impl_resolved_ids)
4594 .await;
4595 }
4596 }
4597 }
4598
4599 #[instrument]
4600 #[allow(clippy::unused_async)]
4602 pub(super) async fn sequence_alter_table(
4603 &mut self,
4604 ctx: &mut ExecuteContext,
4605 plan: plan::AlterTablePlan,
4606 ) -> Result<ExecuteResponse, AdapterError> {
4607 let plan::AlterTablePlan {
4608 relation_id,
4609 column_name,
4610 column_type,
4611 raw_sql_type,
4612 } = plan;
4613
4614 let (_, new_global_id) = self.allocate_user_id().await?;
4616 let ops = vec![catalog::Op::AlterAddColumn {
4617 id: relation_id,
4618 new_global_id,
4619 name: column_name,
4620 typ: column_type,
4621 sql: raw_sql_type,
4622 }];
4623
4624 self.catalog_transact_with_context(None, Some(ctx), ops)
4625 .await?;
4626
4627 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4628 }
4629
4630 #[instrument]
4632 pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4633 &mut self,
4634 ctx: ExecuteContext,
4635 plan: AlterMaterializedViewApplyReplacementPlan,
4636 ) {
4637 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4647
4648 let plan_validity = PlanValidity::new(
4649 self.catalog().transient_revision(),
4650 BTreeSet::from_iter([id, replacement_id]),
4651 None,
4652 None,
4653 ctx.session().role_metadata().clone(),
4654 );
4655
4656 let target = self.catalog.get_entry(&id);
4657 let target_gid = target.latest_global_id();
4658
4659 let replacement = self.catalog.get_entry(&replacement_id);
4660 let replacement_gid = replacement.latest_global_id();
4661
4662 let target_upper = self
4663 .controller
4664 .storage_collections
4665 .collection_frontiers(target_gid)
4666 .expect("target MV exists")
4667 .write_frontier;
4668 let replacement_upper = self
4669 .controller
4670 .compute
4671 .collection_frontiers(replacement_gid, replacement.cluster_id())
4672 .expect("replacement MV exists")
4673 .write_frontier;
4674
4675 info!(
4676 %id, %replacement_id, ?target_upper, ?replacement_upper,
4677 "preparing materialized view replacement application",
4678 );
4679
4680 let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4681 ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4690 name: target.name().item.clone(),
4691 }));
4692 return;
4693 };
4694
4695 let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4699
4700 self.install_storage_watch_set(
4704 ctx.session().conn_id().clone(),
4705 BTreeSet::from_iter([target_gid]),
4706 replacement_upper_ts,
4707 WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4708 ctx: Some(ctx),
4709 otel_ctx: OpenTelemetryContext::obtain(),
4710 plan,
4711 plan_validity,
4712 }),
4713 )
4714 .expect("target collection exists");
4715 }
4716
4717 #[instrument]
4719 pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4720 &mut self,
4721 mut ctx: AlterMaterializedViewReadyContext,
4722 ) {
4723 ctx.otel_ctx.attach_as_parent();
4724
4725 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4726
4727 if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4732 ctx.retire(Err(err));
4733 return;
4734 }
4735
4736 info!(
4737 %id, %replacement_id,
4738 "finishing materialized view replacement application",
4739 );
4740
4741 let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4742 match self
4743 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4744 .await
4745 {
4746 Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4747 ObjectType::MaterializedView,
4748 ))),
4749 Err(err) => ctx.retire(Err(err)),
4750 }
4751 }
4752
4753 pub(super) async fn statistics_oracle(
4754 &self,
4755 session: &Session,
4756 source_ids: &BTreeSet<GlobalId>,
4757 query_as_of: &Antichain<Timestamp>,
4758 is_oneshot: bool,
4759 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4760 super::statistics_oracle(
4761 session,
4762 source_ids,
4763 query_as_of,
4764 is_oneshot,
4765 self.catalog().system_config(),
4766 self.controller.storage_collections.as_ref(),
4767 )
4768 .await
4769 }
4770}
4771
4772impl Coordinator {
4773 pub(crate) fn emit_raw_optimizer_notices_to_user(
4781 &self,
4782 ctx: &ExecuteContext,
4783 notices: &[RawOptimizerNotice],
4784 ) {
4785 emit_optimizer_notices(&*self.catalog, ctx.session(), notices);
4786 }
4787
4788 async fn persist_dataflow_metainfo(
4798 &mut self,
4799 df_meta: DataflowMetainfo<Arc<OptimizerNotice>>,
4800 export_id: GlobalId,
4801 ) -> Option<BuiltinTableAppendNotify> {
4802 if self.catalog().state().system_config().enable_mz_notices()
4805 && !df_meta.optimizer_notices.is_empty()
4806 {
4807 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4808 self.catalog().state().pack_optimizer_notices(
4809 &mut builtin_table_updates,
4810 df_meta.optimizer_notices.iter(),
4811 Diff::ONE,
4812 );
4813
4814 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4816
4817 Some(
4818 self.builtin_table_update()
4819 .execute(builtin_table_updates)
4820 .await
4821 .0,
4822 )
4823 } else {
4824 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4826
4827 None
4828 }
4829 }
4830}