1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29 CollectionPlan, Eval, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::future::OreFutureExt;
34use mz_ore::task::{self, JoinHandle, spawn};
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{assert_none, instrument};
37use mz_repr::adt::jsonb::Jsonb;
38use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
39use mz_repr::explain::ExprHumanizer;
40use mz_repr::explain::json::json_string;
41use mz_repr::role_id::RoleId;
42use mz_repr::{
43 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
44 RowIterator, Timestamp,
45};
46use mz_sql::ast::{
47 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
48 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
49 SqlServerConfigOptionName,
50};
51use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
52use mz_sql::catalog::{
53 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
54 CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, CatalogTypeDetails,
55 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
56};
57use mz_sql::names::{
58 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
59 SchemaSpecifier, SystemObjectId,
60};
61use mz_sql::plan::{
62 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
63 StatementContext,
64};
65use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
66use mz_storage_types::sinks::StorageSinkDesc;
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use mz_transform::notice::{OptimizerNotice, RawOptimizerNotice};
93use smallvec::SmallVec;
94use timely::progress::Antichain;
95use tokio::sync::{oneshot, watch};
96use tracing::{Instrument, Span, info, warn};
97
98use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
99use crate::command::{ExecuteResponse, Response};
100use crate::coord::appends::{
101 BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn, UserWriteResponder,
102};
103use crate::coord::read_then_write::validate_read_then_write_dependencies;
104use crate::coord::sequencer::emit_optimizer_notices;
105use crate::coord::{
106 AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
107 Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
108 ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
109 PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
110 WatchSetResponse, validate_ip_with_policy_rules,
111};
112use crate::error::AdapterError;
113use crate::notice::{AdapterNotice, DroppedInUseIndex};
114use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
115use crate::optimize::{self, Optimize};
116use crate::session::{
117 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
118 WriteLocks, WriteOp,
119};
120use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
121use crate::{PeekResponseUnary, ReadHolds};
122
123type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError>>;
125
126mod cluster;
127mod copy_from;
128mod create_index;
129mod create_materialized_view;
130mod create_view;
131mod explain_timestamp;
132mod peek;
133mod secret;
134mod subscribe;
135
136macro_rules! return_if_err {
139 ($expr:expr, $ctx:expr) => {
140 match $expr {
141 Ok(v) => v,
142 Err(e) => return $ctx.retire(Err(e.into())),
143 }
144 };
145}
146
147pub(super) use return_if_err;
148
149struct DropOps {
150 ops: Vec<catalog::Op>,
151 dropped_active_db: bool,
152 dropped_active_cluster: bool,
153 dropped_in_use_indexes: Vec<DroppedInUseIndex>,
154}
155
156struct CreateSourceInner {
158 ops: Vec<catalog::Op>,
159 sources: Vec<(CatalogItemId, Source)>,
160 if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
161}
162
163impl Coordinator {
164 pub(crate) async fn sequence_staged<S>(
171 &mut self,
172 mut ctx: S::Ctx,
173 parent_span: Span,
174 mut stage: S,
175 ) where
176 S: Staged + 'static,
177 S::Ctx: Send + 'static,
178 {
179 return_if_err!(stage.validity().check(self.catalog()), ctx);
180 loop {
181 let mut cancel_enabled = stage.cancel_enabled();
182 if let Some(session) = ctx.session() {
183 if cancel_enabled {
184 if let Some((_prev_tx, prev_rx)) = self
187 .connection_cancel_watches
188 .insert(session.conn_id().clone(), watch::channel(false))
189 {
190 let was_canceled = *prev_rx.borrow();
191 if was_canceled {
192 ctx.retire(Err(AdapterError::Canceled));
193 return;
194 }
195 }
196 } else {
197 self.connection_cancel_watches.remove(session.conn_id());
200 }
201 } else {
202 cancel_enabled = false
203 };
204 let next = stage
205 .stage(self, &mut ctx)
206 .instrument(parent_span.clone())
207 .await;
208 let res = return_if_err!(next, ctx);
209 stage = match res {
210 StageResult::Handle(handle) => {
211 let internal_cmd_tx = self.internal_cmd_tx.clone();
212 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
213 let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
214 });
215 return;
216 }
217 StageResult::HandleRetire(handle) => {
218 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
219 ctx.retire(Ok(resp));
220 });
221 return;
222 }
223 StageResult::Response(resp) => {
224 ctx.retire(Ok(resp));
225 return;
226 }
227 StageResult::Immediate(stage) => *stage,
228 }
229 }
230 }
231
232 fn handle_spawn<C, T, F>(
235 &self,
236 ctx: C,
237 handle: JoinHandle<Result<T, AdapterError>>,
238 cancel_enabled: bool,
239 f: F,
240 ) where
241 C: StagedContext + Send + 'static,
242 T: Send + 'static,
243 F: FnOnce(C, T) + Send + 'static,
244 {
245 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
246 .session()
247 .and_then(|session| self.connection_cancel_watches.get(session.conn_id()))
248 {
249 let mut rx = rx.clone();
250 Box::pin(async move {
251 let _ = rx.wait_for(|v| *v).await;
253 ()
254 })
255 } else {
256 Box::pin(future::pending())
257 };
258 spawn(|| "sequence_staged", async move {
259 tokio::select! {
260 res = handle => {
261 let next = return_if_err!(res, ctx);
262 f(ctx, next);
263 }
264 _ = rx, if cancel_enabled => {
265 ctx.retire(Err(AdapterError::Canceled));
266 }
267 }
268 });
269 }
270
271 async fn create_source_inner(
272 &self,
273 session: &Session,
274 plans: Vec<plan::CreateSourcePlanBundle>,
275 ) -> Result<CreateSourceInner, AdapterError> {
276 let mut ops = vec![];
277 let mut sources = vec![];
278
279 let if_not_exists_ids = plans
280 .iter()
281 .filter_map(
282 |plan::CreateSourcePlanBundle {
283 item_id,
284 global_id: _,
285 plan,
286 resolved_ids: _,
287 available_source_references: _,
288 }| {
289 if plan.if_not_exists {
290 Some((*item_id, plan.name.clone()))
291 } else {
292 None
293 }
294 },
295 )
296 .collect::<BTreeMap<_, _>>();
297
298 for plan::CreateSourcePlanBundle {
299 item_id,
300 global_id,
301 mut plan,
302 resolved_ids,
303 available_source_references,
304 } in plans
305 {
306 let name = plan.name.clone();
307
308 if let mz_sql::plan::DataSourceDesc::Webhook {
310 validate_using: Some(validate),
311 ..
312 } = &mut plan.source.data_source
313 {
314 if let Err(reason) = validate.reduce_expression().await {
315 self.metrics
316 .webhook_validation_reduce_failures
317 .with_label_values(&[reason])
318 .inc();
319 return Err(AdapterError::Internal(format!(
320 "failed to reduce check expression, {reason}"
321 )));
322 }
323 }
324
325 let mut reference_ops = vec![];
328 if let Some(references) = &available_source_references {
329 reference_ops.push(catalog::Op::UpdateSourceReferences {
330 source_id: item_id,
331 references: references.clone().into(),
332 });
333 }
334
335 let source = Source::new(plan, global_id, resolved_ids, None, false);
336 ops.push(catalog::Op::CreateItem {
337 id: item_id,
338 name,
339 item: CatalogItem::Source(source.clone()),
340 owner_id: *session.current_role_id(),
341 });
342 sources.push((item_id, source));
343 ops.extend(reference_ops);
345 }
346
347 Ok(CreateSourceInner {
348 ops,
349 sources,
350 if_not_exists_ids,
351 })
352 }
353
354 pub(crate) fn plan_subsource(
362 &self,
363 session: &Session,
364 params: &mz_sql::plan::Params,
365 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
366 item_id: CatalogItemId,
367 global_id: GlobalId,
368 ) -> Result<CreateSourcePlanBundle, AdapterError> {
369 let catalog = self.catalog().for_session(session);
370 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
371
372 let (plan, _sql_impl_ids) = self.plan_statement(
373 session,
374 Statement::CreateSubsource(subsource_stmt),
375 params,
376 &resolved_ids,
377 )?;
378 let plan = match plan {
379 Plan::CreateSource(plan) => plan,
380 _ => unreachable!(),
381 };
382 Ok(CreateSourcePlanBundle {
383 item_id,
384 global_id,
385 plan,
386 resolved_ids,
387 available_source_references: None,
388 })
389 }
390
391 pub(crate) async fn plan_purified_alter_source_add_subsource(
393 &mut self,
394 session: &Session,
395 params: Params,
396 source_name: ResolvedItemName,
397 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
398 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
399 ) -> Result<(Plan, ResolvedIds), AdapterError> {
400 let mut subsource_plans = Vec::with_capacity(subsources.len());
401
402 let conn_catalog = self.catalog().for_system_session();
404 let pcx = plan::PlanContext::zero();
405 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
406
407 let entry = self.catalog().get_entry(source_name.item_id());
408 let source = entry.source().ok_or_else(|| {
409 AdapterError::internal(
410 "plan alter source",
411 format!("expected Source found {entry:?}"),
412 )
413 })?;
414
415 let item_id = entry.id();
416 let ingestion_id = source.global_id();
417 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
418
419 let ids = self
420 .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
421 .await?;
422 for (subsource_stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
423 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
424 subsource_plans.push(s);
425 }
426
427 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
428 subsources: subsource_plans,
429 options,
430 };
431
432 Ok((
433 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
434 item_id,
435 ingestion_id,
436 action,
437 }),
438 ResolvedIds::empty(),
439 ))
440 }
441
442 pub(crate) fn plan_purified_alter_source_refresh_references(
444 &self,
445 _session: &Session,
446 _params: Params,
447 source_name: ResolvedItemName,
448 available_source_references: plan::SourceReferences,
449 ) -> Result<(Plan, ResolvedIds), AdapterError> {
450 let entry = self.catalog().get_entry(source_name.item_id());
451 let source = entry.source().ok_or_else(|| {
452 AdapterError::internal(
453 "plan alter source",
454 format!("expected Source found {entry:?}"),
455 )
456 })?;
457 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
458 references: available_source_references,
459 };
460
461 Ok((
462 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
463 item_id: entry.id(),
464 ingestion_id: source.global_id(),
465 action,
466 }),
467 ResolvedIds::empty(),
468 ))
469 }
470
471 pub(crate) async fn plan_purified_create_source(
475 &mut self,
476 ctx: &ExecuteContext,
477 params: Params,
478 progress_stmt: Option<CreateSubsourceStatement<Aug>>,
479 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
480 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
481 available_source_references: plan::SourceReferences,
482 ) -> Result<(Plan, ResolvedIds), AdapterError> {
483 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
484
485 if let Some(progress_stmt) = progress_stmt {
487 assert_none!(progress_stmt.of_source);
492 let (item_id, global_id) = self.allocate_user_id().await?;
493 let progress_plan =
494 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
495 let progress_full_name = self
496 .catalog()
497 .resolve_full_name(&progress_plan.plan.name, None);
498 let progress_subsource = ResolvedItemName::Item {
499 id: progress_plan.item_id,
500 qualifiers: progress_plan.plan.name.qualifiers.clone(),
501 full_name: progress_full_name,
502 print_id: true,
503 version: RelationVersionSelector::Latest,
504 };
505
506 create_source_plans.push(progress_plan);
507
508 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
509 }
510
511 let catalog = self.catalog().for_session(ctx.session());
512 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
513
514 let propagated_with_options: Vec<_> = source_stmt
515 .with_options
516 .iter()
517 .filter_map(|opt| match opt.name {
518 CreateSourceOptionName::TimestampInterval => None,
519 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
520 name: CreateSubsourceOptionName::RetainHistory,
521 value: opt.value.clone(),
522 }),
523 })
524 .collect();
525
526 let source_plan = match self.plan_statement(
528 ctx.session(),
529 Statement::CreateSource(source_stmt),
530 ¶ms,
531 &resolved_ids,
532 )? {
533 (Plan::CreateSource(plan), _sql_impl_ids) => plan,
534 (p, _) => unreachable!("s must be CreateSourcePlan but got {:?}", p),
535 };
536
537 let (item_id, global_id) = self.allocate_user_id().await?;
538
539 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
540 let of_source = ResolvedItemName::Item {
541 id: item_id,
542 qualifiers: source_plan.name.qualifiers.clone(),
543 full_name: source_full_name,
544 print_id: true,
545 version: RelationVersionSelector::Latest,
546 };
547
548 let conn_catalog = self.catalog().for_system_session();
550 let pcx = plan::PlanContext::zero();
551 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
552
553 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
554
555 for subsource_stmt in subsource_stmts.iter_mut() {
556 subsource_stmt
557 .with_options
558 .extend(propagated_with_options.iter().cloned())
559 }
560
561 create_source_plans.push(CreateSourcePlanBundle {
562 item_id,
563 global_id,
564 plan: source_plan,
565 resolved_ids: resolved_ids.clone(),
566 available_source_references: Some(available_source_references),
567 });
568
569 let ids = self
571 .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
572 .await?;
573 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
574 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
575 create_source_plans.push(plan);
576 }
577
578 Ok((
579 Plan::CreateSources(create_source_plans),
580 ResolvedIds::empty(),
581 ))
582 }
583
584 #[instrument]
585 pub(super) async fn sequence_create_source(
586 &mut self,
587 ctx: &mut ExecuteContext,
588 plans: Vec<plan::CreateSourcePlanBundle>,
589 ) -> Result<ExecuteResponse, AdapterError> {
590 let CreateSourceInner {
591 ops,
592 sources,
593 if_not_exists_ids,
594 } = self.create_source_inner(ctx.session(), plans).await?;
595
596 let transact_result = self
597 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
598 .await;
599
600 for (item_id, source) in &sources {
602 if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
603 if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
604 ctx.session()
605 .add_notice(AdapterNotice::WebhookSourceCreated { url });
606 }
607 }
608 }
609
610 match transact_result {
611 Ok(()) => Ok(ExecuteResponse::CreatedSource),
612 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
613 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
614 })) if if_not_exists_ids.contains_key(&id) => {
615 ctx.session()
616 .add_notice(AdapterNotice::ObjectAlreadyExists {
617 name: if_not_exists_ids[&id].item.clone(),
618 ty: "source",
619 });
620 Ok(ExecuteResponse::CreatedSource)
621 }
622 Err(err) => Err(err),
623 }
624 }
625
626 #[instrument]
627 pub(super) async fn sequence_create_connection(
628 &mut self,
629 mut ctx: ExecuteContext,
630 plan: plan::CreateConnectionPlan,
631 resolved_ids: ResolvedIds,
632 ) {
633 let (connection_id, connection_gid) = match self.allocate_user_id().await {
634 Ok(item_id) => item_id,
635 Err(err) => return ctx.retire(Err(err)),
636 };
637
638 match &plan.connection.details {
639 ConnectionDetails::Ssh { key_1, key_2, .. } => {
640 let key_1 = match key_1.as_key_pair() {
641 Some(key_1) => key_1.clone(),
642 None => {
643 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
644 "the PUBLIC KEY 1 option cannot be explicitly specified"
645 ))));
646 }
647 };
648
649 let key_2 = match key_2.as_key_pair() {
650 Some(key_2) => key_2.clone(),
651 None => {
652 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
653 "the PUBLIC KEY 2 option cannot be explicitly specified"
654 ))));
655 }
656 };
657
658 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
659 let secret = key_set.to_bytes();
660 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
661 return ctx.retire(Err(err.into()));
662 }
663 self.caching_secrets_reader.invalidate(connection_id);
664 }
665 _ => (),
666 };
667
668 if plan.validate {
669 let internal_cmd_tx = self.internal_cmd_tx.clone();
670 let transient_revision = self.catalog().transient_revision();
671 let conn_id = ctx.session().conn_id().clone();
672 let otel_ctx = OpenTelemetryContext::obtain();
673 let role_metadata = ctx.session().role_metadata().clone();
674
675 let connection = plan
676 .connection
677 .details
678 .to_connection()
679 .into_inline_connection(self.catalog().state());
680
681 let current_storage_parameters = self.controller.storage.config().clone();
682 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
683 let result = match std::panic::AssertUnwindSafe(
684 connection.validate(connection_id, ¤t_storage_parameters),
685 )
686 .ore_catch_unwind()
687 .await
688 {
689 Ok(Ok(())) => Ok(plan),
690 Ok(Err(err)) => Err(err.into()),
691 Err(_panic) => {
692 tracing::error!("connection validation panicked");
693 Err(AdapterError::Internal(
694 "connection validation panicked".into(),
695 ))
696 }
697 };
698
699 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
701 CreateConnectionValidationReady {
702 ctx,
703 result,
704 connection_id,
705 connection_gid,
706 plan_validity: PlanValidity::new(
707 transient_revision,
708 resolved_ids.items().copied().collect(),
709 None,
710 None,
711 role_metadata,
712 ),
713 otel_ctx,
714 resolved_ids: resolved_ids.clone(),
715 },
716 ));
717 if let Err(e) = result {
718 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
719 }
720 });
721 } else {
722 let result = self
723 .sequence_create_connection_stage_finish(
724 &mut ctx,
725 connection_id,
726 connection_gid,
727 plan,
728 resolved_ids,
729 )
730 .await;
731 ctx.retire(result);
732 }
733 }
734
735 #[instrument]
736 pub(crate) async fn sequence_create_connection_stage_finish(
737 &mut self,
738 ctx: &mut ExecuteContext,
739 connection_id: CatalogItemId,
740 connection_gid: GlobalId,
741 plan: plan::CreateConnectionPlan,
742 resolved_ids: ResolvedIds,
743 ) -> Result<ExecuteResponse, AdapterError> {
744 let ops = vec![catalog::Op::CreateItem {
745 id: connection_id,
746 name: plan.name.clone(),
747 item: CatalogItem::Connection(Connection {
748 create_sql: plan.connection.create_sql,
749 global_id: connection_gid,
750 details: plan.connection.details.clone(),
751 resolved_ids,
752 }),
753 owner_id: *ctx.session().current_role_id(),
754 }];
755
756 let conn_id = ctx.session().conn_id().clone();
759 let transact_result = self
760 .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
761 .await;
762
763 match transact_result {
764 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
765 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
766 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
767 })) if plan.if_not_exists => {
768 if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
771 if let Err(e) = self.secrets_controller.delete(connection_id).await {
772 tracing::warn!(
773 "Dropping SSH secret for existing connection has encountered an error: {}",
774 e
775 );
776 } else {
777 self.caching_secrets_reader.invalidate(connection_id);
778 }
779 }
780 ctx.session()
781 .add_notice(AdapterNotice::ObjectAlreadyExists {
782 name: plan.name.item,
783 ty: "connection",
784 });
785 Ok(ExecuteResponse::CreatedConnection)
786 }
787 Err(err) => {
788 if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
791 if let Err(e) = self.secrets_controller.delete(connection_id).await {
792 tracing::warn!(
793 "Dropping SSH secret for failed connection has encountered an error: {}",
794 e
795 );
796 } else {
797 self.caching_secrets_reader.invalidate(connection_id);
798 }
799 }
800 Err(err)
801 }
802 }
803 }
804
805 #[instrument]
806 pub(super) async fn sequence_create_database(
807 &mut self,
808 session: &Session,
809 plan: plan::CreateDatabasePlan,
810 ) -> Result<ExecuteResponse, AdapterError> {
811 let ops = vec![catalog::Op::CreateDatabase {
812 name: plan.name.clone(),
813 owner_id: *session.current_role_id(),
814 }];
815 match self.catalog_transact(Some(session), ops).await {
816 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
817 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
818 kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
819 })) if plan.if_not_exists => {
820 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
821 Ok(ExecuteResponse::CreatedDatabase)
822 }
823 Err(err) => Err(err),
824 }
825 }
826
827 #[instrument]
828 pub(super) async fn sequence_create_schema(
829 &mut self,
830 session: &Session,
831 plan: plan::CreateSchemaPlan,
832 ) -> Result<ExecuteResponse, AdapterError> {
833 let op = catalog::Op::CreateSchema {
834 database_id: plan.database_spec,
835 schema_name: plan.schema_name.clone(),
836 owner_id: *session.current_role_id(),
837 };
838 match self.catalog_transact(Some(session), vec![op]).await {
839 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
840 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
841 kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
842 })) if plan.if_not_exists => {
843 session.add_notice(AdapterNotice::SchemaAlreadyExists {
844 name: plan.schema_name,
845 });
846 Ok(ExecuteResponse::CreatedSchema)
847 }
848 Err(err) => Err(err),
849 }
850 }
851
852 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
854 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
855 if attributes.superuser.is_some() || attributes.password.is_some() {
856 return Err(AdapterError::UnavailableFeature {
857 feature: "SUPERUSER and PASSWORD attributes".to_string(),
858 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
859 });
860 }
861 }
862 Ok(())
863 }
864
865 #[instrument]
866 pub(super) async fn sequence_create_role(
867 &mut self,
868 conn_id: Option<&ConnectionId>,
869 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
870 ) -> Result<ExecuteResponse, AdapterError> {
871 self.validate_role_attributes(&attributes.clone())?;
872 let op = catalog::Op::CreateRole { name, attributes };
873 self.catalog_transact_with_context(conn_id, None, vec![op])
874 .await
875 .map(|_| ExecuteResponse::CreatedRole)
876 }
877
878 #[instrument]
879 pub(super) async fn sequence_create_network_policy(
880 &mut self,
881 session: &Session,
882 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
883 ) -> Result<ExecuteResponse, AdapterError> {
884 let op = catalog::Op::CreateNetworkPolicy {
885 rules,
886 name,
887 owner_id: *session.current_role_id(),
888 };
889 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
890 .await
891 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
892 }
893
894 #[instrument]
895 pub(super) async fn sequence_alter_network_policy(
896 &mut self,
897 session: &Session,
898 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
899 ) -> Result<ExecuteResponse, AdapterError> {
900 let current_network_policy_name =
902 self.catalog().system_config().default_network_policy_name();
903 if current_network_policy_name == name {
905 self.validate_alter_network_policy(session, &rules)?;
906 }
907
908 let op = catalog::Op::AlterNetworkPolicy {
909 id,
910 rules,
911 name,
912 owner_id: *session.current_role_id(),
913 };
914 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
915 .await
916 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
917 }
918
919 #[instrument]
920 pub(super) async fn sequence_create_table(
921 &mut self,
922 ctx: &mut ExecuteContext,
923 plan: plan::CreateTablePlan,
924 resolved_ids: ResolvedIds,
925 ) -> Result<ExecuteResponse, AdapterError> {
926 let plan::CreateTablePlan {
927 name,
928 table,
929 if_not_exists,
930 } = plan;
931
932 let conn_id = if table.temporary {
933 Some(ctx.session().conn_id())
934 } else {
935 None
936 };
937 let (table_id, global_id) = self.allocate_user_id().await?;
938 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
939
940 let data_source = match table.data_source {
941 plan::TableDataSource::TableWrites { defaults } => {
942 TableDataSource::TableWrites { defaults }
943 }
944 plan::TableDataSource::DataSource {
945 desc: data_source_plan,
946 timeline,
947 } => match data_source_plan {
948 plan::DataSourceDesc::IngestionExport {
949 ingestion_id,
950 external_reference,
951 details,
952 data_config,
953 } => TableDataSource::DataSource {
954 desc: DataSourceDesc::IngestionExport {
955 ingestion_id,
956 external_reference,
957 details,
958 data_config,
959 },
960 timeline,
961 },
962 plan::DataSourceDesc::Webhook {
963 validate_using,
964 body_format,
965 headers,
966 cluster_id,
967 } => TableDataSource::DataSource {
968 desc: DataSourceDesc::Webhook {
969 validate_using,
970 body_format,
971 headers,
972 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
973 },
974 timeline,
975 },
976 o => {
977 unreachable!("CREATE TABLE data source got {:?}", o)
978 }
979 },
980 };
981
982 let is_webhook = if let TableDataSource::DataSource {
983 desc: DataSourceDesc::Webhook { .. },
984 timeline: _,
985 } = &data_source
986 {
987 true
988 } else {
989 false
990 };
991
992 let table = Table {
993 create_sql: Some(table.create_sql),
994 desc: table.desc,
995 collections,
996 conn_id: conn_id.cloned(),
997 resolved_ids,
998 custom_logical_compaction_window: table.compaction_window,
999 is_retained_metrics_object: false,
1000 data_source,
1001 };
1002 let ops = vec![catalog::Op::CreateItem {
1003 id: table_id,
1004 name: name.clone(),
1005 item: CatalogItem::Table(table.clone()),
1006 owner_id: *ctx.session().current_role_id(),
1007 }];
1008
1009 let catalog_result = self
1010 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1011 .await;
1012
1013 if is_webhook {
1014 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1017 ctx.session()
1018 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1019 }
1020 }
1021
1022 match catalog_result {
1023 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1024 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1025 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1026 })) if if_not_exists => {
1027 ctx.session_mut()
1028 .add_notice(AdapterNotice::ObjectAlreadyExists {
1029 name: name.item,
1030 ty: "table",
1031 });
1032 Ok(ExecuteResponse::CreatedTable)
1033 }
1034 Err(err) => Err(err),
1035 }
1036 }
1037
1038 #[instrument]
1039 pub(super) async fn sequence_create_sink(
1040 &mut self,
1041 ctx: ExecuteContext,
1042 plan: plan::CreateSinkPlan,
1043 resolved_ids: ResolvedIds,
1044 ) {
1045 let plan::CreateSinkPlan {
1046 name,
1047 sink,
1048 with_snapshot,
1049 if_not_exists,
1050 in_cluster,
1051 } = plan;
1052
1053 let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
1055
1056 let catalog_sink = Sink {
1057 create_sql: sink.create_sql,
1058 global_id,
1059 from: sink.from,
1060 connection: sink.connection,
1061 envelope: sink.envelope,
1062 version: sink.version,
1063 with_snapshot,
1064 resolved_ids,
1065 cluster_id: in_cluster,
1066 commit_interval: sink.commit_interval,
1067 };
1068
1069 let ops = vec![catalog::Op::CreateItem {
1070 id: item_id,
1071 name: name.clone(),
1072 item: CatalogItem::Sink(catalog_sink.clone()),
1073 owner_id: *ctx.session().current_role_id(),
1074 }];
1075
1076 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1077
1078 match result {
1079 Ok(()) => {}
1080 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1081 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1082 })) if if_not_exists => {
1083 ctx.session()
1084 .add_notice(AdapterNotice::ObjectAlreadyExists {
1085 name: name.item,
1086 ty: "sink",
1087 });
1088 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1089 return;
1090 }
1091 Err(e) => {
1092 ctx.retire(Err(e));
1093 return;
1094 }
1095 };
1096
1097 self.create_storage_export(global_id, &catalog_sink)
1098 .await
1099 .unwrap_or_terminate("cannot fail to create exports");
1100
1101 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1102 .await;
1103
1104 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1105 }
1106
1107 pub(super) fn validate_system_column_references(
1130 &self,
1131 uses_ambiguous_columns: bool,
1132 depends_on: &BTreeSet<GlobalId>,
1133 ) -> Result<(), AdapterError> {
1134 if uses_ambiguous_columns
1135 && depends_on
1136 .iter()
1137 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1138 {
1139 Err(AdapterError::AmbiguousSystemColumnReference)
1140 } else {
1141 Ok(())
1142 }
1143 }
1144
1145 #[instrument]
1146 pub(super) async fn sequence_create_type(
1147 &mut self,
1148 session: &Session,
1149 plan: plan::CreateTypePlan,
1150 resolved_ids: ResolvedIds,
1151 ) -> Result<ExecuteResponse, AdapterError> {
1152 let (item_id, global_id) = self.allocate_user_id().await?;
1153 plan.typ
1155 .inner
1156 .desc(&self.catalog().for_session(session))
1157 .map_err(AdapterError::from)?;
1158 let typ = Type {
1159 create_sql: Some(plan.typ.create_sql),
1160 global_id,
1161 details: CatalogTypeDetails {
1162 array_id: None,
1163 typ: plan.typ.inner,
1164 pg_metadata: None,
1165 },
1166 resolved_ids,
1167 };
1168 let op = catalog::Op::CreateItem {
1169 id: item_id,
1170 name: plan.name,
1171 item: CatalogItem::Type(typ),
1172 owner_id: *session.current_role_id(),
1173 };
1174 match self.catalog_transact(Some(session), vec![op]).await {
1175 Ok(()) => Ok(ExecuteResponse::CreatedType),
1176 Err(err) => Err(err),
1177 }
1178 }
1179
1180 #[instrument]
1181 pub(super) async fn sequence_comment_on(
1182 &mut self,
1183 session: &Session,
1184 plan: plan::CommentPlan,
1185 ) -> Result<ExecuteResponse, AdapterError> {
1186 let op = catalog::Op::Comment {
1187 object_id: plan.object_id,
1188 sub_component: plan.sub_component,
1189 comment: plan.comment,
1190 };
1191 self.catalog_transact(Some(session), vec![op]).await?;
1192 Ok(ExecuteResponse::Comment)
1193 }
1194
1195 #[instrument]
1196 pub(super) async fn sequence_drop_objects(
1197 &mut self,
1198 ctx: &mut ExecuteContext,
1199 plan::DropObjectsPlan {
1200 drop_ids,
1201 object_type,
1202 referenced_ids,
1203 }: plan::DropObjectsPlan,
1204 ) -> Result<ExecuteResponse, AdapterError> {
1205 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1206 let mut objects = Vec::new();
1207 for obj_id in &drop_ids {
1208 if !referenced_ids_hashset.contains(obj_id) {
1209 let object_info = ErrorMessageObjectDescription::from_id(
1210 obj_id,
1211 &self.catalog().for_session(ctx.session()),
1212 )
1213 .to_string();
1214 objects.push(object_info);
1215 }
1216 }
1217
1218 if !objects.is_empty() {
1219 ctx.session()
1220 .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1221 }
1222
1223 let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
1225 .iter()
1226 .filter_map(|id| match id {
1227 ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
1228 _ => None,
1229 })
1230 .flatten()
1231 .collect();
1232
1233 let DropOps {
1234 ops,
1235 dropped_active_db,
1236 dropped_active_cluster,
1237 dropped_in_use_indexes,
1238 } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1239
1240 self.catalog_transact_with_context(None, Some(ctx), ops)
1241 .await?;
1242
1243 if !expr_cache_invalidate_ids.is_empty() {
1245 let _fut = self.catalog().update_expression_cache(
1246 Default::default(),
1247 Default::default(),
1248 expr_cache_invalidate_ids,
1249 );
1250 }
1251
1252 fail::fail_point!("after_sequencer_drop_replica");
1253
1254 if dropped_active_db {
1255 ctx.session()
1256 .add_notice(AdapterNotice::DroppedActiveDatabase {
1257 name: ctx.session().vars().database().to_string(),
1258 });
1259 }
1260 if dropped_active_cluster {
1261 ctx.session()
1262 .add_notice(AdapterNotice::DroppedActiveCluster {
1263 name: ctx.session().vars().cluster().to_string(),
1264 });
1265 }
1266 for dropped_in_use_index in dropped_in_use_indexes {
1267 ctx.session()
1268 .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1269 self.metrics
1270 .optimization_notices
1271 .with_label_values(&["DroppedInUseIndex"])
1272 .inc_by(1);
1273 }
1274 Ok(ExecuteResponse::DroppedObject(object_type))
1275 }
1276
1277 fn validate_dropped_role_ownership(
1278 &self,
1279 session: &Session,
1280 dropped_roles: &BTreeMap<RoleId, &str>,
1281 ) -> Result<(), AdapterError> {
1282 fn privilege_check(
1283 privileges: &PrivilegeMap,
1284 dropped_roles: &BTreeMap<RoleId, &str>,
1285 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1286 object_id: &SystemObjectId,
1287 catalog: &ConnCatalog,
1288 ) {
1289 for privilege in privileges.all_values() {
1290 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1291 let grantor_name = catalog.get_role(&privilege.grantor).name();
1292 let object_description =
1293 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1294 dependent_objects
1295 .entry(role_name.to_string())
1296 .or_default()
1297 .push(format!(
1298 "privileges on {object_description} granted by {grantor_name}",
1299 ));
1300 }
1301 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1302 let grantee_name = catalog.get_role(&privilege.grantee).name();
1303 let object_description =
1304 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1305 dependent_objects
1306 .entry(role_name.to_string())
1307 .or_default()
1308 .push(format!(
1309 "privileges granted on {object_description} to {grantee_name}"
1310 ));
1311 }
1312 }
1313 }
1314
1315 let catalog = self.catalog().for_session(session);
1316 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1317 for entry in self.catalog.entries() {
1318 let id = SystemObjectId::Object(entry.id().into());
1319 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1320 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1321 dependent_objects
1322 .entry(role_name.to_string())
1323 .or_default()
1324 .push(format!("owner of {object_description}"));
1325 }
1326 privilege_check(
1327 entry.privileges(),
1328 dropped_roles,
1329 &mut dependent_objects,
1330 &id,
1331 &catalog,
1332 );
1333 }
1334 for database in self.catalog.databases() {
1335 let database_id = SystemObjectId::Object(database.id().into());
1336 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1337 let object_description =
1338 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1339 dependent_objects
1340 .entry(role_name.to_string())
1341 .or_default()
1342 .push(format!("owner of {object_description}"));
1343 }
1344 privilege_check(
1345 &database.privileges,
1346 dropped_roles,
1347 &mut dependent_objects,
1348 &database_id,
1349 &catalog,
1350 );
1351 for schema in database.schemas_by_id.values() {
1352 let schema_id = SystemObjectId::Object(
1353 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1354 );
1355 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1356 let object_description =
1357 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1358 dependent_objects
1359 .entry(role_name.to_string())
1360 .or_default()
1361 .push(format!("owner of {object_description}"));
1362 }
1363 privilege_check(
1364 &schema.privileges,
1365 dropped_roles,
1366 &mut dependent_objects,
1367 &schema_id,
1368 &catalog,
1369 );
1370 }
1371 }
1372 for cluster in self.catalog.clusters() {
1373 let cluster_id = SystemObjectId::Object(cluster.id().into());
1374 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1375 let object_description =
1376 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1377 dependent_objects
1378 .entry(role_name.to_string())
1379 .or_default()
1380 .push(format!("owner of {object_description}"));
1381 }
1382 privilege_check(
1383 &cluster.privileges,
1384 dropped_roles,
1385 &mut dependent_objects,
1386 &cluster_id,
1387 &catalog,
1388 );
1389 for replica in cluster.replicas() {
1390 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1391 let replica_id =
1392 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1393 let object_description =
1394 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1395 dependent_objects
1396 .entry(role_name.to_string())
1397 .or_default()
1398 .push(format!("owner of {object_description}"));
1399 }
1400 }
1401 }
1402 privilege_check(
1403 self.catalog().system_privileges(),
1404 dropped_roles,
1405 &mut dependent_objects,
1406 &SystemObjectId::System,
1407 &catalog,
1408 );
1409 for (default_privilege_object, default_privilege_acl_items) in
1410 self.catalog.default_privileges()
1411 {
1412 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1413 dependent_objects
1414 .entry(role_name.to_string())
1415 .or_default()
1416 .push(format!(
1417 "default privileges on {}S created by {}",
1418 default_privilege_object.object_type, role_name
1419 ));
1420 }
1421 for default_privilege_acl_item in default_privilege_acl_items {
1422 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1423 dependent_objects
1424 .entry(role_name.to_string())
1425 .or_default()
1426 .push(format!(
1427 "default privileges on {}S granted to {}",
1428 default_privilege_object.object_type, role_name
1429 ));
1430 }
1431 }
1432 }
1433
1434 if !dependent_objects.is_empty() {
1435 Err(AdapterError::DependentObject(dependent_objects))
1436 } else {
1437 Ok(())
1438 }
1439 }
1440
1441 #[instrument]
1442 pub(super) async fn sequence_drop_owned(
1443 &mut self,
1444 session: &Session,
1445 plan: plan::DropOwnedPlan,
1446 ) -> Result<ExecuteResponse, AdapterError> {
1447 for role_id in &plan.role_ids {
1448 self.catalog().ensure_not_reserved_role(role_id)?;
1449 }
1450
1451 let mut privilege_revokes = plan.privilege_revokes;
1452
1453 let session_catalog = self.catalog().for_session(session);
1455 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1456 && !session.is_superuser()
1457 {
1458 let role_membership =
1460 session_catalog.collect_role_membership(session.current_role_id());
1461 let invalid_revokes: BTreeSet<_> = privilege_revokes
1462 .extract_if(.., |(_, privilege)| {
1463 !role_membership.contains(&privilege.grantor)
1464 })
1465 .map(|(object_id, _)| object_id)
1466 .collect();
1467 for invalid_revoke in invalid_revokes {
1468 let object_description =
1469 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1470 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1471 }
1472 }
1473
1474 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1475 catalog::Op::UpdatePrivilege {
1476 target_id: object_id,
1477 privilege,
1478 variant: UpdatePrivilegeVariant::Revoke,
1479 }
1480 });
1481 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1482 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1483 privilege_object,
1484 privilege_acl_item,
1485 variant: UpdatePrivilegeVariant::Revoke,
1486 },
1487 );
1488 let DropOps {
1489 ops: drop_ops,
1490 dropped_active_db,
1491 dropped_active_cluster,
1492 dropped_in_use_indexes,
1493 } = self.sequence_drop_common(session, plan.drop_ids)?;
1494
1495 let ops = privilege_revoke_ops
1496 .chain(default_privilege_revoke_ops)
1497 .chain(drop_ops.into_iter())
1498 .collect();
1499
1500 self.catalog_transact(Some(session), ops).await?;
1501
1502 if dropped_active_db {
1503 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1504 name: session.vars().database().to_string(),
1505 });
1506 }
1507 if dropped_active_cluster {
1508 session.add_notice(AdapterNotice::DroppedActiveCluster {
1509 name: session.vars().cluster().to_string(),
1510 });
1511 }
1512 for dropped_in_use_index in dropped_in_use_indexes {
1513 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1514 }
1515 Ok(ExecuteResponse::DroppedOwned)
1516 }
1517
1518 fn sequence_drop_common(
1519 &self,
1520 session: &Session,
1521 ids: Vec<ObjectId>,
1522 ) -> Result<DropOps, AdapterError> {
1523 let mut dropped_active_db = false;
1524 let mut dropped_active_cluster = false;
1525 let mut dropped_in_use_indexes = Vec::new();
1526 let mut dropped_roles = BTreeMap::new();
1527 let mut dropped_databases = BTreeSet::new();
1528 let mut dropped_schemas = BTreeSet::new();
1529 let mut role_revokes = BTreeSet::new();
1533 let mut default_privilege_revokes = BTreeSet::new();
1536
1537 let mut clusters_to_drop = BTreeSet::new();
1539
1540 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1541 for id in &ids {
1542 match id {
1543 ObjectId::Database(id) => {
1544 let name = self.catalog().get_database(id).name();
1545 if name == session.vars().database() {
1546 dropped_active_db = true;
1547 }
1548 dropped_databases.insert(id);
1549 }
1550 ObjectId::Schema((_, spec)) => {
1551 if let SchemaSpecifier::Id(id) = spec {
1552 dropped_schemas.insert(id);
1553 }
1554 }
1555 ObjectId::Cluster(id) => {
1556 clusters_to_drop.insert(*id);
1557 if let Some(active_id) = self
1558 .catalog()
1559 .active_cluster(session)
1560 .ok()
1561 .map(|cluster| cluster.id())
1562 {
1563 if id == &active_id {
1564 dropped_active_cluster = true;
1565 }
1566 }
1567 }
1568 ObjectId::Role(id) => {
1569 let role = self.catalog().get_role(id);
1570 let name = role.name();
1571 dropped_roles.insert(*id, name);
1572 for (group_id, grantor_id) in &role.membership.map {
1574 role_revokes.insert((*group_id, *id, *grantor_id));
1575 }
1576 }
1577 ObjectId::Item(id) => {
1578 if let Some(index) = self.catalog().get_entry(id).index() {
1579 let humanizer = self.catalog().for_session(session);
1580 let dependants = self
1581 .controller
1582 .compute
1583 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1584 .ok()
1585 .into_iter()
1586 .flatten()
1587 .filter(|dependant_id| {
1588 if dependant_id.is_transient() {
1595 return false;
1596 }
1597 let Some(dependent_id) = humanizer
1599 .try_get_item_by_global_id(dependant_id)
1600 .map(|item| item.id())
1601 else {
1602 return false;
1603 };
1604 !ids_set.contains(&ObjectId::Item(dependent_id))
1607 })
1608 .flat_map(|dependant_id| {
1609 humanizer.humanize_id(dependant_id)
1613 })
1614 .collect_vec();
1615 if !dependants.is_empty() {
1616 dropped_in_use_indexes.push(DroppedInUseIndex {
1617 index_name: humanizer
1618 .humanize_id(index.global_id())
1619 .unwrap_or_else(|| id.to_string()),
1620 dependant_objects: dependants,
1621 });
1622 }
1623 }
1624 }
1625 _ => {}
1626 }
1627 }
1628
1629 for id in &ids {
1630 match id {
1631 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1635 if !clusters_to_drop.contains(cluster_id) {
1636 let cluster = self.catalog.get_cluster(*cluster_id);
1637 if cluster.is_managed() {
1638 let replica =
1639 cluster.replica(*replica_id).expect("Catalog out of sync");
1640 if !replica.config.location.internal() {
1641 coord_bail!("cannot drop replica of managed cluster");
1642 }
1643 }
1644 }
1645 }
1646 _ => {}
1647 }
1648 }
1649
1650 for role_id in dropped_roles.keys() {
1651 self.catalog().ensure_not_reserved_role(role_id)?;
1652 }
1653 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1654 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1656 for role in self.catalog().user_roles() {
1657 for dropped_role_id in
1658 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1659 {
1660 role_revokes.insert((
1661 **dropped_role_id,
1662 role.id(),
1663 *role
1664 .membership
1665 .map
1666 .get(*dropped_role_id)
1667 .expect("included in keys above"),
1668 ));
1669 }
1670 }
1671
1672 for (default_privilege_object, default_privilege_acls) in
1673 self.catalog().default_privileges()
1674 {
1675 if matches!(
1676 &default_privilege_object.database_id,
1677 Some(database_id) if dropped_databases.contains(database_id),
1678 ) || matches!(
1679 &default_privilege_object.schema_id,
1680 Some(schema_id) if dropped_schemas.contains(schema_id),
1681 ) {
1682 for default_privilege_acl in default_privilege_acls {
1683 default_privilege_revokes.insert((
1684 default_privilege_object.clone(),
1685 default_privilege_acl.clone(),
1686 ));
1687 }
1688 }
1689 }
1690
1691 let ops = role_revokes
1692 .into_iter()
1693 .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1694 role_id,
1695 member_id,
1696 grantor_id,
1697 })
1698 .chain(default_privilege_revokes.into_iter().map(
1699 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1700 privilege_object,
1701 privilege_acl_item,
1702 variant: UpdatePrivilegeVariant::Revoke,
1703 },
1704 ))
1705 .chain(iter::once(catalog::Op::DropObjects(
1706 ids.into_iter()
1707 .map(DropObjectInfo::manual_drop_from_object_id)
1708 .collect(),
1709 )))
1710 .collect();
1711
1712 Ok(DropOps {
1713 ops,
1714 dropped_active_db,
1715 dropped_active_cluster,
1716 dropped_in_use_indexes,
1717 })
1718 }
1719
1720 pub(super) fn sequence_explain_schema(
1721 &self,
1722 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1723 ) -> Result<ExecuteResponse, AdapterError> {
1724 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1725 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1726 })?;
1727
1728 let json_string = json_string(&json_value);
1729 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1730 Ok(Self::send_immediate_rows(row))
1731 }
1732
1733 pub(super) fn sequence_show_all_variables(
1734 &self,
1735 session: &Session,
1736 ) -> Result<ExecuteResponse, AdapterError> {
1737 let mut rows = viewable_variables(self.catalog().state(), session)
1738 .map(|v| (v.name(), v.value(), v.description()))
1739 .collect::<Vec<_>>();
1740 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1741
1742 let rows: Vec<_> = rows
1744 .into_iter()
1745 .map(|(name, val, desc)| {
1746 Row::pack_slice(&[
1747 Datum::String(name),
1748 Datum::String(&val),
1749 Datum::String(desc),
1750 ])
1751 })
1752 .collect();
1753 Ok(Self::send_immediate_rows(rows))
1754 }
1755
1756 pub(super) fn sequence_show_variable(
1757 &self,
1758 session: &Session,
1759 plan: plan::ShowVariablePlan,
1760 ) -> Result<ExecuteResponse, AdapterError> {
1761 if &plan.name == SCHEMA_ALIAS {
1762 let schemas = self.catalog.resolve_search_path(session);
1763 let schema = schemas.first();
1764 return match schema {
1765 Some((database_spec, schema_spec)) => {
1766 let schema_name = &self
1767 .catalog
1768 .get_schema(database_spec, schema_spec, session.conn_id())
1769 .name()
1770 .schema;
1771 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1772 Ok(Self::send_immediate_rows(row))
1773 }
1774 None => {
1775 if session.vars().current_object_missing_warnings() {
1776 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1777 search_path: session
1778 .vars()
1779 .search_path()
1780 .into_iter()
1781 .map(|schema| schema.to_string())
1782 .collect(),
1783 });
1784 }
1785 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1786 }
1787 };
1788 }
1789
1790 let variable = session
1791 .vars()
1792 .get(self.catalog().system_config(), &plan.name)
1793 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1794
1795 variable.visible(session.user(), self.catalog().system_config())?;
1798
1799 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1800 if variable.name() == vars::DATABASE.name()
1801 && matches!(
1802 self.catalog().resolve_database(&variable.value()),
1803 Err(CatalogError::UnknownDatabase(_))
1804 )
1805 && session.vars().current_object_missing_warnings()
1806 {
1807 let name = variable.value();
1808 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1809 } else if variable.name() == vars::CLUSTER.name()
1810 && matches!(
1811 self.catalog().resolve_cluster(&variable.value()),
1812 Err(CatalogError::UnknownCluster(_))
1813 )
1814 && session.vars().current_object_missing_warnings()
1815 {
1816 let name = variable.value();
1817 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1818 }
1819 Ok(Self::send_immediate_rows(row))
1820 }
1821
1822 #[instrument]
1823 pub(super) async fn sequence_inspect_shard(
1824 &self,
1825 session: &Session,
1826 plan: plan::InspectShardPlan,
1827 ) -> Result<ExecuteResponse, AdapterError> {
1828 if !session.user().is_internal() {
1831 return Err(AdapterError::Unauthorized(
1832 rbac::UnauthorizedError::MzSystem {
1833 action: "inspect".into(),
1834 },
1835 ));
1836 }
1837 let state = self
1838 .controller
1839 .storage
1840 .inspect_persist_state(plan.id)
1841 .await?;
1842 let jsonb = Jsonb::from_serde_json(state)?;
1843 Ok(Self::send_immediate_rows(jsonb.into_row()))
1844 }
1845
1846 #[instrument]
1847 pub(super) fn sequence_set_variable(
1848 &self,
1849 session: &mut Session,
1850 plan: plan::SetVariablePlan,
1851 ) -> Result<ExecuteResponse, AdapterError> {
1852 let (name, local) = (plan.name, plan.local);
1853 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1854 self.validate_set_isolation_level(session)?;
1855 }
1856 if &name == vars::CLUSTER.name() {
1857 self.validate_set_cluster(session)?;
1858 }
1859
1860 let vars = session.vars_mut();
1861 let values = match plan.value {
1862 plan::VariableValue::Default => None,
1863 plan::VariableValue::Values(values) => Some(values),
1864 };
1865
1866 match values {
1867 Some(values) => {
1868 vars.set(
1869 self.catalog().system_config(),
1870 &name,
1871 VarInput::SqlSet(&values),
1872 local,
1873 )?;
1874
1875 let vars = session.vars();
1876
1877 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1880 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1881 } else if name == vars::CLUSTER.name()
1882 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1883 {
1884 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1885 }
1886
1887 if name.as_str() == vars::DATABASE.name()
1889 && matches!(
1890 self.catalog().resolve_database(vars.database()),
1891 Err(CatalogError::UnknownDatabase(_))
1892 )
1893 && session.vars().current_object_missing_warnings()
1894 {
1895 let name = vars.database().to_string();
1896 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1897 } else if name.as_str() == vars::CLUSTER.name()
1898 && matches!(
1899 self.catalog().resolve_cluster(vars.cluster()),
1900 Err(CatalogError::UnknownCluster(_))
1901 )
1902 && session.vars().current_object_missing_warnings()
1903 {
1904 let name = vars.cluster().to_string();
1905 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1906 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1907 let v = values.into_first().to_lowercase();
1908 if v == IsolationLevel::ReadUncommitted.as_str()
1909 || v == IsolationLevel::ReadCommitted.as_str()
1910 || v == IsolationLevel::RepeatableRead.as_str()
1911 {
1912 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1913 isolation_level: v,
1914 });
1915 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1916 session.add_notice(AdapterNotice::StrongSessionSerializable);
1917 }
1918 }
1919 }
1920 None => vars.reset(self.catalog().system_config(), &name, local)?,
1921 }
1922
1923 Ok(ExecuteResponse::SetVariable { name, reset: false })
1924 }
1925
1926 pub(super) fn sequence_reset_variable(
1927 &self,
1928 session: &mut Session,
1929 plan: plan::ResetVariablePlan,
1930 ) -> Result<ExecuteResponse, AdapterError> {
1931 let name = plan.name;
1932 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1933 self.validate_set_isolation_level(session)?;
1934 }
1935 if &name == vars::CLUSTER.name() {
1936 self.validate_set_cluster(session)?;
1937 }
1938 session
1939 .vars_mut()
1940 .reset(self.catalog().system_config(), &name, false)?;
1941 Ok(ExecuteResponse::SetVariable { name, reset: true })
1942 }
1943
1944 pub(super) fn sequence_set_transaction(
1945 &self,
1946 session: &mut Session,
1947 plan: plan::SetTransactionPlan,
1948 ) -> Result<ExecuteResponse, AdapterError> {
1949 for mode in plan.modes {
1951 match mode {
1952 TransactionMode::AccessMode(_) => {
1953 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1954 }
1955 TransactionMode::IsolationLevel(isolation_level) => {
1956 self.validate_set_isolation_level(session)?;
1957
1958 session.vars_mut().set(
1959 self.catalog().system_config(),
1960 TRANSACTION_ISOLATION_VAR_NAME,
1961 VarInput::Flat(&isolation_level.to_ast_string_stable()),
1962 plan.local,
1963 )?
1964 }
1965 }
1966 }
1967 Ok(ExecuteResponse::SetVariable {
1968 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1969 reset: false,
1970 })
1971 }
1972
1973 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1974 if session.transaction().contains_ops() {
1975 Err(AdapterError::InvalidSetIsolationLevel)
1976 } else {
1977 Ok(())
1978 }
1979 }
1980
1981 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1982 if session.transaction().contains_ops() {
1983 Err(AdapterError::InvalidSetCluster)
1984 } else {
1985 Ok(())
1986 }
1987 }
1988
1989 #[instrument]
1990 pub(super) async fn sequence_end_transaction(
1991 &mut self,
1992 mut ctx: ExecuteContext,
1993 mut action: EndTransactionAction,
1994 ) {
1995 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
1997 (&action, ctx.session().transaction())
1998 {
1999 action = EndTransactionAction::Rollback;
2000 }
2001 let response = match action {
2002 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2003 params: BTreeMap::new(),
2004 }),
2005 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2006 params: BTreeMap::new(),
2007 }),
2008 };
2009
2010 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2011
2012 let (response, action) = match result {
2013 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2014 (response, action)
2015 }
2016 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2017 let validated_locks = match write_lock_guards {
2021 None => None,
2022 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2023 Ok(locks) => Some(locks),
2024 Err(missing) => {
2025 tracing::error!(?missing, "programming error, missing write locks");
2026 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2027 }
2028 },
2029 };
2030
2031 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2032 for WriteOp { id, rows } in writes {
2033 let total_rows = collected_writes.entry(id).or_default();
2034 total_rows.push(rows);
2035 }
2036
2037 self.submit_write(PendingWriteTxn::User {
2038 span: Span::current(),
2039 writes: collected_writes,
2040 write_locks: validated_locks,
2041 responder: UserWriteResponder::Session(PendingTxn {
2042 ctx,
2043 response,
2044 action,
2045 }),
2046 });
2047 return;
2048 }
2049 Ok((
2050 Some(TransactionOps::Peeks {
2051 determination,
2052 requires_linearization: RequireLinearization::Required,
2053 ..
2054 }),
2055 _,
2056 )) if ctx.session().vars().transaction_isolation()
2057 == &IsolationLevel::StrictSerializable =>
2058 {
2059 let conn_id = ctx.session().conn_id().clone();
2060 let pending_read_txn = PendingReadTxn {
2061 txn: PendingRead::Read {
2062 txn: PendingTxn {
2063 ctx,
2064 response,
2065 action,
2066 },
2067 },
2068 timestamp_context: determination.timestamp_context,
2069 created: Instant::now(),
2070 num_requeues: 0,
2071 otel_ctx: OpenTelemetryContext::obtain(),
2072 };
2073 self.strict_serializable_reads_tx
2074 .send((conn_id, pending_read_txn))
2075 .expect("sending to strict_serializable_reads_tx cannot fail");
2076 return;
2077 }
2078 Ok((
2079 Some(TransactionOps::Peeks {
2080 determination,
2081 requires_linearization: RequireLinearization::Required,
2082 ..
2083 }),
2084 _,
2085 )) if ctx.session().vars().transaction_isolation()
2086 == &IsolationLevel::StrongSessionSerializable =>
2087 {
2088 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2089 ctx.session_mut()
2090 .ensure_timestamp_oracle(timeline.clone())
2091 .apply_write(*ts);
2092 }
2093 (response, action)
2094 }
2095 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2096 self.internal_cmd_tx
2097 .send(Message::ExecuteSingleStatementTransaction {
2098 ctx,
2099 otel_ctx: OpenTelemetryContext::obtain(),
2100 stmt,
2101 params,
2102 })
2103 .expect("must send");
2104 return;
2105 }
2106 Ok((_, _)) => (response, action),
2107 Err(err) => (Err(err), EndTransactionAction::Rollback),
2108 };
2109 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2110 let response = response.map(|mut r| {
2112 r.extend_params(changed);
2113 ExecuteResponse::from(r)
2114 });
2115
2116 ctx.retire(response);
2117 }
2118
2119 #[instrument]
2120 async fn sequence_end_transaction_inner(
2121 &mut self,
2122 ctx: &mut ExecuteContext,
2123 action: EndTransactionAction,
2124 ) -> Result<(Option<TransactionOps>, Option<WriteLocks>), AdapterError> {
2125 let txn = self.clear_transaction(ctx.session_mut()).await;
2126
2127 if let EndTransactionAction::Commit = action {
2128 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2129 match &mut ops {
2130 TransactionOps::Writes(writes) => {
2131 for WriteOp { id, .. } in &mut writes.iter() {
2132 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2134 AdapterError::Catalog(mz_catalog::memory::error::Error {
2135 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2136 })
2137 })?;
2138 }
2139
2140 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2142 }
2143 TransactionOps::DDL {
2144 ops,
2145 state: _,
2146 side_effects,
2147 revision,
2148 snapshot: _,
2149 } => {
2150 if *revision != self.catalog().transient_revision() {
2152 return Err(AdapterError::DDLTransactionRace);
2153 }
2154 let ops = std::mem::take(ops);
2156 let side_effects = std::mem::take(side_effects);
2157 self.catalog_transact_with_side_effects(
2158 Some(ctx),
2159 ops,
2160 move |a, mut ctx| {
2161 Box::pin(async move {
2162 for side_effect in side_effects {
2163 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2164 }
2165 })
2166 },
2167 )
2168 .await?;
2169 }
2170 _ => (),
2171 }
2172 return Ok((Some(ops), write_lock_guards));
2173 }
2174 }
2175
2176 Ok((None, None))
2177 }
2178
2179 pub(super) async fn sequence_side_effecting_func(
2180 &mut self,
2181 ctx: ExecuteContext,
2182 plan: SideEffectingFunc,
2183 ) {
2184 match plan {
2185 SideEffectingFunc::PgCancelBackend { connection_id } => {
2186 let Some(connection_id) = connection_id else {
2187 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[
2190 Datum::Null,
2191 ]))));
2192 return;
2193 };
2194
2195 if ctx.session().conn_id().unhandled() == connection_id {
2196 ctx.retire(Err(AdapterError::Canceled));
2200 return;
2201 }
2202
2203 let res = if let Some((id_handle, _conn_meta)) =
2204 self.active_conns.get_key_value(&connection_id)
2205 {
2206 self.handle_privileged_cancel(id_handle.clone()).await;
2208 Datum::True
2209 } else {
2210 Datum::False
2211 };
2212 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2213 }
2214 }
2215 }
2216
2217 pub(crate) async fn execute_side_effecting_func(
2226 &mut self,
2227 plan: SideEffectingFunc,
2228 conn_id: ConnectionId,
2229 current_role: RoleId,
2230 ) -> Result<ExecuteResponse, AdapterError> {
2231 match plan {
2232 SideEffectingFunc::PgCancelBackend { connection_id } => {
2233 let Some(connection_id) = connection_id else {
2234 return Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])));
2237 };
2238
2239 if conn_id.unhandled() == connection_id {
2240 return Err(AdapterError::Canceled);
2244 }
2245
2246 if let Some((_id_handle, conn_meta)) =
2249 self.active_conns.get_key_value(&connection_id)
2250 {
2251 let target_role = *conn_meta.authenticated_role_id();
2252 let role_membership = self
2253 .catalog()
2254 .state()
2255 .collect_role_membership(¤t_role);
2256 if !role_membership.contains(&target_role) {
2257 let target_role_name = self
2258 .catalog()
2259 .try_get_role(&target_role)
2260 .map(|role| role.name().to_string())
2261 .unwrap_or_else(|| target_role.to_string());
2262 return Err(AdapterError::Unauthorized(
2263 rbac::UnauthorizedError::RoleMembership {
2264 role_names: vec![target_role_name],
2265 },
2266 ));
2267 }
2268
2269 let id_handle = self
2271 .active_conns
2272 .get_key_value(&connection_id)
2273 .map(|(id, _)| id.clone())
2274 .expect("checked above");
2275 self.handle_privileged_cancel(id_handle).await;
2276 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2277 } else {
2278 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2280 }
2281 }
2282 }
2283 }
2284
2285 pub(crate) async fn determine_real_time_recent_timestamp(
2289 &self,
2290 source_ids: impl Iterator<Item = GlobalId>,
2291 real_time_recency_timeout: Duration,
2292 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2293 let item_ids = source_ids
2294 .map(|gid| {
2295 self.catalog
2296 .try_resolve_item_id(&gid)
2297 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2298 })
2299 .collect::<Result<Vec<_>, _>>()?;
2300
2301 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2307 if to_visit.is_empty() {
2310 return Ok(None);
2311 }
2312
2313 let mut timestamp_objects = BTreeSet::new();
2314
2315 while let Some(id) = to_visit.pop_front() {
2316 timestamp_objects.insert(id);
2317 to_visit.extend(
2318 self.catalog()
2319 .get_entry(&id)
2320 .uses()
2321 .into_iter()
2322 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2323 );
2324 }
2325 let timestamp_objects = timestamp_objects
2326 .into_iter()
2327 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2328 .collect();
2329
2330 let r = self
2331 .controller
2332 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2333 .await?;
2334
2335 Ok(Some(r))
2336 }
2337
2338 pub(crate) async fn await_real_time_recent_timestamp<F>(
2339 catalog: Arc<Catalog>,
2340 fut: F,
2341 ) -> Result<Timestamp, AdapterError>
2342 where
2343 F: Future<Output = Result<Timestamp, StorageError>>,
2344 {
2345 fut.await
2346 .map_err(|error| Self::real_time_recent_timestamp_error(&catalog, error))
2347 }
2348
2349 fn real_time_recent_timestamp_error(catalog: &Catalog, error: StorageError) -> AdapterError {
2350 let rtr_name = |id: &GlobalId| {
2351 catalog
2352 .try_get_entry_by_global_id(id)
2353 .map(|e| e.name().item.clone())
2354 .unwrap_or_else(|| id.to_string())
2355 };
2356
2357 match error {
2358 StorageError::RtrTimeout(id) => AdapterError::RtrTimeout(rtr_name(&id)),
2359 StorageError::RtrDropFailure(id) => AdapterError::RtrDropFailure(rtr_name(&id)),
2360 error => error.into(),
2361 }
2362 }
2363
2364 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2367 &self,
2368 session: &Session,
2369 source_ids: impl Iterator<Item = GlobalId>,
2370 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2371 let vars = session.vars();
2372
2373 if vars.real_time_recency()
2374 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2375 && !session.contains_read_timestamp()
2376 {
2377 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2378 .await
2379 } else {
2380 Ok(None)
2381 }
2382 }
2383
2384 #[instrument]
2385 pub(super) async fn sequence_explain_plan(
2386 &mut self,
2387 ctx: ExecuteContext,
2388 plan: plan::ExplainPlanPlan,
2389 target_cluster: TargetCluster,
2390 ) {
2391 match &plan.explainee {
2392 plan::Explainee::Statement(stmt) => match stmt {
2393 plan::ExplaineeStatement::CreateView { .. } => {
2394 self.explain_create_view(ctx, plan).await;
2395 }
2396 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2397 self.explain_create_materialized_view(ctx, plan).await;
2398 }
2399 plan::ExplaineeStatement::CreateIndex { .. } => {
2400 self.explain_create_index(ctx, plan).await;
2401 }
2402 plan::ExplaineeStatement::Select { .. } => {
2403 self.explain_peek(ctx, plan, target_cluster).await;
2404 }
2405 plan::ExplaineeStatement::Subscribe { .. } => {
2406 self.explain_subscribe(ctx, plan, target_cluster).await;
2407 }
2408 },
2409 plan::Explainee::View(_) => {
2410 let result = self.explain_view(&ctx, plan);
2411 ctx.retire(result);
2412 }
2413 plan::Explainee::MaterializedView(_) => {
2414 let result = self.explain_materialized_view(&ctx, plan);
2415 ctx.retire(result);
2416 }
2417 plan::Explainee::Index(_) => {
2418 let result = self.explain_index(&ctx, plan);
2419 ctx.retire(result);
2420 }
2421 plan::Explainee::ReplanView(_) => {
2422 self.explain_replan_view(ctx, plan).await;
2423 }
2424 plan::Explainee::ReplanMaterializedView(_) => {
2425 self.explain_replan_materialized_view(ctx, plan).await;
2426 }
2427 plan::Explainee::ReplanIndex(_) => {
2428 self.explain_replan_index(ctx, plan).await;
2429 }
2430 };
2431 }
2432
2433 pub(super) async fn sequence_explain_pushdown(
2434 &mut self,
2435 ctx: ExecuteContext,
2436 plan: plan::ExplainPushdownPlan,
2437 target_cluster: TargetCluster,
2438 ) {
2439 match plan.explainee {
2440 Explainee::Statement(ExplaineeStatement::Select {
2441 broken: false,
2442 plan,
2443 desc: _,
2444 }) => {
2445 let stage = return_if_err!(
2446 self.peek_validate(
2447 ctx.session(),
2448 plan,
2449 target_cluster,
2450 None,
2451 ExplainContext::Pushdown,
2452 Some(ctx.session().vars().max_query_result_size()),
2453 ),
2454 ctx
2455 );
2456 self.sequence_staged(ctx, Span::current(), stage).await;
2457 }
2458 Explainee::MaterializedView(item_id) => {
2459 self.explain_pushdown_materialized_view(ctx, item_id).await;
2460 }
2461 _ => {
2462 ctx.retire(Err(AdapterError::Unsupported(
2463 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2464 )));
2465 }
2466 };
2467 }
2468
2469 async fn execute_explain_pushdown_with_read_holds(
2471 &self,
2472 ctx: ExecuteContext,
2473 as_of: Antichain<Timestamp>,
2474 mz_now: ResultSpec<'static>,
2475 read_holds: Option<ReadHolds>,
2476 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2477 ) {
2478 let fut = self
2479 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2480 .await;
2481 task::spawn(|| "render explain pushdown", async move {
2482 let _read_holds = read_holds;
2484 let res = fut.await;
2485 ctx.retire(res);
2486 });
2487 }
2488
2489 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2491 &self,
2492 session: &Session,
2493 as_of: Antichain<Timestamp>,
2494 mz_now: ResultSpec<'static>,
2495 imports: I,
2496 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2497 super::explain_pushdown_future_inner(
2499 session,
2500 &self.catalog,
2501 &self.controller.storage_collections,
2502 as_of,
2503 mz_now,
2504 imports,
2505 )
2506 .await
2507 }
2508
2509 #[instrument]
2510 pub(super) async fn sequence_insert(
2511 &mut self,
2512 mut ctx: ExecuteContext,
2513 plan: plan::InsertPlan,
2514 ) {
2515 if !ctx.session_mut().transaction().allows_writes() {
2523 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2524 return;
2525 }
2526
2527 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2541 let expr = return_if_err!(
2544 plan.values
2545 .clone()
2546 .lower(self.catalog().system_config(), None),
2547 ctx
2548 );
2549 OptimizedMirRelationExpr(expr)
2550 } else {
2551 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2553
2554 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2556
2557 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2559 };
2560
2561 match optimized_mir.into_inner() {
2562 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2563 let catalog = self.owned_catalog();
2564 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2565 let result =
2566 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2567 ctx.retire(result);
2568 });
2569 }
2570 _ => {
2572 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2573 Some(table) => {
2574 let desc = table.relation_desc_latest().expect("table has a desc");
2576 desc.arity()
2577 }
2578 None => {
2579 ctx.retire(Err(AdapterError::Catalog(
2580 mz_catalog::memory::error::Error {
2581 kind: ErrorKind::Sql(CatalogError::UnknownItem(
2582 plan.id.to_string(),
2583 )),
2584 },
2585 )));
2586 return;
2587 }
2588 };
2589
2590 let finishing = RowSetFinishing {
2591 order_by: vec![],
2592 limit: None,
2593 offset: 0,
2594 project: (0..desc_arity).collect(),
2595 };
2596
2597 let read_then_write_plan = plan::ReadThenWritePlan {
2598 id: plan.id,
2599 selection: plan.values,
2600 finishing,
2601 assignments: BTreeMap::new(),
2602 kind: MutationKind::Insert,
2603 returning: plan.returning,
2604 };
2605
2606 self.sequence_read_then_write(ctx, read_then_write_plan)
2607 .await;
2608 }
2609 }
2610 }
2611
2612 #[instrument]
2617 pub(super) async fn sequence_read_then_write(
2618 &mut self,
2619 mut ctx: ExecuteContext,
2620 plan: plan::ReadThenWritePlan,
2621 ) {
2622 let mut source_ids: BTreeSet<_> = plan
2623 .selection
2624 .depends_on()
2625 .into_iter()
2626 .map(|gid| self.catalog().resolve_item_id(&gid))
2627 .collect();
2628 source_ids.insert(plan.id);
2629
2630 if ctx.session().transaction().write_locks().is_none() {
2632 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2634
2635 for id in &source_ids {
2637 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2638 write_locks.insert_lock(*id, lock);
2639 }
2640 }
2641
2642 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2644 Ok(locks) => locks,
2645 Err(missing) => {
2646 let role_metadata = ctx.session().role_metadata().clone();
2648 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2649 let plan = DeferredPlan {
2650 ctx,
2651 plan: Plan::ReadThenWrite(plan),
2652 validity: PlanValidity::new(
2653 self.catalog.transient_revision(),
2654 source_ids.clone(),
2655 None,
2656 None,
2657 role_metadata,
2658 ),
2659 requires_locks: source_ids,
2660 };
2661 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2662 }
2663 };
2664
2665 ctx.session_mut()
2666 .try_grant_write_locks(write_locks)
2667 .expect("session has already been granted write locks");
2668 }
2669
2670 let plan::ReadThenWritePlan {
2671 id,
2672 kind,
2673 selection,
2674 mut assignments,
2675 finishing,
2676 mut returning,
2677 } = plan;
2678
2679 let desc = match self.catalog().try_get_entry(&id) {
2681 Some(table) => {
2682 table
2684 .relation_desc_latest()
2685 .expect("table has a desc")
2686 .into_owned()
2687 }
2688 None => {
2689 ctx.retire(Err(AdapterError::Catalog(
2690 mz_catalog::memory::error::Error {
2691 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2692 },
2693 )));
2694 return;
2695 }
2696 };
2697
2698 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2700 || assignments.values().any(|e| e.contains_temporal())
2701 || returning.iter().any(|e| e.contains_temporal());
2702 if contains_temporal {
2703 ctx.retire(Err(AdapterError::Unsupported(
2704 "calls to mz_now in write statements",
2705 )));
2706 return;
2707 }
2708
2709 for gid in selection.depends_on() {
2711 let item_id = self.catalog().resolve_item_id(&gid);
2712 if let Err(err) = validate_read_then_write_dependencies(self.catalog(), &item_id) {
2713 ctx.retire(Err(err));
2714 return;
2715 }
2716 }
2717
2718 let (peek_tx, peek_rx) = oneshot::channel();
2719 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2720 let (tx, _, session, extra) = ctx.into_parts();
2721 let peek_ctx = ExecuteContext::from_parts(
2733 peek_client_tx,
2734 self.internal_cmd_tx.clone(),
2735 session,
2736 Default::default(),
2737 );
2738
2739 self.sequence_peek(
2740 peek_ctx,
2741 plan::SelectPlan {
2742 select: None,
2743 source: selection,
2744 when: QueryWhen::FreshestTableWrite,
2745 finishing,
2746 copy_to: None,
2747 },
2748 TargetCluster::Active,
2749 None,
2750 )
2751 .await;
2752
2753 let internal_cmd_tx = self.internal_cmd_tx.clone();
2754 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2755 let catalog = self.owned_catalog();
2756 let max_result_size = self.catalog().system_config().max_result_size();
2757
2758 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2759 let (peek_response, session) = match peek_rx.await {
2760 Ok(Response {
2761 result: Ok(resp),
2762 session,
2763 otel_ctx,
2764 }) => {
2765 otel_ctx.attach_as_parent();
2766 (resp, session)
2767 }
2768 Ok(Response {
2769 result: Err(e),
2770 session,
2771 otel_ctx,
2772 }) => {
2773 let ctx =
2774 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2775 otel_ctx.attach_as_parent();
2776 ctx.retire(Err(e));
2777 return;
2778 }
2779 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2781 };
2782 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2783 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2784
2785 if timeout_dur == Duration::ZERO {
2787 timeout_dur = Duration::MAX;
2788 }
2789
2790 let style = ExprPrepOneShot {
2791 logical_time: EvalTime::NotAvailable, session: ctx.session(),
2793 catalog_state: catalog.state(),
2794 };
2795 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2796 return_if_err!(style.prep_scalar_expr(expr), ctx);
2797 }
2798
2799 let make_diffs = move |mut rows: Box<dyn RowIterator>|
2800 -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2801 let arena = RowArena::new();
2802 let mut diffs = Vec::new();
2803 let mut datum_vec = mz_repr::DatumVec::new();
2804
2805 while let Some(row) = rows.next() {
2806 if !assignments.is_empty() {
2807 assert!(
2808 matches!(kind, MutationKind::Update),
2809 "only updates support assignments"
2810 );
2811 let mut datums = datum_vec.borrow_with(row);
2812 let mut updates = vec![];
2813 for (idx, expr) in &assignments {
2814 let updated = match expr.eval(&datums, &arena) {
2815 Ok(updated) => updated,
2816 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2817 };
2818 updates.push((*idx, updated));
2819 }
2820 for (idx, new_value) in updates {
2821 datums[idx] = new_value;
2822 }
2823 let updated = Row::pack_slice(&datums);
2824 diffs.push((updated, Diff::ONE));
2825 }
2826 match kind {
2827 MutationKind::Update | MutationKind::Delete => {
2831 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2832 }
2833 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2834 }
2835 }
2836
2837 let mut byte_size: u64 = 0;
2840 for (row, diff) in &diffs {
2841 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2842 if diff.is_positive() {
2843 for (idx, datum) in row.iter().enumerate() {
2844 desc.constraints_met(idx, &datum)?;
2845 }
2846 }
2847 }
2848 Ok((diffs, byte_size))
2849 };
2850
2851 let diffs = match peek_response {
2852 ExecuteResponse::SendingRowsStreaming {
2853 rows: mut rows_stream,
2854 ..
2855 } => {
2856 let mut byte_size: u64 = 0;
2857 let mut diffs = Vec::new();
2858 let result = loop {
2859 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2860 Ok(Some(res)) => match res {
2861 PeekResponseUnary::Rows(new_rows) => {
2862 match make_diffs(new_rows) {
2863 Ok((mut new_diffs, new_byte_size)) => {
2864 byte_size = byte_size.saturating_add(new_byte_size);
2865 if byte_size > max_result_size {
2866 break Err(AdapterError::ResultSize(format!(
2867 "result exceeds max size of {max_result_size}"
2868 )));
2869 }
2870 diffs.append(&mut new_diffs)
2871 }
2872 Err(e) => break Err(e),
2873 };
2874 }
2875 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2876 PeekResponseUnary::Error(e) => {
2877 break Err(AdapterError::Unstructured(anyhow!(e)));
2878 }
2879 PeekResponseUnary::DependencyDropped(dep) => {
2880 break Err(dep.to_concurrent_dependency_drop());
2881 }
2882 },
2883 Ok(None) => break Ok(diffs),
2884 Err(_) => {
2885 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2890 conn_id: ctx.session().conn_id().clone(),
2891 });
2892 if let Err(e) = result {
2893 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2894 }
2895 break Err(AdapterError::StatementTimeout);
2896 }
2897 }
2898 };
2899
2900 result
2901 }
2902 ExecuteResponse::SendingRowsImmediate { rows } => {
2903 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2904 }
2905 resp => Err(AdapterError::Unstructured(anyhow!(
2906 "unexpected peek response: {resp:?}"
2907 ))),
2908 };
2909
2910 let mut returning_rows = Vec::new();
2911 let mut diff_err: Option<AdapterError> = None;
2912 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2913 let arena = RowArena::new();
2914 for (row, diff) in diffs {
2915 if !diff.is_positive() {
2916 continue;
2917 }
2918 let mut returning_row = Row::with_capacity(returning.len());
2919 let mut packer = returning_row.packer();
2920 for expr in &returning {
2921 let datums: Vec<_> = row.iter().collect();
2922 match expr.eval(&datums, &arena) {
2923 Ok(datum) => {
2924 packer.push(datum);
2925 }
2926 Err(err) => {
2927 diff_err = Some(err.into());
2928 break;
2929 }
2930 }
2931 }
2932 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2933 let diff = match NonZeroUsize::try_from(diff) {
2934 Ok(diff) => diff,
2935 Err(err) => {
2936 diff_err = Some(err.into());
2937 break;
2938 }
2939 };
2940 returning_rows.push((returning_row, diff));
2941 if diff_err.is_some() {
2942 break;
2943 }
2944 }
2945 }
2946 let diffs = if let Some(err) = diff_err {
2947 Err(err)
2948 } else {
2949 diffs
2950 };
2951
2952 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2955 if let Some(timestamp_context) = timestamp_context {
2964 let (tx, rx) = tokio::sync::oneshot::channel();
2965 let conn_id = ctx.session().conn_id().clone();
2966 let pending_read_txn = PendingReadTxn {
2967 txn: PendingRead::ReadThenWrite { ctx, tx },
2968 timestamp_context,
2969 created: Instant::now(),
2970 num_requeues: 0,
2971 otel_ctx: OpenTelemetryContext::obtain(),
2972 };
2973 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2974 if let Err(e) = result {
2976 warn!(
2977 "strict_serializable_reads_tx dropped before we could send: {:?}",
2978 e
2979 );
2980 return;
2981 }
2982 let result = rx.await;
2983 ctx = match result {
2985 Ok(Some(ctx)) => ctx,
2986 Ok(None) => {
2987 return;
2990 }
2991 Err(e) => {
2992 warn!(
2993 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
2994 e
2995 );
2996 return;
2997 }
2998 };
2999 }
3000
3001 match diffs {
3002 Ok(diffs) => {
3003 let result = Self::send_diffs(
3004 ctx.session_mut(),
3005 plan::SendDiffsPlan {
3006 id,
3007 updates: diffs,
3008 kind,
3009 returning: returning_rows,
3010 max_result_size,
3011 },
3012 );
3013 ctx.retire(result);
3014 }
3015 Err(e) => {
3016 ctx.retire(Err(e));
3017 }
3018 }
3019 });
3020 }
3021
3022 #[instrument]
3023 pub(super) async fn sequence_alter_item_rename(
3024 &mut self,
3025 ctx: &mut ExecuteContext,
3026 plan: plan::AlterItemRenamePlan,
3027 ) -> Result<ExecuteResponse, AdapterError> {
3028 let op = catalog::Op::RenameItem {
3029 id: plan.id,
3030 current_full_name: plan.current_full_name,
3031 to_name: plan.to_name,
3032 };
3033 match self
3034 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3035 .await
3036 {
3037 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3038 Err(err) => Err(err),
3039 }
3040 }
3041
3042 #[instrument]
3043 pub(super) async fn sequence_alter_retain_history(
3044 &mut self,
3045 ctx: &mut ExecuteContext,
3046 plan: plan::AlterRetainHistoryPlan,
3047 ) -> Result<ExecuteResponse, AdapterError> {
3048 let ops = vec![catalog::Op::AlterRetainHistory {
3049 id: plan.id,
3050 value: plan.value,
3051 window: plan.window,
3052 }];
3053 self.catalog_transact_with_context(None, Some(ctx), ops)
3054 .await?;
3055 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3056 }
3057
3058 #[instrument]
3059 pub(super) async fn sequence_alter_source_timestamp_interval(
3060 &mut self,
3061 ctx: &mut ExecuteContext,
3062 plan: plan::AlterSourceTimestampIntervalPlan,
3063 ) -> Result<ExecuteResponse, AdapterError> {
3064 let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3065 id: plan.id,
3066 value: plan.value,
3067 interval: plan.interval,
3068 }];
3069 self.catalog_transact_with_context(None, Some(ctx), ops)
3070 .await?;
3071 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3072 }
3073
3074 #[instrument]
3075 pub(super) async fn sequence_alter_schema_rename(
3076 &mut self,
3077 ctx: &mut ExecuteContext,
3078 plan: plan::AlterSchemaRenamePlan,
3079 ) -> Result<ExecuteResponse, AdapterError> {
3080 let (database_spec, schema_spec) = plan.cur_schema_spec;
3081 let op = catalog::Op::RenameSchema {
3082 database_spec,
3083 schema_spec,
3084 new_name: plan.new_schema_name,
3085 check_reserved_names: true,
3086 };
3087 match self
3088 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3089 .await
3090 {
3091 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3092 Err(err) => Err(err),
3093 }
3094 }
3095
3096 #[instrument]
3097 pub(super) async fn sequence_alter_schema_swap(
3098 &mut self,
3099 ctx: &mut ExecuteContext,
3100 plan: plan::AlterSchemaSwapPlan,
3101 ) -> Result<ExecuteResponse, AdapterError> {
3102 let plan::AlterSchemaSwapPlan {
3103 schema_a_spec: (schema_a_db, schema_a),
3104 schema_a_name,
3105 schema_b_spec: (schema_b_db, schema_b),
3106 schema_b_name,
3107 name_temp,
3108 } = plan;
3109
3110 let op_a = catalog::Op::RenameSchema {
3111 database_spec: schema_a_db,
3112 schema_spec: schema_a,
3113 new_name: name_temp,
3114 check_reserved_names: false,
3115 };
3116 let op_b = catalog::Op::RenameSchema {
3117 database_spec: schema_b_db,
3118 schema_spec: schema_b,
3119 new_name: schema_a_name,
3120 check_reserved_names: false,
3121 };
3122 let op_c = catalog::Op::RenameSchema {
3123 database_spec: schema_a_db,
3124 schema_spec: schema_a,
3125 new_name: schema_b_name,
3126 check_reserved_names: false,
3127 };
3128
3129 match self
3130 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3131 Box::pin(async {})
3132 })
3133 .await
3134 {
3135 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3136 Err(err) => Err(err),
3137 }
3138 }
3139
3140 #[instrument]
3141 pub(super) async fn sequence_alter_role(
3142 &mut self,
3143 session: &Session,
3144 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3145 ) -> Result<ExecuteResponse, AdapterError> {
3146 let catalog = self.catalog().for_session(session);
3147 let role = catalog.get_role(&id);
3148
3149 let mut notices = vec![];
3151
3152 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3154 let mut vars = role.vars().clone();
3155
3156 let mut nopassword = false;
3159
3160 match option {
3162 PlannedAlterRoleOption::Attributes(attrs) => {
3163 self.validate_role_attributes(&attrs.clone().into())?;
3164
3165 if let Some(inherit) = attrs.inherit {
3166 attributes.inherit = inherit;
3167 }
3168
3169 if let Some(password) = attrs.password {
3170 attributes.password = Some(password);
3171 attributes.scram_iterations =
3172 Some(self.catalog().system_config().scram_iterations())
3173 }
3174
3175 if let Some(superuser) = attrs.superuser {
3176 attributes.superuser = Some(superuser);
3177 }
3178
3179 if let Some(login) = attrs.login {
3180 attributes.login = Some(login);
3181 }
3182
3183 if attrs.nopassword.unwrap_or(false) {
3184 nopassword = true;
3185 }
3186
3187 if let Some(notice) = self.should_emit_rbac_notice(session) {
3188 notices.push(notice);
3189 }
3190 }
3191 PlannedAlterRoleOption::Variable(variable) => {
3192 let session_var = session.vars().inspect(variable.name())?;
3194 session_var.visible(session.user(), catalog.system_vars())?;
3196
3197 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3200 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3201 } else if let PlannedRoleVariable::Set {
3202 name,
3203 value: VariableValue::Values(vals),
3204 } = &variable
3205 {
3206 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3207 notices.push(AdapterNotice::IntrospectionClusterUsage);
3208 }
3209 }
3210
3211 let var_name = match variable {
3212 PlannedRoleVariable::Set { name, value } => {
3213 match &value {
3215 VariableValue::Default => {
3216 vars.remove(&name);
3217 }
3218 VariableValue::Values(vals) => {
3219 let var = match &vals[..] {
3220 [val] => OwnedVarInput::Flat(val.clone()),
3221 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3222 };
3223 session_var.check(var.borrow())?;
3225
3226 vars.insert(name.clone(), var);
3227 }
3228 };
3229 name
3230 }
3231 PlannedRoleVariable::Reset { name } => {
3232 vars.remove(&name);
3234 name
3235 }
3236 };
3237
3238 notices.push(AdapterNotice::VarDefaultUpdated {
3240 role: Some(name.clone()),
3241 var_name: Some(var_name),
3242 });
3243 }
3244 }
3245
3246 let op = catalog::Op::AlterRole {
3247 id,
3248 name,
3249 attributes,
3250 nopassword,
3251 vars: RoleVars { map: vars },
3252 };
3253 let response = self
3254 .catalog_transact(Some(session), vec![op])
3255 .await
3256 .map(|_| ExecuteResponse::AlteredRole)?;
3257
3258 session.add_notices(notices);
3260
3261 Ok(response)
3262 }
3263
3264 #[instrument]
3265 pub(super) async fn sequence_alter_sink_prepare(
3266 &mut self,
3267 ctx: ExecuteContext,
3268 plan: plan::AlterSinkPlan,
3269 ) {
3270 let id_bundle = crate::CollectionIdBundle {
3272 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3273 compute_ids: BTreeMap::new(),
3274 };
3275 let read_hold = self.acquire_read_holds(&id_bundle);
3276
3277 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3278 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3279 return;
3280 };
3281
3282 let otel_ctx = OpenTelemetryContext::obtain();
3283 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3284
3285 let plan_validity = PlanValidity::new(
3286 self.catalog().transient_revision(),
3287 BTreeSet::from_iter([plan.item_id, from_item_id]),
3288 Some(plan.in_cluster),
3289 None,
3290 ctx.session().role_metadata().clone(),
3291 );
3292
3293 info!(
3294 "preparing alter sink for {}: frontiers={:?} export={:?}",
3295 plan.global_id,
3296 self.controller
3297 .storage_collections
3298 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3299 self.controller.storage.export(plan.global_id)
3300 );
3301
3302 self.install_storage_watch_set(
3308 ctx.session().conn_id().clone(),
3309 BTreeSet::from_iter([plan.global_id]),
3310 read_ts,
3311 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3312 ctx: Some(ctx),
3313 otel_ctx,
3314 plan,
3315 plan_validity,
3316 read_hold,
3317 }),
3318 ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3319 }
3320
3321 #[instrument]
3322 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3323 ctx.otel_ctx.attach_as_parent();
3324
3325 let plan::AlterSinkPlan {
3326 item_id,
3327 global_id,
3328 sink: sink_plan,
3329 with_snapshot,
3330 in_cluster,
3331 } = ctx.plan.clone();
3332
3333 match ctx.plan_validity.check(self.catalog()) {
3342 Ok(()) => {}
3343 Err(err) => {
3344 ctx.retire(Err(err));
3345 return;
3346 }
3347 }
3348
3349 let entry = self.catalog().get_entry(&item_id);
3350 let CatalogItem::Sink(old_sink) = entry.item() else {
3351 panic!("invalid item kind for `AlterSinkPlan`");
3352 };
3353
3354 if sink_plan.version != old_sink.version + 1 {
3355 ctx.retire(Err(AdapterError::ChangedPlan(
3356 "sink was altered concurrently".into(),
3357 )));
3358 return;
3359 }
3360
3361 info!(
3362 "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3363 self.controller
3364 .storage_collections
3365 .collections_frontiers(vec![global_id, sink_plan.from]),
3366 self.controller.storage.export(global_id),
3367 );
3368
3369 let write_frontier = &self
3372 .controller
3373 .storage
3374 .export(global_id)
3375 .expect("sink known to exist")
3376 .write_frontier;
3377 let as_of = ctx.read_hold.least_valid_read();
3378 assert!(
3379 write_frontier.iter().all(|t| as_of.less_than(t)),
3380 "{:?} should be strictly less than {:?}",
3381 &*as_of,
3382 &**write_frontier
3383 );
3384
3385 let create_sql = &old_sink.create_sql;
3391 let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3392 let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3393 unreachable!("invalid statement kind for sink");
3394 };
3395
3396 stmt.with_options
3398 .retain(|o| o.name != CreateSinkOptionName::Version);
3399 stmt.with_options.push(CreateSinkOption {
3400 name: CreateSinkOptionName::Version,
3401 value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3402 sink_plan.version.to_string(),
3403 ))),
3404 });
3405
3406 let conn_catalog = self.catalog().for_system_session();
3407 let (mut stmt, resolved_ids) =
3408 mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3409
3410 let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3412 let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3413 stmt.from = ResolvedItemName::Item {
3414 id: from_entry.id(),
3415 qualifiers: from_entry.name.qualifiers.clone(),
3416 full_name,
3417 print_id: true,
3418 version: from_entry.version,
3419 };
3420
3421 let new_sink = Sink {
3422 create_sql: stmt.to_ast_string_stable(),
3423 global_id,
3424 from: sink_plan.from,
3425 connection: sink_plan.connection.clone(),
3426 envelope: sink_plan.envelope,
3427 version: sink_plan.version,
3428 with_snapshot,
3429 resolved_ids: resolved_ids.clone(),
3430 cluster_id: in_cluster,
3431 commit_interval: sink_plan.commit_interval,
3432 };
3433
3434 let ops = vec![catalog::Op::UpdateItem {
3435 id: item_id,
3436 name: entry.name().clone(),
3437 to_item: CatalogItem::Sink(new_sink),
3438 }];
3439
3440 match self
3441 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3442 .await
3443 {
3444 Ok(()) => {}
3445 Err(err) => {
3446 ctx.retire(Err(err));
3447 return;
3448 }
3449 }
3450
3451 let storage_sink_desc = StorageSinkDesc {
3452 from: sink_plan.from,
3453 from_desc: from_entry
3454 .relation_desc()
3455 .expect("sinks can only be built on items with descs")
3456 .into_owned(),
3457 connection: sink_plan
3458 .connection
3459 .clone()
3460 .into_inline_connection(self.catalog().state()),
3461 envelope: sink_plan.envelope,
3462 as_of,
3463 with_snapshot,
3464 version: sink_plan.version,
3465 from_storage_metadata: (),
3466 to_storage_metadata: (),
3467 commit_interval: sink_plan.commit_interval,
3468 };
3469
3470 self.controller
3471 .storage
3472 .alter_export(
3473 global_id,
3474 ExportDescription {
3475 sink: storage_sink_desc,
3476 instance_id: in_cluster,
3477 },
3478 )
3479 .await
3480 .unwrap_or_terminate("cannot fail to alter source desc");
3481
3482 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3483 }
3484
3485 #[instrument]
3486 pub(super) async fn sequence_alter_connection(
3487 &mut self,
3488 ctx: ExecuteContext,
3489 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3490 ) {
3491 match action {
3492 AlterConnectionAction::RotateKeys => {
3493 self.sequence_rotate_keys(ctx, id).await;
3494 }
3495 AlterConnectionAction::AlterOptions {
3496 set_options,
3497 drop_options,
3498 validate,
3499 } => {
3500 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3501 .await
3502 }
3503 }
3504 }
3505
3506 #[instrument]
3507 async fn sequence_alter_connection_options(
3508 &mut self,
3509 mut ctx: ExecuteContext,
3510 id: CatalogItemId,
3511 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3512 drop_options: BTreeSet<ConnectionOptionName>,
3513 validate: bool,
3514 ) {
3515 let cur_entry = self.catalog().get_entry(&id);
3516 let cur_conn = cur_entry.connection().expect("known to be connection");
3517 let connection_gid = cur_conn.global_id();
3518
3519 let inner = || -> Result<Connection, AdapterError> {
3520 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3522 .expect("invalid create sql persisted to catalog")
3523 .into_element()
3524 .ast
3525 {
3526 Statement::CreateConnection(stmt) => stmt,
3527 _ => unreachable!("proved type is source"),
3528 };
3529
3530 let catalog = self.catalog().for_system_session();
3531
3532 let (mut create_conn_stmt, resolved_ids) =
3534 mz_sql::names::resolve(&catalog, create_conn_stmt)
3535 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3536
3537 create_conn_stmt
3539 .values
3540 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3541
3542 create_conn_stmt.values.extend(
3544 set_options
3545 .into_iter()
3546 .map(|(name, value)| ConnectionOption { name, value }),
3547 );
3548
3549 let mut catalog = self.catalog().for_system_session();
3552 catalog.mark_id_unresolvable_for_replanning(id);
3553
3554 let plan = match mz_sql::plan::plan(
3556 None,
3557 &catalog,
3558 Statement::CreateConnection(create_conn_stmt),
3559 &Params::empty(),
3560 &resolved_ids,
3561 )
3562 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3563 {
3564 (Plan::CreateConnection(plan), _sql_impl_ids) => plan,
3565 (p, _) => {
3566 unreachable!("create connection plan is only valid response, got {:?}", p)
3567 }
3568 };
3569
3570 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3572 .expect("invalid create sql persisted to catalog")
3573 .into_element()
3574 .ast
3575 {
3576 Statement::CreateConnection(stmt) => stmt,
3577 _ => unreachable!("proved type is source"),
3578 };
3579
3580 let catalog = self.catalog().for_system_session();
3581
3582 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3584 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3585
3586 Ok(Connection {
3587 create_sql: plan.connection.create_sql,
3588 global_id: cur_conn.global_id,
3589 details: plan.connection.details,
3590 resolved_ids: new_deps,
3591 })
3592 };
3593
3594 let conn = match inner() {
3595 Ok(conn) => conn,
3596 Err(e) => {
3597 return ctx.retire(Err(e));
3598 }
3599 };
3600
3601 if validate {
3602 let connection = conn
3603 .details
3604 .to_connection()
3605 .into_inline_connection(self.catalog().state());
3606
3607 let internal_cmd_tx = self.internal_cmd_tx.clone();
3608 let transient_revision = self.catalog().transient_revision();
3609 let conn_id = ctx.session().conn_id().clone();
3610 let otel_ctx = OpenTelemetryContext::obtain();
3611 let role_metadata = ctx.session().role_metadata().clone();
3612 let current_storage_parameters = self.controller.storage.config().clone();
3613
3614 task::spawn(
3615 || format!("validate_alter_connection:{conn_id}"),
3616 async move {
3617 let resolved_ids = conn.resolved_ids.clone();
3618 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3619 let result = match std::panic::AssertUnwindSafe(
3620 connection.validate(id, ¤t_storage_parameters),
3621 )
3622 .ore_catch_unwind()
3623 .await
3624 {
3625 Ok(Ok(())) => Ok(conn),
3626 Ok(Err(err)) => Err(err.into()),
3627 Err(_panic) => {
3628 tracing::error!("alter connection validation panicked");
3629 Err(AdapterError::Internal(
3630 "connection validation panicked".into(),
3631 ))
3632 }
3633 };
3634
3635 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3637 AlterConnectionValidationReady {
3638 ctx,
3639 result,
3640 connection_id: id,
3641 connection_gid,
3642 plan_validity: PlanValidity::new(
3643 transient_revision,
3644 dependency_ids.clone(),
3645 None,
3646 None,
3647 role_metadata,
3648 ),
3649 otel_ctx,
3650 resolved_ids,
3651 },
3652 ));
3653 if let Err(e) = result {
3654 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3655 }
3656 },
3657 );
3658 } else {
3659 let result = self
3660 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3661 .await;
3662 ctx.retire(result);
3663 }
3664 }
3665
3666 #[instrument]
3667 pub(crate) async fn sequence_alter_connection_stage_finish(
3668 &mut self,
3669 session: &Session,
3670 id: CatalogItemId,
3671 connection: Connection,
3672 ) -> Result<ExecuteResponse, AdapterError> {
3673 match self.catalog.get_entry(&id).item() {
3674 CatalogItem::Connection(curr_conn) => {
3675 curr_conn
3676 .details
3677 .to_connection()
3678 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3679 .map_err(StorageError::from)?;
3680 }
3681 _ => unreachable!("known to be a connection"),
3682 };
3683
3684 let ops = vec![catalog::Op::UpdateItem {
3685 id,
3686 name: self.catalog.get_entry(&id).name().clone(),
3687 to_item: CatalogItem::Connection(connection.clone()),
3688 }];
3689
3690 self.catalog_transact(Some(session), ops).await?;
3691
3692 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3699 }
3700
3701 #[instrument]
3702 pub(super) async fn sequence_alter_source(
3703 &mut self,
3704 session: &Session,
3705 plan::AlterSourcePlan {
3706 item_id,
3707 ingestion_id,
3708 action,
3709 }: plan::AlterSourcePlan,
3710 ) -> Result<ExecuteResponse, AdapterError> {
3711 let cur_entry = self.catalog().get_entry(&item_id);
3712 let cur_source = cur_entry.source().expect("known to be source");
3713
3714 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3715 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3717 .expect("invalid create sql persisted to catalog")
3718 .into_element()
3719 .ast
3720 {
3721 Statement::CreateSource(stmt) => stmt,
3722 _ => unreachable!("proved type is source"),
3723 };
3724
3725 let catalog = coord.catalog().for_system_session();
3726
3727 mz_sql::names::resolve(&catalog, create_source_stmt)
3729 .map_err(|e| AdapterError::internal(err_cx, e))
3730 };
3731
3732 match action {
3733 plan::AlterSourceAction::AddSubsourceExports {
3734 subsources,
3735 options,
3736 } => {
3737 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3738
3739 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3740 text_columns: mut new_text_columns,
3741 exclude_columns: mut new_exclude_columns,
3742 ..
3743 } = options.try_into()?;
3744
3745 let (mut create_source_stmt, resolved_ids) =
3747 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3748
3749 let catalog = self.catalog();
3751 let curr_references: BTreeSet<_> = catalog
3752 .get_entry(&item_id)
3753 .used_by()
3754 .into_iter()
3755 .filter_map(|subsource| {
3756 catalog
3757 .get_entry(subsource)
3758 .subsource_details()
3759 .map(|(_id, reference, _details)| reference)
3760 })
3761 .collect();
3762
3763 let purification_err =
3766 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3767
3768 match &mut create_source_stmt.connection {
3772 CreateSourceConnection::Postgres {
3773 options: curr_options,
3774 ..
3775 } => {
3776 let mz_sql::plan::PgConfigOptionExtracted {
3777 mut text_columns, ..
3778 } = curr_options.clone().try_into()?;
3779
3780 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3783
3784 text_columns.retain(|column_qualified_reference| {
3786 mz_ore::soft_assert_eq_or_log!(
3787 column_qualified_reference.0.len(),
3788 4,
3789 "all TEXT COLUMNS values must be column-qualified references"
3790 );
3791 let mut table = column_qualified_reference.clone();
3792 table.0.truncate(3);
3793 curr_references.contains(&table)
3794 });
3795
3796 new_text_columns.extend(text_columns);
3798
3799 if !new_text_columns.is_empty() {
3801 new_text_columns.sort();
3802 let new_text_columns = new_text_columns
3803 .into_iter()
3804 .map(WithOptionValue::UnresolvedItemName)
3805 .collect();
3806
3807 curr_options.push(PgConfigOption {
3808 name: PgConfigOptionName::TextColumns,
3809 value: Some(WithOptionValue::Sequence(new_text_columns)),
3810 });
3811 }
3812 }
3813 CreateSourceConnection::MySql {
3814 options: curr_options,
3815 ..
3816 } => {
3817 let mz_sql::plan::MySqlConfigOptionExtracted {
3818 mut text_columns,
3819 mut exclude_columns,
3820 ..
3821 } = curr_options.clone().try_into()?;
3822
3823 curr_options.retain(|o| {
3826 !matches!(
3827 o.name,
3828 MySqlConfigOptionName::TextColumns
3829 | MySqlConfigOptionName::ExcludeColumns
3830 )
3831 });
3832
3833 let column_referenced =
3835 |column_qualified_reference: &UnresolvedItemName| {
3836 mz_ore::soft_assert_eq_or_log!(
3837 column_qualified_reference.0.len(),
3838 3,
3839 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3840 );
3841 let mut table = column_qualified_reference.clone();
3842 table.0.truncate(2);
3843 curr_references.contains(&table)
3844 };
3845 text_columns.retain(column_referenced);
3846 exclude_columns.retain(column_referenced);
3847
3848 new_text_columns.extend(text_columns);
3850 new_exclude_columns.extend(exclude_columns);
3851
3852 if !new_text_columns.is_empty() {
3854 new_text_columns.sort();
3855 let new_text_columns = new_text_columns
3856 .into_iter()
3857 .map(WithOptionValue::UnresolvedItemName)
3858 .collect();
3859
3860 curr_options.push(MySqlConfigOption {
3861 name: MySqlConfigOptionName::TextColumns,
3862 value: Some(WithOptionValue::Sequence(new_text_columns)),
3863 });
3864 }
3865 if !new_exclude_columns.is_empty() {
3867 new_exclude_columns.sort();
3868 let new_exclude_columns = new_exclude_columns
3869 .into_iter()
3870 .map(WithOptionValue::UnresolvedItemName)
3871 .collect();
3872
3873 curr_options.push(MySqlConfigOption {
3874 name: MySqlConfigOptionName::ExcludeColumns,
3875 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3876 });
3877 }
3878 }
3879 CreateSourceConnection::SqlServer {
3880 options: curr_options,
3881 ..
3882 } => {
3883 let mz_sql::plan::SqlServerConfigOptionExtracted {
3884 mut text_columns,
3885 mut exclude_columns,
3886 ..
3887 } = curr_options.clone().try_into()?;
3888
3889 curr_options.retain(|o| {
3892 !matches!(
3893 o.name,
3894 SqlServerConfigOptionName::TextColumns
3895 | SqlServerConfigOptionName::ExcludeColumns
3896 )
3897 });
3898
3899 let column_referenced =
3905 |column_qualified_reference: &UnresolvedItemName| {
3906 mz_ore::soft_assert_eq_or_log!(
3907 column_qualified_reference.0.len(),
3908 3,
3909 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3910 );
3911 let mut table = column_qualified_reference.clone();
3912 table.0.truncate(2);
3913 curr_references.iter().any(|r| r.0.ends_with(&table.0))
3914 };
3915 text_columns.retain(column_referenced);
3916 exclude_columns.retain(column_referenced);
3917
3918 new_text_columns.extend(text_columns);
3920 new_exclude_columns.extend(exclude_columns);
3921
3922 if !new_text_columns.is_empty() {
3924 new_text_columns.sort();
3925 let new_text_columns = new_text_columns
3926 .into_iter()
3927 .map(WithOptionValue::UnresolvedItemName)
3928 .collect();
3929
3930 curr_options.push(SqlServerConfigOption {
3931 name: SqlServerConfigOptionName::TextColumns,
3932 value: Some(WithOptionValue::Sequence(new_text_columns)),
3933 });
3934 }
3935 if !new_exclude_columns.is_empty() {
3937 new_exclude_columns.sort();
3938 let new_exclude_columns = new_exclude_columns
3939 .into_iter()
3940 .map(WithOptionValue::UnresolvedItemName)
3941 .collect();
3942
3943 curr_options.push(SqlServerConfigOption {
3944 name: SqlServerConfigOptionName::ExcludeColumns,
3945 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3946 });
3947 }
3948 }
3949 _ => return Err(purification_err()),
3950 };
3951
3952 let mut catalog = self.catalog().for_system_session();
3953 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
3954
3955 let planned = mz_sql::plan::plan(
3957 None,
3958 &catalog,
3959 Statement::CreateSource(create_source_stmt),
3960 &Params::empty(),
3961 &resolved_ids,
3962 )
3963 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
3964 let plan = match planned {
3965 (Plan::CreateSource(plan), _sql_impl_ids) => plan,
3966 (p, _) => {
3967 unreachable!("create source plan is only valid response, got {:?}", p)
3968 }
3969 };
3970
3971 let source = Source::new(
3975 plan,
3976 cur_source.global_id,
3977 resolved_ids,
3978 cur_source.custom_logical_compaction_window,
3979 cur_source.is_retained_metrics_object,
3980 );
3981
3982 let desc = match &source.data_source {
3984 DataSourceDesc::Ingestion { desc, .. }
3985 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3986 desc.clone().into_inline_connection(self.catalog().state())
3987 }
3988 _ => unreachable!("already verified of type ingestion"),
3989 };
3990
3991 self.controller
3992 .storage
3993 .check_alter_ingestion_source_desc(ingestion_id, &desc)
3994 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
3995
3996 let mut ops = vec![catalog::Op::UpdateItem {
3999 id: item_id,
4000 name: self.catalog.get_entry(&item_id).name().clone(),
4003 to_item: CatalogItem::Source(source),
4004 }];
4005
4006 let CreateSourceInner {
4007 ops: new_ops,
4008 sources: _,
4009 if_not_exists_ids,
4010 } = self.create_source_inner(session, subsources).await?;
4011
4012 ops.extend(new_ops.into_iter());
4013
4014 assert!(
4015 if_not_exists_ids.is_empty(),
4016 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4017 );
4018
4019 self.catalog_transact(Some(session), ops).await?;
4020 }
4021 plan::AlterSourceAction::RefreshReferences { references } => {
4022 self.catalog_transact(
4023 Some(session),
4024 vec![catalog::Op::UpdateSourceReferences {
4025 source_id: item_id,
4026 references: references.into(),
4027 }],
4028 )
4029 .await?;
4030 }
4031 }
4032
4033 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4034 }
4035
4036 #[instrument]
4037 pub(super) async fn sequence_alter_system_set(
4038 &mut self,
4039 session: &Session,
4040 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4041 ) -> Result<ExecuteResponse, AdapterError> {
4042 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4043 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4045 self.validate_alter_system_network_policy(session, &value)?;
4046 }
4047
4048 let op = match value {
4049 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4050 name: name.clone(),
4051 value: OwnedVarInput::SqlSet(values),
4052 },
4053 plan::VariableValue::Default => {
4054 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4055 }
4056 };
4057 self.catalog_transact(Some(session), vec![op]).await?;
4058
4059 session.add_notice(AdapterNotice::VarDefaultUpdated {
4060 role: None,
4061 var_name: Some(name),
4062 });
4063 Ok(ExecuteResponse::AlteredSystemConfiguration)
4064 }
4065
4066 #[instrument]
4067 pub(super) async fn sequence_alter_system_reset(
4068 &mut self,
4069 session: &Session,
4070 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4071 ) -> Result<ExecuteResponse, AdapterError> {
4072 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4073 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4074 self.catalog_transact(Some(session), vec![op]).await?;
4075 session.add_notice(AdapterNotice::VarDefaultUpdated {
4076 role: None,
4077 var_name: Some(name),
4078 });
4079 Ok(ExecuteResponse::AlteredSystemConfiguration)
4080 }
4081
4082 #[instrument]
4083 pub(super) async fn sequence_alter_system_reset_all(
4084 &mut self,
4085 session: &Session,
4086 _: plan::AlterSystemResetAllPlan,
4087 ) -> Result<ExecuteResponse, AdapterError> {
4088 self.is_user_allowed_to_alter_system(session, None)?;
4089 let op = catalog::Op::ResetAllSystemConfiguration;
4090 self.catalog_transact(Some(session), vec![op]).await?;
4091 session.add_notice(AdapterNotice::VarDefaultUpdated {
4092 role: None,
4093 var_name: None,
4094 });
4095 Ok(ExecuteResponse::AlteredSystemConfiguration)
4096 }
4097
4098 fn is_user_allowed_to_alter_system(
4100 &self,
4101 session: &Session,
4102 var_name: Option<&str>,
4103 ) -> Result<(), AdapterError> {
4104 match (session.user().kind(), var_name) {
4105 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4107 (UserKind::Superuser, Some(name))
4109 if session.user().is_internal()
4110 || self.catalog().system_config().user_modifiable(name) =>
4111 {
4112 let var = self.catalog().system_config().get(name)?;
4115 var.visible(session.user(), self.catalog().system_config())?;
4116 Ok(())
4117 }
4118 (UserKind::Regular, Some(name))
4121 if self.catalog().system_config().user_modifiable(name) =>
4122 {
4123 Err(AdapterError::Unauthorized(
4124 rbac::UnauthorizedError::Superuser {
4125 action: format!("toggle the '{name}' system configuration parameter"),
4126 },
4127 ))
4128 }
4129 _ => Err(AdapterError::Unauthorized(
4130 rbac::UnauthorizedError::MzSystem {
4131 action: "alter system".into(),
4132 },
4133 )),
4134 }
4135 }
4136
4137 fn validate_alter_system_network_policy(
4138 &self,
4139 session: &Session,
4140 policy_value: &plan::VariableValue,
4141 ) -> Result<(), AdapterError> {
4142 let policy_name = match &policy_value {
4143 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4145 plan::VariableValue::Values(values) if values.len() == 1 => {
4146 values.iter().next().cloned()
4147 }
4148 plan::VariableValue::Values(values) => {
4149 tracing::warn!(?values, "can't set multiple network policies at once");
4150 None
4151 }
4152 };
4153 let maybe_network_policy = policy_name
4154 .as_ref()
4155 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4156 let Some(network_policy) = maybe_network_policy else {
4157 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4158 VarError::InvalidParameterValue {
4159 name: NETWORK_POLICY.name(),
4160 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4161 reason: "no network policy with such name exists".to_string(),
4162 },
4163 )));
4164 };
4165 self.validate_alter_network_policy(session, &network_policy.rules)
4166 }
4167
4168 fn validate_alter_network_policy(
4173 &self,
4174 session: &Session,
4175 policy_rules: &Vec<NetworkPolicyRule>,
4176 ) -> Result<(), AdapterError> {
4177 if session.user().is_internal() {
4180 return Ok(());
4181 }
4182 if let Some(ip) = session.meta().client_ip() {
4183 validate_ip_with_policy_rules(ip, policy_rules)
4184 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4185 } else {
4186 return Err(AdapterError::NetworkPolicyDenied(
4189 NetworkPolicyError::MissingIp,
4190 ));
4191 }
4192 Ok(())
4193 }
4194
4195 #[instrument]
4197 pub(super) fn sequence_execute(
4198 &self,
4199 session: &mut Session,
4200 plan: plan::ExecutePlan,
4201 ) -> Result<String, AdapterError> {
4202 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4204 let ps = session
4205 .get_prepared_statement_unverified(&plan.name)
4206 .expect("known to exist");
4207 let stmt = ps.stmt().cloned();
4208 let desc = ps.desc().clone();
4209 let state_revision = ps.state_revision;
4210 let logging = Arc::clone(ps.logging());
4211 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4212 }
4213
4214 #[instrument]
4215 pub(super) async fn sequence_grant_privileges(
4216 &mut self,
4217 session: &Session,
4218 plan::GrantPrivilegesPlan {
4219 update_privileges,
4220 grantees,
4221 }: plan::GrantPrivilegesPlan,
4222 ) -> Result<ExecuteResponse, AdapterError> {
4223 self.sequence_update_privileges(
4224 session,
4225 update_privileges,
4226 grantees,
4227 UpdatePrivilegeVariant::Grant,
4228 )
4229 .await
4230 }
4231
4232 #[instrument]
4233 pub(super) async fn sequence_revoke_privileges(
4234 &mut self,
4235 session: &Session,
4236 plan::RevokePrivilegesPlan {
4237 update_privileges,
4238 revokees,
4239 }: plan::RevokePrivilegesPlan,
4240 ) -> Result<ExecuteResponse, AdapterError> {
4241 self.sequence_update_privileges(
4242 session,
4243 update_privileges,
4244 revokees,
4245 UpdatePrivilegeVariant::Revoke,
4246 )
4247 .await
4248 }
4249
4250 #[instrument]
4251 async fn sequence_update_privileges(
4252 &mut self,
4253 session: &Session,
4254 update_privileges: Vec<UpdatePrivilege>,
4255 grantees: Vec<RoleId>,
4256 variant: UpdatePrivilegeVariant,
4257 ) -> Result<ExecuteResponse, AdapterError> {
4258 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4259 let mut warnings = Vec::new();
4260 let catalog = self.catalog().for_session(session);
4261
4262 for UpdatePrivilege {
4263 acl_mode,
4264 target_id,
4265 grantor,
4266 } in update_privileges
4267 {
4268 let actual_object_type = catalog.get_system_object_type(&target_id);
4269 if actual_object_type.is_relation() {
4272 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4273 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4274 if !non_applicable_privileges.is_empty() {
4275 let object_description =
4276 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4277 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4278 non_applicable_privileges,
4279 object_description,
4280 })
4281 }
4282 }
4283
4284 if let SystemObjectId::Object(object_id) = &target_id {
4285 self.catalog()
4286 .ensure_not_reserved_object(object_id, session.conn_id())?;
4287 }
4288
4289 let privileges = self
4290 .catalog()
4291 .get_privileges(&target_id, session.conn_id())
4292 .ok_or(AdapterError::Unsupported(
4295 "GRANTs/REVOKEs on an object type with no privileges",
4296 ))?;
4297
4298 for grantee in &grantees {
4299 self.catalog().ensure_not_system_role(grantee)?;
4300 self.catalog().ensure_not_predefined_role(grantee)?;
4301 let existing_privilege = privileges
4302 .get_acl_item(grantee, &grantor)
4303 .map(Cow::Borrowed)
4304 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4305
4306 match variant {
4307 UpdatePrivilegeVariant::Grant
4308 if !existing_privilege.acl_mode.contains(acl_mode) =>
4309 {
4310 ops.push(catalog::Op::UpdatePrivilege {
4311 target_id: target_id.clone(),
4312 privilege: MzAclItem {
4313 grantee: *grantee,
4314 grantor,
4315 acl_mode,
4316 },
4317 variant,
4318 });
4319 }
4320 UpdatePrivilegeVariant::Revoke
4321 if !existing_privilege
4322 .acl_mode
4323 .intersection(acl_mode)
4324 .is_empty() =>
4325 {
4326 ops.push(catalog::Op::UpdatePrivilege {
4327 target_id: target_id.clone(),
4328 privilege: MzAclItem {
4329 grantee: *grantee,
4330 grantor,
4331 acl_mode,
4332 },
4333 variant,
4334 });
4335 }
4336 _ => {}
4338 }
4339 }
4340 }
4341
4342 if ops.is_empty() {
4343 session.add_notices(warnings);
4344 return Ok(variant.into());
4345 }
4346
4347 let res = self
4348 .catalog_transact(Some(session), ops)
4349 .await
4350 .map(|_| match variant {
4351 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4352 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4353 });
4354 if res.is_ok() {
4355 session.add_notices(warnings);
4356 }
4357 res
4358 }
4359
4360 #[instrument]
4361 pub(super) async fn sequence_alter_default_privileges(
4362 &mut self,
4363 session: &Session,
4364 plan::AlterDefaultPrivilegesPlan {
4365 privilege_objects,
4366 privilege_acl_items,
4367 is_grant,
4368 }: plan::AlterDefaultPrivilegesPlan,
4369 ) -> Result<ExecuteResponse, AdapterError> {
4370 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4371 let variant = if is_grant {
4372 UpdatePrivilegeVariant::Grant
4373 } else {
4374 UpdatePrivilegeVariant::Revoke
4375 };
4376 for privilege_object in &privilege_objects {
4377 self.catalog()
4378 .ensure_not_system_role(&privilege_object.role_id)?;
4379 self.catalog()
4380 .ensure_not_predefined_role(&privilege_object.role_id)?;
4381 if let Some(database_id) = privilege_object.database_id {
4382 self.catalog()
4383 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4384 }
4385 if let Some(schema_id) = privilege_object.schema_id {
4386 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4387 let schema_spec: SchemaSpecifier = schema_id.into();
4388
4389 self.catalog().ensure_not_reserved_object(
4390 &(database_spec, schema_spec).into(),
4391 session.conn_id(),
4392 )?;
4393 }
4394 for privilege_acl_item in &privilege_acl_items {
4395 self.catalog()
4396 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4397 self.catalog()
4398 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4399 ops.push(catalog::Op::UpdateDefaultPrivilege {
4400 privilege_object: privilege_object.clone(),
4401 privilege_acl_item: privilege_acl_item.clone(),
4402 variant,
4403 })
4404 }
4405 }
4406
4407 self.catalog_transact(Some(session), ops).await?;
4408 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4409 }
4410
4411 #[instrument]
4412 pub(super) async fn sequence_grant_role(
4413 &mut self,
4414 session: &Session,
4415 plan::GrantRolePlan {
4416 role_ids,
4417 member_ids,
4418 grantor_id,
4419 }: plan::GrantRolePlan,
4420 ) -> Result<ExecuteResponse, AdapterError> {
4421 let catalog = self.catalog();
4422 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4423 for role_id in role_ids {
4424 for member_id in &member_ids {
4425 let member_membership: BTreeSet<_> =
4426 catalog.get_role(member_id).membership().keys().collect();
4427 if member_membership.contains(&role_id) {
4428 let role_name = catalog.get_role(&role_id).name().to_string();
4429 let member_name = catalog.get_role(member_id).name().to_string();
4430 catalog.ensure_not_reserved_role(member_id)?;
4432 catalog.ensure_grantable_role(&role_id)?;
4433 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4434 role_name,
4435 member_name,
4436 });
4437 } else {
4438 ops.push(catalog::Op::GrantRole {
4439 role_id,
4440 member_id: *member_id,
4441 grantor_id,
4442 });
4443 }
4444 }
4445 }
4446
4447 if ops.is_empty() {
4448 return Ok(ExecuteResponse::GrantedRole);
4449 }
4450
4451 self.catalog_transact(Some(session), ops)
4452 .await
4453 .map(|_| ExecuteResponse::GrantedRole)
4454 }
4455
4456 #[instrument]
4457 pub(super) async fn sequence_revoke_role(
4458 &mut self,
4459 session: &Session,
4460 plan::RevokeRolePlan {
4461 role_ids,
4462 member_ids,
4463 grantor_id,
4464 }: plan::RevokeRolePlan,
4465 ) -> Result<ExecuteResponse, AdapterError> {
4466 let catalog = self.catalog();
4467 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4468 for role_id in role_ids {
4469 for member_id in &member_ids {
4470 let member_membership: BTreeSet<_> =
4471 catalog.get_role(member_id).membership().keys().collect();
4472 if !member_membership.contains(&role_id) {
4473 let role_name = catalog.get_role(&role_id).name().to_string();
4474 let member_name = catalog.get_role(member_id).name().to_string();
4475 catalog.ensure_not_reserved_role(member_id)?;
4477 catalog.ensure_grantable_role(&role_id)?;
4478 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4479 role_name,
4480 member_name,
4481 });
4482 } else {
4483 ops.push(catalog::Op::RevokeRole {
4484 role_id,
4485 member_id: *member_id,
4486 grantor_id,
4487 });
4488 }
4489 }
4490 }
4491
4492 if ops.is_empty() {
4493 return Ok(ExecuteResponse::RevokedRole);
4494 }
4495
4496 self.catalog_transact(Some(session), ops)
4497 .await
4498 .map(|_| ExecuteResponse::RevokedRole)
4499 }
4500
4501 #[instrument]
4502 pub(super) async fn sequence_alter_owner(
4503 &mut self,
4504 session: &Session,
4505 plan::AlterOwnerPlan {
4506 id,
4507 object_type,
4508 new_owner,
4509 }: plan::AlterOwnerPlan,
4510 ) -> Result<ExecuteResponse, AdapterError> {
4511 let mut ops = vec![catalog::Op::UpdateOwner {
4512 id: id.clone(),
4513 new_owner,
4514 }];
4515
4516 match &id {
4517 ObjectId::Item(global_id) => {
4518 let entry = self.catalog().get_entry(global_id);
4519
4520 if entry.is_index() {
4522 let name = self
4523 .catalog()
4524 .resolve_full_name(entry.name(), Some(session.conn_id()))
4525 .to_string();
4526 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4527 return Ok(ExecuteResponse::AlteredObject(object_type));
4528 }
4529
4530 let dependent_index_ops = entry
4532 .used_by()
4533 .into_iter()
4534 .filter(|id| self.catalog().get_entry(id).is_index())
4535 .map(|id| catalog::Op::UpdateOwner {
4536 id: ObjectId::Item(*id),
4537 new_owner,
4538 });
4539 ops.extend(dependent_index_ops);
4540
4541 let dependent_subsources =
4543 entry
4544 .progress_id()
4545 .into_iter()
4546 .map(|item_id| catalog::Op::UpdateOwner {
4547 id: ObjectId::Item(item_id),
4548 new_owner,
4549 });
4550 ops.extend(dependent_subsources);
4551 }
4552 ObjectId::Cluster(cluster_id) => {
4553 let cluster = self.catalog().get_cluster(*cluster_id);
4554 let managed_cluster_replica_ops =
4556 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4557 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4558 new_owner,
4559 });
4560 ops.extend(managed_cluster_replica_ops);
4561 }
4562 _ => {}
4563 }
4564
4565 self.catalog_transact(Some(session), ops)
4566 .await
4567 .map(|_| ExecuteResponse::AlteredObject(object_type))
4568 }
4569
4570 #[instrument]
4571 pub(super) async fn sequence_reassign_owned(
4572 &mut self,
4573 session: &Session,
4574 plan::ReassignOwnedPlan {
4575 old_roles,
4576 new_role,
4577 reassign_ids,
4578 }: plan::ReassignOwnedPlan,
4579 ) -> Result<ExecuteResponse, AdapterError> {
4580 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4581 self.catalog().ensure_not_reserved_role(role_id)?;
4582 }
4583
4584 let ops = reassign_ids
4585 .into_iter()
4586 .map(|id| catalog::Op::UpdateOwner {
4587 id,
4588 new_owner: new_role,
4589 })
4590 .collect();
4591
4592 self.catalog_transact(Some(session), ops)
4593 .await
4594 .map(|_| ExecuteResponse::ReassignOwned)
4595 }
4596
4597 #[instrument]
4598 pub(crate) async fn handle_deferred_statement(&mut self) {
4599 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4603 return;
4604 };
4605 match ps {
4606 crate::coord::PlanStatement::Statement { stmt, params } => {
4607 self.handle_execute_inner(stmt, params, ctx).await;
4608 }
4609 crate::coord::PlanStatement::Plan {
4610 plan,
4611 resolved_ids,
4612 sql_impl_resolved_ids,
4613 } => {
4614 self.sequence_plan(ctx, plan, resolved_ids, sql_impl_resolved_ids)
4615 .await;
4616 }
4617 }
4618 }
4619
4620 #[instrument]
4621 #[allow(clippy::unused_async)]
4623 pub(super) async fn sequence_alter_table(
4624 &mut self,
4625 ctx: &mut ExecuteContext,
4626 plan: plan::AlterTablePlan,
4627 ) -> Result<ExecuteResponse, AdapterError> {
4628 let plan::AlterTablePlan {
4629 relation_id,
4630 column_name,
4631 column_type,
4632 raw_sql_type,
4633 } = plan;
4634
4635 let (_, new_global_id) = self.allocate_user_id().await?;
4637 let ops = vec![catalog::Op::AlterAddColumn {
4638 id: relation_id,
4639 new_global_id,
4640 name: column_name,
4641 typ: column_type,
4642 sql: raw_sql_type,
4643 }];
4644
4645 self.catalog_transact_with_context(None, Some(ctx), ops)
4646 .await?;
4647
4648 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4649 }
4650
4651 #[instrument]
4653 pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4654 &mut self,
4655 ctx: ExecuteContext,
4656 plan: AlterMaterializedViewApplyReplacementPlan,
4657 ) {
4658 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4668
4669 let plan_validity = PlanValidity::new(
4670 self.catalog().transient_revision(),
4671 BTreeSet::from_iter([id, replacement_id]),
4672 None,
4673 None,
4674 ctx.session().role_metadata().clone(),
4675 );
4676
4677 let target = self.catalog.get_entry(&id);
4678 let target_gid = target.latest_global_id();
4679
4680 let replacement = self.catalog.get_entry(&replacement_id);
4681 let replacement_gid = replacement.latest_global_id();
4682
4683 let target_upper = self
4684 .controller
4685 .storage_collections
4686 .collection_frontiers(target_gid)
4687 .expect("target MV exists")
4688 .write_frontier;
4689 let replacement_upper = self
4690 .controller
4691 .compute
4692 .collection_frontiers(replacement_gid, replacement.cluster_id())
4693 .expect("replacement MV exists")
4694 .write_frontier;
4695
4696 info!(
4697 %id, %replacement_id, ?target_upper, ?replacement_upper,
4698 "preparing materialized view replacement application",
4699 );
4700
4701 let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4702 ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4711 name: target.name().item.clone(),
4712 }));
4713 return;
4714 };
4715
4716 let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4720
4721 self.install_storage_watch_set(
4725 ctx.session().conn_id().clone(),
4726 BTreeSet::from_iter([target_gid]),
4727 replacement_upper_ts,
4728 WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4729 ctx: Some(ctx),
4730 otel_ctx: OpenTelemetryContext::obtain(),
4731 plan,
4732 plan_validity,
4733 }),
4734 )
4735 .expect("target collection exists");
4736 }
4737
4738 #[instrument]
4740 pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4741 &mut self,
4742 mut ctx: AlterMaterializedViewReadyContext,
4743 ) {
4744 ctx.otel_ctx.attach_as_parent();
4745
4746 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4747
4748 if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4753 ctx.retire(Err(err));
4754 return;
4755 }
4756
4757 info!(
4758 %id, %replacement_id,
4759 "finishing materialized view replacement application",
4760 );
4761
4762 let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4763 match self
4764 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4765 .await
4766 {
4767 Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4768 ObjectType::MaterializedView,
4769 ))),
4770 Err(err) => ctx.retire(Err(err)),
4771 }
4772 }
4773
4774 pub(super) async fn statistics_oracle(
4775 &self,
4776 session: &Session,
4777 source_ids: &BTreeSet<GlobalId>,
4778 query_as_of: &Antichain<Timestamp>,
4779 is_oneshot: bool,
4780 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4781 super::statistics_oracle(
4782 session,
4783 source_ids,
4784 query_as_of,
4785 is_oneshot,
4786 self.catalog().system_config(),
4787 self.controller.storage_collections.as_ref(),
4788 )
4789 .await
4790 }
4791}
4792
4793impl Coordinator {
4794 pub(crate) fn emit_raw_optimizer_notices_to_user(
4802 &self,
4803 ctx: &ExecuteContext,
4804 notices: &[RawOptimizerNotice],
4805 ) {
4806 emit_optimizer_notices(&*self.catalog, ctx.session(), notices);
4807 }
4808
4809 async fn persist_dataflow_metainfo(
4819 &mut self,
4820 df_meta: DataflowMetainfo<Arc<OptimizerNotice>>,
4821 export_id: GlobalId,
4822 ) -> Option<BuiltinTableAppendNotify> {
4823 if self.catalog().state().system_config().enable_mz_notices()
4826 && !df_meta.optimizer_notices.is_empty()
4827 {
4828 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4829 self.catalog().state().pack_optimizer_notices(
4830 &mut builtin_table_updates,
4831 df_meta.optimizer_notices.iter(),
4832 Diff::ONE,
4833 );
4834
4835 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4837
4838 Some(
4839 self.builtin_table_update()
4840 .execute(builtin_table_updates)
4841 .await
4842 .0,
4843 )
4844 } else {
4845 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4847
4848 None
4849 }
4850 }
4851}