use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::error::ErrorKind;
use mz_catalog::memory::objects::{
    CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
    RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
    SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, AlterMaterializedViewApplyReplacementPlan,
    ConnectionDetails, CreateSourcePlanBundle, ExplainSinkSchemaPlan, Explainee,
    ExplaineeStatement, MutationKind, NetworkPolicyRule, Params, Plan, PlannedAlterRoleOption,
    PlannedRoleVariable, QueryWhen, SideEffectingFunc, StatementContext, UpdatePrivilege,
    VariableValue,
};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::ExportDescription;
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::GenericSourceConnection;
use mz_transform::dataflow::DataflowMetainfo;
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::sequencer::emit_optimizer_notices;
use crate::coord::{
    AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
    Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
    ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
    PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
    WatchSetResponse, validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>;

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;

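/// Unwraps a `Result`; on error, retires the given context with the error and
/// returns from the enclosing function.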
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;

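/// The catalog operations computed for a `DROP`, along with the facts needed
/// to emit follow-up notices (dropped active database or cluster, dropped
/// in-use indexes).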
struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}

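/// Intermediate state produced by `create_source_inner`: the catalog
/// operations to apply, the new `Source` items, and the names of any
/// `IF NOT EXISTS` sources so that "already exists" errors can be downgraded
/// to notices.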
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
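    /// Sequences the stages of a [`Staged`] plan, looping until the plan
    /// produces a response or spawns its work onto a task. Between stages,
    /// checks whether the operation has been canceled and, if so, retires the
    /// context with [`AdapterError::Canceled`].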
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }

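    /// Awaits `handle` on a spawned task and then calls `f` with the result.
    /// If `cancel_enabled` is set and the connection's cancellation channel
    /// fires first, retires the context with [`AdapterError::Canceled`]
    /// instead.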
    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                let _ = rx.wait_for(|v| *v).await;
                ()
            })
        } else {
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = return_if_err!(res, ctx);
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }

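    /// Converts a batch of `CREATE SOURCE` plans into catalog operations,
    /// enforcing single-replica cluster restrictions and reducing webhook
    /// validation expressions along the way.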
    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref desc)
                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                            .get(self.catalog().system_config().dyncfgs());

                        if !enable_multi_replica_sources {
                            if cluster.replica_ids().len() > 1 {
                                return Err(AdapterError::Unsupported(
                                    "webhook sources in clusters with >1 replicas",
                                ));
                            }
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }

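    /// Plans a `CREATE SUBSOURCE` statement, associating it with the provided
    /// pre-allocated item and global IDs.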
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let plan = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }

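    /// Plans a purified `ALTER SOURCE ... ADD SUBSOURCE`, planning each new
    /// subsource statement with freshly allocated IDs.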
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip_eq(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

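    /// Plans a purified `ALTER SOURCE ... REFRESH REFERENCES`, recording the
    /// newly available upstream references for the source.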
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

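    /// Plans a purified `CREATE SOURCE` statement, returning a
    /// `Plan::CreateSources` whose bundles are ordered as they must be
    /// created: the progress subsource (if any), then the source itself, then
    /// its subsources.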
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        if let Some(progress_stmt) = progress_stmt {
            assert_none!(progress_stmt.of_source);
            let id_ts = self.get_catalog_write_ts().await;
            let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("plan must be CreateSourcePlan but got {:?}", p),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        for (item_id, source) in &sources {
            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
                    ctx.session()
                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
                }
            }
        }

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

        let conn_id = ctx.session().conn_id().clone();
        let transact_result = self
            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

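    /// Returns an error if the role attributes require password
    /// authentication while that feature is disabled.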
    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some()
                || attributes.password.is_some()
                || attributes.login.is_some()
            {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes)?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_with_context(conn_id, None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let current_network_policy_name =
            self.catalog().system_config().default_network_policy_name();
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let id_ts = self.get_catalog_write_ts().await;
        let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                o => {
                    unreachable!("unexpected CREATE TABLE data source: {:?}", o)
                }
            },
        };

        let is_webhook = if let TableDataSource::DataSource {
            desc: DataSourceDesc::Webhook { .. },
            timeline: _,
        } = &data_source
        {
            true
        } else {
            false
        };

        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let catalog_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        if is_webhook {
            if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
                ctx.session()
                    .add_notice(AdapterNotice::WebhookSourceCreated { url })
            }
        }

        match catalog_result {
            Ok(()) => Ok(ExecuteResponse::CreatedTable),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session_mut()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "table",
                    });
                Ok(ExecuteResponse::CreatedTable)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) =
            return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
            commit_interval: sink.commit_interval,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }

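    /// Returns an error if a plan with ambiguous column references (e.g.
    /// `SELECT *`) depends on any system relation.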
    pub(super) fn validate_system_column_references(
        &self,
        uses_ambiguous_columns: bool,
        depends_on: &BTreeSet<GlobalId>,
    ) -> Result<(), AdapterError> {
        if uses_ambiguous_columns
            && depends_on
                .iter()
                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
        {
            Err(AdapterError::AmbiguousSystemColumnReference)
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_type(
        &mut self,
        session: &Session,
        plan: plan::CreateTypePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        plan.typ
            .inner
            .desc(&self.catalog().for_session(session))
            .map_err(AdapterError::from)?;
        let typ = Type {
            create_sql: Some(plan.typ.create_sql),
            global_id,
            details: CatalogTypeDetails {
                array_id: None,
                typ: plan.typ.inner,
                pg_metadata: None,
            },
            resolved_ids,
        };
        let op = catalog::Op::CreateItem {
            id: item_id,
            name: plan.name,
            item: CatalogItem::Type(typ),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::CreatedType),
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_comment_on(
        &mut self,
        session: &Session,
        plan: plan::CommentPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::Comment {
            object_id: plan.object_id,
            sub_component: plan.sub_component,
            comment: plan.comment,
        };
        self.catalog_transact(Some(session), vec![op]).await?;
        Ok(ExecuteResponse::Comment)
    }

    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        fail::fail_point!("after_sequencer_drop_replica");

        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }

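    /// Returns an error if any role being dropped still owns an object or is
    /// the grantor or grantee of any privilege, collecting the offending
    /// dependencies per role for the error message.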
    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
            }
        }
        privilege_check(
            self.catalog().system_privileges(),
            dropped_roles,
            &mut dependent_objects,
            &SystemObjectId::System,
            &catalog,
        );
        for (default_privilege_object, default_privilege_acl_items) in
            self.catalog.default_privileges()
        {
            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!(
                        "default privileges on {}S created by {}",
                        default_privilege_object.object_type, role_name
                    ));
            }
            for default_privilege_acl_item in default_privilege_acl_items {
                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "default privileges on {}S granted to {}",
                            default_privilege_object.object_type, role_name
                        ));
                }
            }
        }

        if !dependent_objects.is_empty() {
            Err(AdapterError::DependentObject(dependent_objects))
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_drop_owned(
        &mut self,
        session: &Session,
        plan: plan::DropOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for role_id in &plan.role_ids {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let mut privilege_revokes = plan.privilege_revokes;

        let session_catalog = self.catalog().for_session(session);
        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
            && !session.is_superuser()
        {
            let role_membership =
                session_catalog.collect_role_membership(session.current_role_id());
            let invalid_revokes: BTreeSet<_> = privilege_revokes
                .extract_if(.., |(_, privilege)| {
                    !role_membership.contains(&privilege.grantor)
                })
                .map(|(object_id, _)| object_id)
                .collect();
            for invalid_revoke in invalid_revokes {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
                session.add_notice(AdapterNotice::CannotRevoke { object_description });
            }
        }

        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
            catalog::Op::UpdatePrivilege {
                target_id: object_id,
                privilege,
                variant: UpdatePrivilegeVariant::Revoke,
            }
        });
        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant: UpdatePrivilegeVariant::Revoke,
            },
        );
        let DropOps {
            ops: drop_ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(session, plan.drop_ids)?;

        let ops = privilege_revoke_ops
            .chain(default_privilege_revoke_ops)
            .chain(drop_ops.into_iter())
            .collect();

        self.catalog_transact(Some(session), ops).await?;

        if dropped_active_db {
            session.add_notice(AdapterNotice::DroppedActiveDatabase {
                name: session.vars().database().to_string(),
            });
        }
        if dropped_active_cluster {
            session.add_notice(AdapterNotice::DroppedActiveCluster {
                name: session.vars().cluster().to_string(),
            });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
        }
        Ok(ExecuteResponse::DroppedOwned)
    }

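    /// Builds the catalog operations shared by `DROP` and `DROP OWNED`:
    /// revokes of role memberships and default privileges plus the drops
    /// themselves, while recording which notices (dropped active database or
    /// cluster, dropped in-use indexes) to emit afterwards.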
    fn sequence_drop_common(
        &self,
        session: &Session,
        ids: Vec<ObjectId>,
    ) -> Result<DropOps, AdapterError> {
        let mut dropped_active_db = false;
        let mut dropped_active_cluster = false;
        let mut dropped_in_use_indexes = Vec::new();
        let mut dropped_roles = BTreeMap::new();
        let mut dropped_databases = BTreeSet::new();
        let mut dropped_schemas = BTreeSet::new();
        let mut role_revokes = BTreeSet::new();
        let mut default_privilege_revokes = BTreeSet::new();

        let mut clusters_to_drop = BTreeSet::new();

        let ids_set = ids.iter().collect::<BTreeSet<_>>();
        for id in &ids {
            match id {
                ObjectId::Database(id) => {
                    let name = self.catalog().get_database(id).name();
                    if name == session.vars().database() {
                        dropped_active_db = true;
                    }
                    dropped_databases.insert(id);
                }
                ObjectId::Schema((_, spec)) => {
                    if let SchemaSpecifier::Id(id) = spec {
                        dropped_schemas.insert(id);
                    }
                }
                ObjectId::Cluster(id) => {
                    clusters_to_drop.insert(*id);
                    if let Some(active_id) = self
                        .catalog()
                        .active_cluster(session)
                        .ok()
                        .map(|cluster| cluster.id())
                    {
                        if id == &active_id {
                            dropped_active_cluster = true;
                        }
                    }
                }
                ObjectId::Role(id) => {
                    let role = self.catalog().get_role(id);
                    let name = role.name();
                    dropped_roles.insert(*id, name);
                    for (group_id, grantor_id) in &role.membership.map {
                        role_revokes.insert((*group_id, *id, *grantor_id));
                    }
                }
                ObjectId::Item(id) => {
                    if let Some(index) = self.catalog().get_entry(id).index() {
                        let humanizer = self.catalog().for_session(session);
                        let dependants = self
                            .controller
                            .compute
                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
                            .ok()
                            .into_iter()
                            .flatten()
                            .filter(|dependant_id| {
                                if dependant_id.is_transient() {
                                    return false;
                                }
                                let Some(dependent_id) = humanizer
                                    .try_get_item_by_global_id(dependant_id)
                                    .map(|item| item.id())
                                else {
                                    return false;
                                };
                                !ids_set.contains(&ObjectId::Item(dependent_id))
                            })
                            .flat_map(|dependant_id| {
                                humanizer.humanize_id(dependant_id)
                            })
                            .collect_vec();
                        if !dependants.is_empty() {
                            dropped_in_use_indexes.push(DroppedInUseIndex {
                                index_name: humanizer
                                    .humanize_id(index.global_id())
                                    .unwrap_or_else(|| id.to_string()),
                                dependant_objects: dependants,
                            });
                        }
                    }
                }
                _ => {}
            }
        }

        for id in &ids {
            match id {
                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                    if !clusters_to_drop.contains(cluster_id) {
                        let cluster = self.catalog.get_cluster(*cluster_id);
                        if cluster.is_managed() {
                            let replica =
                                cluster.replica(*replica_id).expect("Catalog out of sync");
                            if !replica.config.location.internal() {
                                coord_bail!("cannot drop replica of managed cluster");
                            }
                        }
                    }
                }
                _ => {}
            }
        }

        for role_id in dropped_roles.keys() {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }
        self.validate_dropped_role_ownership(session, &dropped_roles)?;
        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
        for role in self.catalog().user_roles() {
            for dropped_role_id in
                dropped_role_ids.intersection(&role.membership.map.keys().collect())
            {
                role_revokes.insert((
                    **dropped_role_id,
                    role.id(),
                    *role
                        .membership
                        .map
                        .get(*dropped_role_id)
                        .expect("included in keys above"),
                ));
            }
        }

        for (default_privilege_object, default_privilege_acls) in
            self.catalog().default_privileges()
        {
            if matches!(
                &default_privilege_object.database_id,
                Some(database_id) if dropped_databases.contains(database_id),
            ) || matches!(
                &default_privilege_object.schema_id,
                Some(schema_id) if dropped_schemas.contains(schema_id),
            ) {
                for default_privilege_acl in default_privilege_acls {
                    default_privilege_revokes.insert((
                        default_privilege_object.clone(),
                        default_privilege_acl.clone(),
                    ));
                }
            }
        }

        let ops = role_revokes
            .into_iter()
            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            })
            .chain(default_privilege_revokes.into_iter().map(
                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                    privilege_object,
                    privilege_acl_item,
                    variant: UpdatePrivilegeVariant::Revoke,
                },
            ))
            .chain(iter::once(catalog::Op::DropObjects(
                ids.into_iter()
                    .map(DropObjectInfo::manual_drop_from_object_id)
                    .collect(),
            )))
            .collect();

        Ok(DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        })
    }

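    /// Handles `EXPLAIN ... SCHEMA` for a sink by pretty-printing the JSON
    /// schema computed during planning.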
1719 pub(super) fn sequence_explain_schema(
1720 &self,
1721 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1722 ) -> Result<ExecuteResponse, AdapterError> {
1723 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1724 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1725 })?;
1726
1727 let json_string = json_string(&json_value);
1728 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1729 Ok(Self::send_immediate_rows(row))
1730 }
1731
1732 pub(super) fn sequence_show_all_variables(
1733 &self,
1734 session: &Session,
1735 ) -> Result<ExecuteResponse, AdapterError> {
1736 let mut rows = viewable_variables(self.catalog().state(), session)
1737 .map(|v| (v.name(), v.value(), v.description()))
1738 .collect::<Vec<_>>();
1739 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1740
1741 let rows: Vec<_> = rows
1743 .into_iter()
1744 .map(|(name, val, desc)| {
1745 Row::pack_slice(&[
1746 Datum::String(name),
1747 Datum::String(&val),
1748 Datum::String(desc),
1749 ])
1750 })
1751 .collect();
1752 Ok(Self::send_immediate_rows(rows))
1753 }
1754
1755 pub(super) fn sequence_show_variable(
1756 &self,
1757 session: &Session,
1758 plan: plan::ShowVariablePlan,
1759 ) -> Result<ExecuteResponse, AdapterError> {
1760 if &plan.name == SCHEMA_ALIAS {
1761 let schemas = self.catalog.resolve_search_path(session);
1762 let schema = schemas.first();
1763 return match schema {
1764 Some((database_spec, schema_spec)) => {
1765 let schema_name = &self
1766 .catalog
1767 .get_schema(database_spec, schema_spec, session.conn_id())
1768 .name()
1769 .schema;
1770 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1771 Ok(Self::send_immediate_rows(row))
1772 }
1773 None => {
1774 if session.vars().current_object_missing_warnings() {
1775 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1776 search_path: session
1777 .vars()
1778 .search_path()
1779 .into_iter()
1780 .map(|schema| schema.to_string())
1781 .collect(),
1782 });
1783 }
1784 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1785 }
1786 };
1787 }
1788
1789 let variable = session
1790 .vars()
1791 .get(self.catalog().system_config(), &plan.name)
1792 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1793
1794 variable.visible(session.user(), self.catalog().system_config())?;
1797
1798 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1799 if variable.name() == vars::DATABASE.name()
1800 && matches!(
1801 self.catalog().resolve_database(&variable.value()),
1802 Err(CatalogError::UnknownDatabase(_))
1803 )
1804 && session.vars().current_object_missing_warnings()
1805 {
1806 let name = variable.value();
1807 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1808 } else if variable.name() == vars::CLUSTER.name()
1809 && matches!(
1810 self.catalog().resolve_cluster(&variable.value()),
1811 Err(CatalogError::UnknownCluster(_))
1812 )
1813 && session.vars().current_object_missing_warnings()
1814 {
1815 let name = variable.value();
1816 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1817 }
1818 Ok(Self::send_immediate_rows(row))
1819 }
1820
1821 #[instrument]
1822 pub(super) async fn sequence_inspect_shard(
1823 &self,
1824 session: &Session,
1825 plan: plan::InspectShardPlan,
1826 ) -> Result<ExecuteResponse, AdapterError> {
1827 if !session.user().is_internal() {
1830 return Err(AdapterError::Unauthorized(
1831 rbac::UnauthorizedError::MzSystem {
1832 action: "inspect".into(),
1833 },
1834 ));
1835 }
1836 let state = self
1837 .controller
1838 .storage
1839 .inspect_persist_state(plan.id)
1840 .await?;
1841 let jsonb = Jsonb::from_serde_json(state)?;
1842 Ok(Self::send_immediate_rows(jsonb.into_row()))
1843 }
1844
1845 #[instrument]
1846 pub(super) fn sequence_set_variable(
1847 &self,
1848 session: &mut Session,
1849 plan: plan::SetVariablePlan,
1850 ) -> Result<ExecuteResponse, AdapterError> {
1851 let (name, local) = (plan.name, plan.local);
1852 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1853 self.validate_set_isolation_level(session)?;
1854 }
1855 if &name == vars::CLUSTER.name() {
1856 self.validate_set_cluster(session)?;
1857 }
1858
1859 let vars = session.vars_mut();
1860 let values = match plan.value {
1861 plan::VariableValue::Default => None,
1862 plan::VariableValue::Values(values) => Some(values),
1863 };
1864
1865 match values {
1866 Some(values) => {
1867 vars.set(
1868 self.catalog().system_config(),
1869 &name,
1870 VarInput::SqlSet(&values),
1871 local,
1872 )?;
1873
1874 let vars = session.vars();
1875
1876 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1879 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1880 } else if name == vars::CLUSTER.name()
1881 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1882 {
1883 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1884 }
1885
1886 if name.as_str() == vars::DATABASE.name()
1888 && matches!(
1889 self.catalog().resolve_database(vars.database()),
1890 Err(CatalogError::UnknownDatabase(_))
1891 )
1892 && session.vars().current_object_missing_warnings()
1893 {
1894 let name = vars.database().to_string();
1895 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1896 } else if name.as_str() == vars::CLUSTER.name()
1897 && matches!(
1898 self.catalog().resolve_cluster(vars.cluster()),
1899 Err(CatalogError::UnknownCluster(_))
1900 )
1901 && session.vars().current_object_missing_warnings()
1902 {
1903 let name = vars.cluster().to_string();
1904 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1905 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1906 let v = values.into_first().to_lowercase();
1907 if v == IsolationLevel::ReadUncommitted.as_str()
1908 || v == IsolationLevel::ReadCommitted.as_str()
1909 || v == IsolationLevel::RepeatableRead.as_str()
1910 {
1911 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1912 isolation_level: v,
1913 });
1914 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1915 session.add_notice(AdapterNotice::StrongSessionSerializable);
1916 }
1917 }
1918 }
1919 None => vars.reset(self.catalog().system_config(), &name, local)?,
1920 }
1921
1922 Ok(ExecuteResponse::SetVariable { name, reset: false })
1923 }
1924
1925 pub(super) fn sequence_reset_variable(
1926 &self,
1927 session: &mut Session,
1928 plan: plan::ResetVariablePlan,
1929 ) -> Result<ExecuteResponse, AdapterError> {
1930 let name = plan.name;
1931 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1932 self.validate_set_isolation_level(session)?;
1933 }
1934 if &name == vars::CLUSTER.name() {
1935 self.validate_set_cluster(session)?;
1936 }
1937 session
1938 .vars_mut()
1939 .reset(self.catalog().system_config(), &name, false)?;
1940 Ok(ExecuteResponse::SetVariable { name, reset: true })
1941 }
1942
1943 pub(super) fn sequence_set_transaction(
1944 &self,
1945 session: &mut Session,
1946 plan: plan::SetTransactionPlan,
1947 ) -> Result<ExecuteResponse, AdapterError> {
1948 for mode in plan.modes {
1950 match mode {
1951 TransactionMode::AccessMode(_) => {
1952 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1953 }
1954 TransactionMode::IsolationLevel(isolation_level) => {
1955 self.validate_set_isolation_level(session)?;
1956
1957 session.vars_mut().set(
1958 self.catalog().system_config(),
1959 TRANSACTION_ISOLATION_VAR_NAME,
1960 VarInput::Flat(&isolation_level.to_ast_string_stable()),
1961 plan.local,
1962 )?
1963 }
1964 }
1965 }
1966 Ok(ExecuteResponse::SetVariable {
1967 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1968 reset: false,
1969 })
1970 }
1971
1972 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1973 if session.transaction().contains_ops() {
1974 Err(AdapterError::InvalidSetIsolationLevel)
1975 } else {
1976 Ok(())
1977 }
1978 }
1979
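/// The active cluster cannot be changed once the current transaction has
/// performed any operations.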
1980 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1981 if session.transaction().contains_ops() {
1982 Err(AdapterError::InvalidSetCluster)
1983 } else {
1984 Ok(())
1985 }
1986 }
1987
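/// Sequences `COMMIT` and `ROLLBACK`. Committing a failed transaction is
/// downgraded to a rollback. Writes are handed off to group commit, and
/// peeks that require linearization are routed through the
/// strict-serializable read queue before the client receives a response.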
1988 #[instrument]
1989 pub(super) async fn sequence_end_transaction(
1990 &mut self,
1991 mut ctx: ExecuteContext,
1992 mut action: EndTransactionAction,
1993 ) {
1994 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
1996 (&action, ctx.session().transaction())
1997 {
1998 action = EndTransactionAction::Rollback;
1999 }
2000 let response = match action {
2001 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2002 params: BTreeMap::new(),
2003 }),
2004 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2005 params: BTreeMap::new(),
2006 }),
2007 };
2008
2009 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2010
2011 let (response, action) = match result {
2012 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2013 (response, action)
2014 }
2015 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
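// Ensure we hold a write lock for every collection we are about to
// write to; a missing lock here is a programming error.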
2016 let validated_locks = match write_lock_guards {
2020 None => None,
2021 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2022 Ok(locks) => Some(locks),
2023 Err(missing) => {
2024 tracing::error!(?missing, "programming error, missing write locks");
2025 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2026 }
2027 },
2028 };
2029
2030 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2031 for WriteOp { id, rows } in writes {
2032 let total_rows = collected_writes.entry(id).or_default();
2033 total_rows.push(rows);
2034 }
2035
2036 self.submit_write(PendingWriteTxn::User {
2037 span: Span::current(),
2038 writes: collected_writes,
2039 write_locks: validated_locks,
2040 pending_txn: PendingTxn {
2041 ctx,
2042 response,
2043 action,
2044 },
2045 });
2046 return;
2047 }
2048 Ok((
2049 Some(TransactionOps::Peeks {
2050 determination,
2051 requires_linearization: RequireLinearization::Required,
2052 ..
2053 }),
2054 _,
2055 )) if ctx.session().vars().transaction_isolation()
2056 == &IsolationLevel::StrictSerializable =>
2057 {
2058 let conn_id = ctx.session().conn_id().clone();
2059 let pending_read_txn = PendingReadTxn {
2060 txn: PendingRead::Read {
2061 txn: PendingTxn {
2062 ctx,
2063 response,
2064 action,
2065 },
2066 },
2067 timestamp_context: determination.timestamp_context,
2068 created: Instant::now(),
2069 num_requeues: 0,
2070 otel_ctx: OpenTelemetryContext::obtain(),
2071 };
2072 self.strict_serializable_reads_tx
2073 .send((conn_id, pending_read_txn))
2074 .expect("sending to strict_serializable_reads_tx cannot fail");
2075 return;
2076 }
2077 Ok((
2078 Some(TransactionOps::Peeks {
2079 determination,
2080 requires_linearization: RequireLinearization::Required,
2081 ..
2082 }),
2083 _,
2084 )) if ctx.session().vars().transaction_isolation()
2085 == &IsolationLevel::StrongSessionSerializable =>
2086 {
2087 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2088 ctx.session_mut()
2089 .ensure_timestamp_oracle(timeline.clone())
2090 .apply_write(*ts);
2091 }
2092 (response, action)
2093 }
2094 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2095 self.internal_cmd_tx
2096 .send(Message::ExecuteSingleStatementTransaction {
2097 ctx,
2098 otel_ctx: OpenTelemetryContext::obtain(),
2099 stmt,
2100 params,
2101 })
2102 .expect("must send");
2103 return;
2104 }
2105 Ok((_, _)) => (response, action),
2106 Err(err) => (Err(err), EndTransactionAction::Rollback),
2107 };
2108 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2109 let response = response.map(|mut r| {
2111 r.extend_params(changed);
2112 ExecuteResponse::from(r)
2113 });
2114
2115 ctx.retire(response);
2116 }
2117
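/// Clears the session's transaction state and, on commit, returns its
/// operations: write targets are re-checked against the catalog, empty
/// writes are dropped, and DDL ops are applied only if the catalog has not
/// changed since the transaction began.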
2118 #[instrument]
2119 async fn sequence_end_transaction_inner(
2120 &mut self,
2121 ctx: &mut ExecuteContext,
2122 action: EndTransactionAction,
2123 ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2124 let txn = self.clear_transaction(ctx.session_mut()).await;
2125
2126 if let EndTransactionAction::Commit = action {
2127 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2128 match &mut ops {
2129 TransactionOps::Writes(writes) => {
2130 for WriteOp { id, .. } in writes.iter() {
2131 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2133 AdapterError::Catalog(mz_catalog::memory::error::Error {
2134 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2135 })
2136 })?;
2137 }
2138
2139 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2141 }
2142 TransactionOps::DDL {
2143 ops,
2144 state: _,
2145 side_effects,
2146 revision,
2147 } => {
2148 if *revision != self.catalog().transient_revision() {
2150 return Err(AdapterError::DDLTransactionRace);
2151 }
2152 let ops = std::mem::take(ops);
2154 let side_effects = std::mem::take(side_effects);
2155 self.catalog_transact_with_side_effects(
2156 Some(ctx),
2157 ops,
2158 move |a, mut ctx| {
2159 Box::pin(async move {
2160 for side_effect in side_effects {
2161 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2162 }
2163 })
2164 },
2165 )
2166 .await?;
2167 }
2168 _ => (),
2169 }
2170 return Ok((Some(ops), write_lock_guards));
2171 }
2172 }
2173
2174 Ok((None, None))
2175 }
2176
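/// Sequences a side-effecting function, currently only
/// `SELECT pg_cancel_backend(<connection id>)`. Cancelling your own
/// connection retires the current statement as canceled instead.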
2177 pub(super) async fn sequence_side_effecting_func(
2178 &mut self,
2179 ctx: ExecuteContext,
2180 plan: SideEffectingFunc,
2181 ) {
2182 match plan {
2183 SideEffectingFunc::PgCancelBackend { connection_id } => {
2184 if ctx.session().conn_id().unhandled() == connection_id {
2185 ctx.retire(Err(AdapterError::Canceled));
2189 return;
2190 }
2191
2192 let res = if let Some((id_handle, _conn_meta)) =
2193 self.active_conns.get_key_value(&connection_id)
2194 {
2195 self.handle_privileged_cancel(id_handle.clone()).await;
2197 Datum::True
2198 } else {
2199 Datum::False
2200 };
2201 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2202 }
2203 }
2204 }
2205
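/// Like [`Self::sequence_side_effecting_func`], but takes the caller's
/// connection ID and role explicitly and requires that the caller's role
/// membership includes the target connection's authenticated role.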
2206 pub(crate) async fn execute_side_effecting_func(
2215 &mut self,
2216 plan: SideEffectingFunc,
2217 conn_id: ConnectionId,
2218 current_role: RoleId,
2219 ) -> Result<ExecuteResponse, AdapterError> {
2220 match plan {
2221 SideEffectingFunc::PgCancelBackend { connection_id } => {
2222 if conn_id.unhandled() == connection_id {
2223 return Err(AdapterError::Canceled);
2227 }
2228
2229 if let Some((_id_handle, conn_meta)) =
2232 self.active_conns.get_key_value(&connection_id)
2233 {
2234 let target_role = *conn_meta.authenticated_role_id();
2235 let role_membership = self
2236 .catalog()
2237 .state()
2238 .collect_role_membership(&current_role);
2239 if !role_membership.contains(&target_role) {
2240 let target_role_name = self
2241 .catalog()
2242 .try_get_role(&target_role)
2243 .map(|role| role.name().to_string())
2244 .unwrap_or_else(|| target_role.to_string());
2245 return Err(AdapterError::Unauthorized(
2246 rbac::UnauthorizedError::RoleMembership {
2247 role_names: vec![target_role_name],
2248 },
2249 ));
2250 }
2251
2252 let id_handle = self
2254 .active_conns
2255 .get_key_value(&connection_id)
2256 .map(|(id, _)| id.clone())
2257 .expect("checked above");
2258 self.handle_privileged_cancel(id_handle).await;
2259 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2260 } else {
2261 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2263 }
2264 }
2265 }
2266 }
2267
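/// Determines a real-time-recent timestamp for the given sources by
/// collecting their transitive user dependencies and querying the storage
/// controller. Returns `Ok(None)` when only system objects are involved.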
2268 pub(crate) async fn determine_real_time_recent_timestamp(
2272 &self,
2273 source_ids: impl Iterator<Item = GlobalId>,
2274 real_time_recency_timeout: Duration,
2275 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2276 let item_ids = source_ids
2277 .map(|gid| {
2278 self.catalog
2279 .try_resolve_item_id(&gid)
2280 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2281 })
2282 .collect::<Result<Vec<_>, _>>()?;
2283
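// Real-time recency only applies to user objects; system collections
// never participate.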
2284 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2290 if to_visit.is_empty() {
2293 return Ok(None);
2294 }
2295
2296 let mut timestamp_objects = BTreeSet::new();
2297
2298 while let Some(id) = to_visit.pop_front() {
2299 timestamp_objects.insert(id);
2300 to_visit.extend(
2301 self.catalog()
2302 .get_entry(&id)
2303 .uses()
2304 .into_iter()
2305 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2306 );
2307 }
2308 let timestamp_objects = timestamp_objects
2309 .into_iter()
2310 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2311 .collect();
2312
2313 let r = self
2314 .controller
2315 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2316 .await?;
2317
2318 Ok(Some(r))
2319 }
2320
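/// Determines a real-time-recent timestamp only when the session has
/// enabled `real_time_recency`, is under strict serializable isolation,
/// and has not already picked a read timestamp for this transaction.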
2321 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2324 &self,
2325 session: &Session,
2326 source_ids: impl Iterator<Item = GlobalId>,
2327 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2328 let vars = session.vars();
2329
2330 if vars.real_time_recency()
2331 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2332 && !session.contains_read_timestamp()
2333 {
2334 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2335 .await
2336 } else {
2337 Ok(None)
2338 }
2339 }
2340
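/// Sequences `EXPLAIN PLAN`, dispatching on the explainee: statements go
/// through the staged optimizer pipelines, existing items are explained
/// from their cataloged plans, and the `REPLAN` variants re-run
/// optimization.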
2341 #[instrument]
2342 pub(super) async fn sequence_explain_plan(
2343 &mut self,
2344 ctx: ExecuteContext,
2345 plan: plan::ExplainPlanPlan,
2346 target_cluster: TargetCluster,
2347 ) {
2348 match &plan.explainee {
2349 plan::Explainee::Statement(stmt) => match stmt {
2350 plan::ExplaineeStatement::CreateView { .. } => {
2351 self.explain_create_view(ctx, plan).await;
2352 }
2353 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2354 self.explain_create_materialized_view(ctx, plan).await;
2355 }
2356 plan::ExplaineeStatement::CreateIndex { .. } => {
2357 self.explain_create_index(ctx, plan).await;
2358 }
2359 plan::ExplaineeStatement::Select { .. } => {
2360 self.explain_peek(ctx, plan, target_cluster).await;
2361 }
2362 plan::ExplaineeStatement::Subscribe { .. } => {
2363 self.explain_subscribe(ctx, plan, target_cluster).await;
2364 }
2365 },
2366 plan::Explainee::View(_) => {
2367 let result = self.explain_view(&ctx, plan);
2368 ctx.retire(result);
2369 }
2370 plan::Explainee::MaterializedView(_) => {
2371 let result = self.explain_materialized_view(&ctx, plan);
2372 ctx.retire(result);
2373 }
2374 plan::Explainee::Index(_) => {
2375 let result = self.explain_index(&ctx, plan);
2376 ctx.retire(result);
2377 }
2378 plan::Explainee::ReplanView(_) => {
2379 self.explain_replan_view(ctx, plan).await;
2380 }
2381 plan::Explainee::ReplanMaterializedView(_) => {
2382 self.explain_replan_materialized_view(ctx, plan).await;
2383 }
2384 plan::Explainee::ReplanIndex(_) => {
2385 self.explain_replan_index(ctx, plan).await;
2386 }
2387 };
2388 }
2389
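/// Sequences `EXPLAIN FILTER PUSHDOWN`, which is supported for `SELECT`
/// statements and materialized views only.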
2390 pub(super) async fn sequence_explain_pushdown(
2391 &mut self,
2392 ctx: ExecuteContext,
2393 plan: plan::ExplainPushdownPlan,
2394 target_cluster: TargetCluster,
2395 ) {
2396 match plan.explainee {
2397 Explainee::Statement(ExplaineeStatement::Select {
2398 broken: false,
2399 plan,
2400 desc: _,
2401 }) => {
2402 let stage = return_if_err!(
2403 self.peek_validate(
2404 ctx.session(),
2405 plan,
2406 target_cluster,
2407 None,
2408 ExplainContext::Pushdown,
2409 Some(ctx.session().vars().max_query_result_size()),
2410 ),
2411 ctx
2412 );
2413 self.sequence_staged(ctx, Span::current(), stage).await;
2414 }
2415 Explainee::MaterializedView(item_id) => {
2416 self.explain_pushdown_materialized_view(ctx, item_id).await;
2417 }
2418 _ => {
2419 ctx.retire(Err(AdapterError::Unsupported(
2420 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2421 )));
2422 }
2423 };
2424 }
2425
2426 async fn execute_explain_pushdown_with_read_holds(
2428 &self,
2429 ctx: ExecuteContext,
2430 as_of: Antichain<Timestamp>,
2431 mz_now: ResultSpec<'static>,
2432 read_holds: Option<ReadHolds<Timestamp>>,
2433 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2434 ) {
2435 let fut = self
2436 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2437 .await;
2438 task::spawn(|| "render explain pushdown", async move {
2439 let _read_holds = read_holds;
2441 let res = fut.await;
2442 ctx.retire(res);
2443 });
2444 }
2445
2446 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2448 &self,
2449 session: &Session,
2450 as_of: Antichain<Timestamp>,
2451 mz_now: ResultSpec<'static>,
2452 imports: I,
2453 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2454 super::explain_pushdown_future_inner(
2456 session,
2457 &self.catalog,
2458 &self.controller.storage_collections,
2459 as_of,
2460 mz_now,
2461 imports,
2462 )
2463 .await
2464 }
2465
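/// Sequences `INSERT`. Constant `VALUES` without `RETURNING` are written
/// directly; anything else (e.g. `INSERT ... SELECT`) becomes a
/// read-then-write plan that peeks the selection first.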
2466 #[instrument]
2467 pub(super) async fn sequence_insert(
2468 &mut self,
2469 mut ctx: ExecuteContext,
2470 plan: plan::InsertPlan,
2471 ) {
2472 if !ctx.session_mut().transaction().allows_writes() {
2480 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2481 return;
2482 }
2483
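// Fast path: constant VALUES need only be lowered, not optimized.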
2498 let optimized_mir = if plan.values.as_const().is_some() {
2498 let expr = return_if_err!(
2501 plan.values
2502 .clone()
2503 .lower(self.catalog().system_config(), None),
2504 ctx
2505 );
2506 OptimizedMirRelationExpr(expr)
2507 } else {
2508 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2510
2511 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2513
2514 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2516 };
2517
2518 match optimized_mir.into_inner() {
2519 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2520 let catalog = self.owned_catalog();
2521 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2522 let result =
2523 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2524 ctx.retire(result);
2525 });
2526 }
2527 _ => {
2529 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2530 Some(table) => {
2531 let desc = table.relation_desc_latest().expect("table has a desc");
2533 desc.arity()
2534 }
2535 None => {
2536 ctx.retire(Err(AdapterError::Catalog(
2537 mz_catalog::memory::error::Error {
2538 kind: ErrorKind::Sql(CatalogError::UnknownItem(
2539 plan.id.to_string(),
2540 )),
2541 },
2542 )));
2543 return;
2544 }
2545 };
2546
2547 let finishing = RowSetFinishing {
2548 order_by: vec![],
2549 limit: None,
2550 offset: 0,
2551 project: (0..desc_arity).collect(),
2552 };
2553
2554 let read_then_write_plan = plan::ReadThenWritePlan {
2555 id: plan.id,
2556 selection: plan.values,
2557 finishing,
2558 assignments: BTreeMap::new(),
2559 kind: MutationKind::Insert,
2560 returning: plan.returning,
2561 };
2562
2563 self.sequence_read_then_write(ctx, read_then_write_plan)
2564 .await;
2565 }
2566 }
2567 }
2568
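/// Sequences a read-then-write operation (`UPDATE`, `DELETE`,
/// `INSERT ... SELECT`): the selection is peeked at a freshest-table-write
/// timestamp, the resulting rows are turned into diffs, and the diffs are
/// submitted as a write. Write locks on every involved collection are
/// acquired up front, or the plan is deferred until they are available.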
2569 #[instrument]
2574 pub(super) async fn sequence_read_then_write(
2575 &mut self,
2576 mut ctx: ExecuteContext,
2577 plan: plan::ReadThenWritePlan,
2578 ) {
2579 let mut source_ids: BTreeSet<_> = plan
2580 .selection
2581 .depends_on()
2582 .into_iter()
2583 .map(|gid| self.catalog().resolve_item_id(&gid))
2584 .collect();
2585 source_ids.insert(plan.id);
2586
2587 if ctx.session().transaction().write_locks().is_none() {
2589 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2591
2592 for id in &source_ids {
2594 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2595 write_locks.insert_lock(*id, lock);
2596 }
2597 }
2598
2599 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2601 Ok(locks) => locks,
2602 Err(missing) => {
2603 let role_metadata = ctx.session().role_metadata().clone();
2605 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2606 let plan = DeferredPlan {
2607 ctx,
2608 plan: Plan::ReadThenWrite(plan),
2609 validity: PlanValidity::new(
2610 self.catalog.transient_revision(),
2611 source_ids.clone(),
2612 None,
2613 None,
2614 role_metadata,
2615 ),
2616 requires_locks: source_ids,
2617 };
2618 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2619 }
2620 };
2621
2622 ctx.session_mut()
2623 .try_grant_write_locks(write_locks)
2624 .expect("session has already been granted write locks");
2625 }
2626
2627 let plan::ReadThenWritePlan {
2628 id,
2629 kind,
2630 selection,
2631 mut assignments,
2632 finishing,
2633 mut returning,
2634 } = plan;
2635
2636 let desc = match self.catalog().try_get_entry(&id) {
2638 Some(table) => {
2639 table
2641 .relation_desc_latest()
2642 .expect("table has a desc")
2643 .into_owned()
2644 }
2645 None => {
2646 ctx.retire(Err(AdapterError::Catalog(
2647 mz_catalog::memory::error::Error {
2648 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2649 },
2650 )));
2651 return;
2652 }
2653 };
2654
2655 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2657 || assignments.values().any(|e| e.contains_temporal())
2658 || returning.iter().any(|e| e.contains_temporal());
2659 if contains_temporal {
2660 ctx.retire(Err(AdapterError::Unsupported(
2661 "calls to mz_now in write statements",
2662 )));
2663 return;
2664 }
2665
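// Validate that the selection only reads from objects that are safe to
// peek as part of a write: user tables (that are not source exports),
// views, materialized views, continual tasks, funcs, and types. Sources,
// secrets, connections, system relations, and anything containing
// `mz_now()` are rejected.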
2666 fn validate_read_dependencies(
2674 catalog: &Catalog,
2675 id: &CatalogItemId,
2676 ) -> Result<(), AdapterError> {
2677 use CatalogItemType::*;
2678 use mz_catalog::memory::objects;
2679 let mut ids_to_check = Vec::new();
2680 let valid = match catalog.try_get_entry(id) {
2681 Some(entry) => {
2682 if let CatalogItem::View(objects::View { optimized_expr, .. })
2683 | CatalogItem::MaterializedView(objects::MaterializedView {
2684 optimized_expr,
2685 ..
2686 }) = entry.item()
2687 {
2688 if optimized_expr.contains_temporal() {
2689 return Err(AdapterError::Unsupported(
2690 "calls to mz_now in write statements",
2691 ));
2692 }
2693 }
2694 match entry.item().typ() {
2695 typ @ (Func | View | MaterializedView | ContinualTask) => {
2696 ids_to_check.extend(entry.uses());
2697 let valid_id = id.is_user() || matches!(typ, Func);
2698 valid_id
2699 }
2700 Source | Secret | Connection => false,
2701 Sink | Index => unreachable!(),
2703 Table => {
2704 if !id.is_user() {
2705 false
2707 } else {
2708 entry.source_export_details().is_none()
2710 }
2711 }
2712 Type => true,
2713 }
2714 }
2715 None => false,
2716 };
2717 if !valid {
2718 let (object_name, object_type) = match catalog.try_get_entry(id) {
2719 Some(entry) => {
2720 let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
2721 let object_type = match entry.item().typ() {
2722 Source => "source",
2724 Secret => "secret",
2725 Connection => "connection",
2726 Table => {
2727 if !id.is_user() {
2728 "system table"
2729 } else {
2730 "source-export table"
2731 }
2732 }
2733 View => "system view",
2734 MaterializedView => "system materialized view",
2735 ContinualTask => "system task",
2736 _ => "invalid dependency",
2737 };
2738 (object_name, object_type.to_string())
2739 }
2740 None => (id.to_string(), "unknown".to_string()),
2741 };
2742 return Err(AdapterError::InvalidTableMutationSelection {
2743 object_name,
2744 object_type,
2745 });
2746 }
2747 for id in ids_to_check {
2748 validate_read_dependencies(catalog, &id)?;
2749 }
2750 Ok(())
2751 }
2752
2753 for gid in selection.depends_on() {
2754 let item_id = self.catalog().resolve_item_id(&gid);
2755 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2756 ctx.retire(Err(err));
2757 return;
2758 }
2759 }
2760
2761 let (peek_tx, peek_rx) = oneshot::channel();
2762 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2763 let (tx, _, session, extra) = ctx.into_parts();
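// Construct a new ExecuteContext for the peek: its response is routed to
// `peek_rx` below rather than to the client, so we can turn the peeked
// rows into diffs before retiring the outer statement.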
2764 let peek_ctx = ExecuteContext::from_parts(
2776 peek_client_tx,
2777 self.internal_cmd_tx.clone(),
2778 session,
2779 Default::default(),
2780 );
2781
2782 self.sequence_peek(
2783 peek_ctx,
2784 plan::SelectPlan {
2785 select: None,
2786 source: selection,
2787 when: QueryWhen::FreshestTableWrite,
2788 finishing,
2789 copy_to: None,
2790 },
2791 TargetCluster::Active,
2792 None,
2793 )
2794 .await;
2795
2796 let internal_cmd_tx = self.internal_cmd_tx.clone();
2797 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2798 let catalog = self.owned_catalog();
2799 let max_result_size = self.catalog().system_config().max_result_size();
2800
2801 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2802 let (peek_response, session) = match peek_rx.await {
2803 Ok(Response {
2804 result: Ok(resp),
2805 session,
2806 otel_ctx,
2807 }) => {
2808 otel_ctx.attach_as_parent();
2809 (resp, session)
2810 }
2811 Ok(Response {
2812 result: Err(e),
2813 session,
2814 otel_ctx,
2815 }) => {
2816 let ctx =
2817 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2818 otel_ctx.attach_as_parent();
2819 ctx.retire(Err(e));
2820 return;
2821 }
2822 Err(e) => return warn!("peek_tx dropped before we could receive: {:?}", e),
2824 };
2825 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2826 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2827
2828 if timeout_dur == Duration::ZERO {
2830 timeout_dur = Duration::MAX;
2831 }
2832
2833 let style = ExprPrepOneShot {
2834 logical_time: EvalTime::NotAvailable,
2835 session: ctx.session(),
2836 catalog_state: catalog.state(),
2837 };
2838 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2839 return_if_err!(style.prep_scalar_expr(expr), ctx);
2840 }
2841
2842 let make_diffs = move |mut rows: Box<dyn RowIterator>|
2843 -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2844 let arena = RowArena::new();
2845 let mut diffs = Vec::new();
2846 let mut datum_vec = mz_repr::DatumVec::new();
2847
2848 while let Some(row) = rows.next() {
2849 if !assignments.is_empty() {
2850 assert!(
2851 matches!(kind, MutationKind::Update),
2852 "only updates support assignments"
2853 );
2854 let mut datums = datum_vec.borrow_with(row);
2855 let mut updates = vec![];
2856 for (idx, expr) in &assignments {
2857 let updated = match expr.eval(&datums, &arena) {
2858 Ok(updated) => updated,
2859 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2860 };
2861 updates.push((*idx, updated));
2862 }
2863 for (idx, new_value) in updates {
2864 datums[idx] = new_value;
2865 }
2866 let updated = Row::pack_slice(&datums);
2867 diffs.push((updated, Diff::ONE));
2868 }
2869 match kind {
2870 MutationKind::Update | MutationKind::Delete => {
2874 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2875 }
2876 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2877 }
2878 }
2879
2880 let mut byte_size: u64 = 0;
2883 for (row, diff) in &diffs {
2884 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2885 if diff.is_positive() {
2886 for (idx, datum) in row.iter().enumerate() {
2887 desc.constraints_met(idx, &datum)?;
2888 }
2889 }
2890 }
2891 Ok((diffs, byte_size))
2892 };
2893
2894 let diffs = match peek_response {
2895 ExecuteResponse::SendingRowsStreaming {
2896 rows: mut rows_stream,
2897 ..
2898 } => {
2899 let mut byte_size: u64 = 0;
2900 let mut diffs = Vec::new();
2901 let result = loop {
2902 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2903 Ok(Some(res)) => match res {
2904 PeekResponseUnary::Rows(new_rows) => {
2905 match make_diffs(new_rows) {
2906 Ok((mut new_diffs, new_byte_size)) => {
2907 byte_size = byte_size.saturating_add(new_byte_size);
2908 if byte_size > max_result_size {
2909 break Err(AdapterError::ResultSize(format!(
2910 "result exceeds max size of {max_result_size}"
2911 )));
2912 }
2913 diffs.append(&mut new_diffs)
2914 }
2915 Err(e) => break Err(e),
2916 };
2917 }
2918 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2919 PeekResponseUnary::Error(e) => {
2920 break Err(AdapterError::Unstructured(anyhow!(e)));
2921 }
2922 },
2923 Ok(None) => break Ok(diffs),
2924 Err(_) => {
2925 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2930 conn_id: ctx.session().conn_id().clone(),
2931 });
2932 if let Err(e) = result {
2933 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2934 }
2935 break Err(AdapterError::StatementTimeout);
2936 }
2937 }
2938 };
2939
2940 result
2941 }
2942 ExecuteResponse::SendingRowsImmediate { rows } => {
2943 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2944 }
2945 resp => Err(AdapterError::Unstructured(anyhow!(
2946 "unexpected peek response: {resp:?}"
2947 ))),
2948 };
2949
2950 let mut returning_rows = Vec::new();
2951 let mut diff_err: Option<AdapterError> = None;
2952 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2953 let arena = RowArena::new();
2954 for (row, diff) in diffs {
2955 if !diff.is_positive() {
2956 continue;
2957 }
2958 let mut returning_row = Row::with_capacity(returning.len());
2959 let mut packer = returning_row.packer();
2960 let datums: Vec<_> = row.iter().collect();
2961 for expr in &returning {
2962 match expr.eval(&datums, &arena) {
2963 Ok(datum) => {
2964 packer.push(datum);
2965 }
2966 Err(err) => {
2967 diff_err = Some(err.into());
2968 break;
2969 }
2970 }
2971 }
2972 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2973 let diff = match NonZeroUsize::try_from(diff) {
2974 Ok(diff) => diff,
2975 Err(err) => {
2976 diff_err = Some(err.into());
2977 break;
2978 }
2979 };
2980 returning_rows.push((returning_row, diff));
2981 if diff_err.is_some() {
2982 break;
2983 }
2984 }
2985 }
2986 let diffs = if let Some(err) = diff_err {
2987 Err(err)
2988 } else {
2989 diffs
2990 };
2991
2992 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
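// If the peek's timestamp must be linearized, hand the transaction to the
// strict-serializable read queue and wait for it to come back before
// sending the diffs.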
2995 if let Some(timestamp_context) = timestamp_context {
3004 let (tx, rx) = tokio::sync::oneshot::channel();
3005 let conn_id = ctx.session().conn_id().clone();
3006 let pending_read_txn = PendingReadTxn {
3007 txn: PendingRead::ReadThenWrite { ctx, tx },
3008 timestamp_context,
3009 created: Instant::now(),
3010 num_requeues: 0,
3011 otel_ctx: OpenTelemetryContext::obtain(),
3012 };
3013 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3014 if let Err(e) = result {
3016 warn!(
3017 "strict_serializable_reads_tx dropped before we could send: {:?}",
3018 e
3019 );
3020 return;
3021 }
3022 let result = rx.await;
3023 ctx = match result {
3025 Ok(Some(ctx)) => ctx,
3026 Ok(None) => {
3027 return;
3030 }
3031 Err(e) => {
3032 warn!(
3033 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3034 e
3035 );
3036 return;
3037 }
3038 };
3039 }
3040
3041 match diffs {
3042 Ok(diffs) => {
3043 let result = Self::send_diffs(
3044 ctx.session_mut(),
3045 plan::SendDiffsPlan {
3046 id,
3047 updates: diffs,
3048 kind,
3049 returning: returning_rows,
3050 max_result_size,
3051 },
3052 );
3053 ctx.retire(result);
3054 }
3055 Err(e) => {
3056 ctx.retire(Err(e));
3057 }
3058 }
3059 });
3060 }
3061
3062 #[instrument]
3063 pub(super) async fn sequence_alter_item_rename(
3064 &mut self,
3065 ctx: &mut ExecuteContext,
3066 plan: plan::AlterItemRenamePlan,
3067 ) -> Result<ExecuteResponse, AdapterError> {
3068 let op = catalog::Op::RenameItem {
3069 id: plan.id,
3070 current_full_name: plan.current_full_name,
3071 to_name: plan.to_name,
3072 };
3073 match self
3074 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3075 .await
3076 {
3077 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3078 Err(err) => Err(err),
3079 }
3080 }
3081
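/// Sequences `ALTER ... SET (RETAIN HISTORY ...)`: updates the catalog and
/// then adjusts the compute (for indexes) or storage read policy to match
/// the new compaction window.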
3082 #[instrument]
3083 pub(super) async fn sequence_alter_retain_history(
3084 &mut self,
3085 ctx: &mut ExecuteContext,
3086 plan: plan::AlterRetainHistoryPlan,
3087 ) -> Result<ExecuteResponse, AdapterError> {
3088 let ops = vec![catalog::Op::AlterRetainHistory {
3089 id: plan.id,
3090 value: plan.value,
3091 window: plan.window,
3092 }];
3093 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3094 Box::pin(async move {
3095 let catalog_item = coord.catalog().get_entry(&plan.id).item();
3096 let cluster = match catalog_item {
3097 CatalogItem::Table(_)
3098 | CatalogItem::MaterializedView(_)
3099 | CatalogItem::Source(_)
3100 | CatalogItem::ContinualTask(_) => None,
3101 CatalogItem::Index(index) => Some(index.cluster_id),
3102 CatalogItem::Log(_)
3103 | CatalogItem::View(_)
3104 | CatalogItem::Sink(_)
3105 | CatalogItem::Type(_)
3106 | CatalogItem::Func(_)
3107 | CatalogItem::Secret(_)
3108 | CatalogItem::Connection(_) => unreachable!(),
3109 };
3110 match cluster {
3111 Some(cluster) => {
3112 coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3113 }
3114 None => {
3115 coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3116 }
3117 }
3118 })
3119 })
3120 .await?;
3121 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3122 }
3123
3124 #[instrument]
3125 pub(super) async fn sequence_alter_schema_rename(
3126 &mut self,
3127 ctx: &mut ExecuteContext,
3128 plan: plan::AlterSchemaRenamePlan,
3129 ) -> Result<ExecuteResponse, AdapterError> {
3130 let (database_spec, schema_spec) = plan.cur_schema_spec;
3131 let op = catalog::Op::RenameSchema {
3132 database_spec,
3133 schema_spec,
3134 new_name: plan.new_schema_name,
3135 check_reserved_names: true,
3136 };
3137 match self
3138 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3139 .await
3140 {
3141 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3142 Err(err) => Err(err),
3143 }
3144 }
3145
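/// Sequences `ALTER SCHEMA ... SWAP WITH ...` as three renames through a
/// temporary name, all applied in a single catalog transaction.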
3146 #[instrument]
3147 pub(super) async fn sequence_alter_schema_swap(
3148 &mut self,
3149 ctx: &mut ExecuteContext,
3150 plan: plan::AlterSchemaSwapPlan,
3151 ) -> Result<ExecuteResponse, AdapterError> {
3152 let plan::AlterSchemaSwapPlan {
3153 schema_a_spec: (schema_a_db, schema_a),
3154 schema_a_name,
3155 schema_b_spec: (schema_b_db, schema_b),
3156 schema_b_name,
3157 name_temp,
3158 } = plan;
3159
3160 let op_a = catalog::Op::RenameSchema {
3161 database_spec: schema_a_db,
3162 schema_spec: schema_a,
3163 new_name: name_temp,
3164 check_reserved_names: false,
3165 };
3166 let op_b = catalog::Op::RenameSchema {
3167 database_spec: schema_b_db,
3168 schema_spec: schema_b,
3169 new_name: schema_a_name,
3170 check_reserved_names: false,
3171 };
3172 let op_c = catalog::Op::RenameSchema {
3173 database_spec: schema_a_db,
3174 schema_spec: schema_a,
3175 new_name: schema_b_name,
3176 check_reserved_names: false,
3177 };
3178
3179 match self
3180 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3181 Box::pin(async {})
3182 })
3183 .await
3184 {
3185 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3186 Err(err) => Err(err),
3187 }
3188 }
3189
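/// Sequences `ALTER ROLE`: either updates the role's attributes (inherit,
/// password, superuser, login) or sets/resets a per-role default for a
/// session variable, emitting the appropriate notices.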
3190 #[instrument]
3191 pub(super) async fn sequence_alter_role(
3192 &mut self,
3193 session: &Session,
3194 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3195 ) -> Result<ExecuteResponse, AdapterError> {
3196 let catalog = self.catalog().for_session(session);
3197 let role = catalog.get_role(&id);
3198
3199 let mut notices = vec![];
3201
3202 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3204 let mut vars = role.vars().clone();
3205
3206 let mut nopassword = false;
3209
3210 match option {
3212 PlannedAlterRoleOption::Attributes(attrs) => {
3213 self.validate_role_attributes(&attrs.clone().into())?;
3214
3215 if let Some(inherit) = attrs.inherit {
3216 attributes.inherit = inherit;
3217 }
3218
3219 if let Some(password) = attrs.password {
3220 attributes.password = Some(password);
3221 attributes.scram_iterations =
3222 Some(self.catalog().system_config().scram_iterations())
3223 }
3224
3225 if let Some(superuser) = attrs.superuser {
3226 attributes.superuser = Some(superuser);
3227 }
3228
3229 if let Some(login) = attrs.login {
3230 attributes.login = Some(login);
3231 }
3232
3233 if attrs.nopassword.unwrap_or(false) {
3234 nopassword = true;
3235 }
3236
3237 if let Some(notice) = self.should_emit_rbac_notice(session) {
3238 notices.push(notice);
3239 }
3240 }
3241 PlannedAlterRoleOption::Variable(variable) => {
3242 let session_var = session.vars().inspect(variable.name())?;
3244 session_var.visible(session.user(), catalog.system_vars())?;
3246
3247 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3250 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3251 } else if let PlannedRoleVariable::Set {
3252 name,
3253 value: VariableValue::Values(vals),
3254 } = &variable
3255 {
3256 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3257 notices.push(AdapterNotice::IntrospectionClusterUsage);
3258 }
3259 }
3260
3261 let var_name = match variable {
3262 PlannedRoleVariable::Set { name, value } => {
3263 match &value {
3265 VariableValue::Default => {
3266 vars.remove(&name);
3267 }
3268 VariableValue::Values(vals) => {
3269 let var = match &vals[..] {
3270 [val] => OwnedVarInput::Flat(val.clone()),
3271 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3272 };
3273 session_var.check(var.borrow())?;
3275
3276 vars.insert(name.clone(), var);
3277 }
3278 };
3279 name
3280 }
3281 PlannedRoleVariable::Reset { name } => {
3282 vars.remove(&name);
3284 name
3285 }
3286 };
3287
3288 notices.push(AdapterNotice::VarDefaultUpdated {
3290 role: Some(name.clone()),
3291 var_name: Some(var_name),
3292 });
3293 }
3294 }
3295
3296 let op = catalog::Op::AlterRole {
3297 id,
3298 name,
3299 attributes,
3300 nopassword,
3301 vars: RoleVars { map: vars },
3302 };
3303 let response = self
3304 .catalog_transact(Some(session), vec![op])
3305 .await
3306 .map(|_| ExecuteResponse::AlteredRole)?;
3307
3308 session.add_notices(notices);
3310
3311 Ok(response)
3312 }
3313
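/// Prepares `ALTER SINK`: acquires a read hold on the new upstream
/// collection, picks a read timestamp, and installs a watch set so that
/// [`Self::sequence_alter_sink_finish`] runs once the sink's write
/// frontier passes that timestamp.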
3314 #[instrument]
3315 pub(super) async fn sequence_alter_sink_prepare(
3316 &mut self,
3317 ctx: ExecuteContext,
3318 plan: plan::AlterSinkPlan,
3319 ) {
3320 let id_bundle = crate::CollectionIdBundle {
3322 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3323 compute_ids: BTreeMap::new(),
3324 };
3325 let read_hold = self.acquire_read_holds(&id_bundle);
3326
3327 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3328 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3329 return;
3330 };
3331
3332 let otel_ctx = OpenTelemetryContext::obtain();
3333 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3334
3335 let plan_validity = PlanValidity::new(
3336 self.catalog().transient_revision(),
3337 BTreeSet::from_iter([plan.item_id, from_item_id]),
3338 Some(plan.in_cluster),
3339 None,
3340 ctx.session().role_metadata().clone(),
3341 );
3342
3343 info!(
3344 "preparing alter sink for {}: frontiers={:?} export={:?}",
3345 plan.global_id,
3346 self.controller
3347 .storage_collections
3348 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3349 self.controller.storage.export(plan.global_id)
3350 );
3351
3352 self.install_storage_watch_set(
3358 ctx.session().conn_id().clone(),
3359 BTreeSet::from_iter([plan.global_id]),
3360 read_ts,
3361 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3362 ctx: Some(ctx),
3363 otel_ctx,
3364 plan,
3365 plan_validity,
3366 read_hold,
3367 }),
3368 ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3369 }
3370
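/// Finishes `ALTER SINK`: re-checks plan validity and the expected sink
/// version, rewrites the stored `CREATE SINK` statement to point at the
/// new upstream relation, commits the catalog update, and alters the
/// running export in the storage controller.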
3371 #[instrument]
3372 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3373 ctx.otel_ctx.attach_as_parent();
3374
3375 let plan::AlterSinkPlan {
3376 item_id,
3377 global_id,
3378 sink: sink_plan,
3379 with_snapshot,
3380 in_cluster,
3381 } = ctx.plan.clone();
3382
3383 match ctx.plan_validity.check(self.catalog()) {
3392 Ok(()) => {}
3393 Err(err) => {
3394 ctx.retire(Err(err));
3395 return;
3396 }
3397 }
3398
3399 let entry = self.catalog().get_entry(&item_id);
3400 let CatalogItem::Sink(old_sink) = entry.item() else {
3401 panic!("invalid item kind for `AlterSinkPlan`");
3402 };
3403
3404 if sink_plan.version != old_sink.version + 1 {
3405 ctx.retire(Err(AdapterError::ChangedPlan(
3406 "sink was altered concurrently".into(),
3407 )));
3408 return;
3409 }
3410
3411 info!(
3412 "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3413 self.controller
3414 .storage_collections
3415 .collections_frontiers(vec![global_id, sink_plan.from]),
3416 self.controller.storage.export(global_id),
3417 );
3418
3419 let write_frontier = &self
3422 .controller
3423 .storage
3424 .export(global_id)
3425 .expect("sink known to exist")
3426 .write_frontier;
3427 let as_of = ctx.read_hold.least_valid_read();
3428 assert!(
3429 write_frontier.iter().all(|t| as_of.less_than(t)),
3430 "{:?} should be strictly less than {:?}",
3431 &*as_of,
3432 &**write_frontier
3433 );
3434
3435 let create_sql = &old_sink.create_sql;
3441 let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3442 let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3443 unreachable!("invalid statement kind for sink");
3444 };
3445
3446 stmt.with_options
3448 .retain(|o| o.name != CreateSinkOptionName::Version);
3449 stmt.with_options.push(CreateSinkOption {
3450 name: CreateSinkOptionName::Version,
3451 value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3452 sink_plan.version.to_string(),
3453 ))),
3454 });
3455
3456 let conn_catalog = self.catalog().for_system_session();
3457 let (mut stmt, resolved_ids) =
3458 mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3459
3460 let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3462 let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3463 stmt.from = ResolvedItemName::Item {
3464 id: from_entry.id(),
3465 qualifiers: from_entry.name().qualifiers.clone(),
3466 full_name,
3467 print_id: true,
3468 version: RelationVersionSelector::Latest,
3469 };
3470
3471 let new_sink = Sink {
3472 create_sql: stmt.to_ast_string_stable(),
3473 global_id,
3474 from: sink_plan.from,
3475 connection: sink_plan.connection.clone(),
3476 envelope: sink_plan.envelope,
3477 version: sink_plan.version,
3478 with_snapshot,
3479 resolved_ids: resolved_ids.clone(),
3480 cluster_id: in_cluster,
3481 commit_interval: sink_plan.commit_interval,
3482 };
3483
3484 let ops = vec![catalog::Op::UpdateItem {
3485 id: item_id,
3486 name: entry.name().clone(),
3487 to_item: CatalogItem::Sink(new_sink),
3488 }];
3489
3490 match self
3491 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3492 .await
3493 {
3494 Ok(()) => {}
3495 Err(err) => {
3496 ctx.retire(Err(err));
3497 return;
3498 }
3499 }
3500
3501 let storage_sink_desc = StorageSinkDesc {
3502 from: sink_plan.from,
3503 from_desc: from_entry
3504 .relation_desc()
3505 .expect("sinks can only be built on items with descs")
3506 .into_owned(),
3507 connection: sink_plan
3508 .connection
3509 .clone()
3510 .into_inline_connection(self.catalog().state()),
3511 envelope: sink_plan.envelope,
3512 as_of,
3513 with_snapshot,
3514 version: sink_plan.version,
3515 from_storage_metadata: (),
3516 to_storage_metadata: (),
3517 commit_interval: sink_plan.commit_interval,
3518 };
3519
3520 self.controller
3521 .storage
3522 .alter_export(
3523 global_id,
3524 ExportDescription {
3525 sink: storage_sink_desc,
3526 instance_id: in_cluster,
3527 },
3528 )
3529 .await
3530 .unwrap_or_terminate("cannot fail to alter sink export");
3531
3532 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3533 }
3534
3535 #[instrument]
3536 pub(super) async fn sequence_alter_connection(
3537 &mut self,
3538 ctx: ExecuteContext,
3539 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3540 ) {
3541 match action {
3542 AlterConnectionAction::RotateKeys => {
3543 self.sequence_rotate_keys(ctx, id).await;
3544 }
3545 AlterConnectionAction::AlterOptions {
3546 set_options,
3547 drop_options,
3548 validate,
3549 } => {
3550 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3551 .await
3552 }
3553 }
3554 }
3555
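/// Applies `ALTER CONNECTION ... SET/DROP (...)` by splicing the new
/// options into the original `CREATE CONNECTION` statement and replanning
/// it. With `validate`, the new connection is checked on a background task
/// before the catalog is updated.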
3556 #[instrument]
3557 async fn sequence_alter_connection_options(
3558 &mut self,
3559 mut ctx: ExecuteContext,
3560 id: CatalogItemId,
3561 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3562 drop_options: BTreeSet<ConnectionOptionName>,
3563 validate: bool,
3564 ) {
3565 let cur_entry = self.catalog().get_entry(&id);
3566 let cur_conn = cur_entry.connection().expect("known to be connection");
3567 let connection_gid = cur_conn.global_id();
3568
3569 let inner = || -> Result<Connection, AdapterError> {
3570 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3572 .expect("invalid create sql persisted to catalog")
3573 .into_element()
3574 .ast
3575 {
3576 Statement::CreateConnection(stmt) => stmt,
3577 _ => unreachable!("proved type is connection"),
3578 };
3579
3580 let catalog = self.catalog().for_system_session();
3581
3582 let (mut create_conn_stmt, resolved_ids) =
3584 mz_sql::names::resolve(&catalog, create_conn_stmt)
3585 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3586
3587 create_conn_stmt
3589 .values
3590 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3591
3592 create_conn_stmt.values.extend(
3594 set_options
3595 .into_iter()
3596 .map(|(name, value)| ConnectionOption { name, value }),
3597 );
3598
3599 let mut catalog = self.catalog().for_system_session();
3602 catalog.mark_id_unresolvable_for_replanning(id);
3603
3604 let plan = match mz_sql::plan::plan(
3606 None,
3607 &catalog,
3608 Statement::CreateConnection(create_conn_stmt),
3609 &Params::empty(),
3610 &resolved_ids,
3611 )
3612 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3613 {
3614 Plan::CreateConnection(plan) => plan,
3615 _ => unreachable!("create connection plan is only valid response"),
3616 };
3617
3618 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3620 .expect("invalid create sql persisted to catalog")
3621 .into_element()
3622 .ast
3623 {
3624 Statement::CreateConnection(stmt) => stmt,
3625 _ => unreachable!("proved type is connection"),
3626 };
3627
3628 let catalog = self.catalog().for_system_session();
3629
3630 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3632 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3633
3634 Ok(Connection {
3635 create_sql: plan.connection.create_sql,
3636 global_id: cur_conn.global_id,
3637 details: plan.connection.details,
3638 resolved_ids: new_deps,
3639 })
3640 };
3641
3642 let conn = match inner() {
3643 Ok(conn) => conn,
3644 Err(e) => {
3645 return ctx.retire(Err(e));
3646 }
3647 };
3648
3649 if validate {
3650 let connection = conn
3651 .details
3652 .to_connection()
3653 .into_inline_connection(self.catalog().state());
3654
3655 let internal_cmd_tx = self.internal_cmd_tx.clone();
3656 let transient_revision = self.catalog().transient_revision();
3657 let conn_id = ctx.session().conn_id().clone();
3658 let otel_ctx = OpenTelemetryContext::obtain();
3659 let role_metadata = ctx.session().role_metadata().clone();
3660 let current_storage_parameters = self.controller.storage.config().clone();
3661
3662 task::spawn(
3663 || format!("validate_alter_connection:{conn_id}"),
3664 async move {
3665 let resolved_ids = conn.resolved_ids.clone();
3666 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3667 let result = match connection.validate(id, &current_storage_parameters).await {
3668 Ok(()) => Ok(conn),
3669 Err(err) => Err(err.into()),
3670 };
3671
3672 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3674 AlterConnectionValidationReady {
3675 ctx,
3676 result,
3677 connection_id: id,
3678 connection_gid,
3679 plan_validity: PlanValidity::new(
3680 transient_revision,
3681 dependency_ids.clone(),
3682 None,
3683 None,
3684 role_metadata,
3685 ),
3686 otel_ctx,
3687 resolved_ids,
3688 },
3689 ));
3690 if let Err(e) = result {
3691 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3692 }
3693 },
3694 );
3695 } else {
3696 let result = self
3697 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3698 .await;
3699 ctx.retire(result);
3700 }
3701 }
3702
3703 #[instrument]
3704 pub(crate) async fn sequence_alter_connection_stage_finish(
3705 &mut self,
3706 session: &Session,
3707 id: CatalogItemId,
3708 connection: Connection,
3709 ) -> Result<ExecuteResponse, AdapterError> {
3710 match self.catalog.get_entry(&id).item() {
3711 CatalogItem::Connection(curr_conn) => {
3712 curr_conn
3713 .details
3714 .to_connection()
3715 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3716 .map_err(StorageError::from)?;
3717 }
3718 _ => unreachable!("known to be a connection"),
3719 };
3720
3721 let ops = vec![catalog::Op::UpdateItem {
3722 id,
3723 name: self.catalog.get_entry(&id).name().clone(),
3724 to_item: CatalogItem::Connection(connection.clone()),
3725 }];
3726
3727 self.catalog_transact(Some(session), ops).await?;
3728
3729 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3736 }
3737
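/// Sequences `ALTER SOURCE`: either adds subsource exports, merging the
/// new `TEXT COLUMNS`/`EXCLUDE COLUMNS` options into the stored
/// `CREATE SOURCE` statement and replanning it, or refreshes the source's
/// known upstream references.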
3738 #[instrument]
3739 pub(super) async fn sequence_alter_source(
3740 &mut self,
3741 session: &Session,
3742 plan::AlterSourcePlan {
3743 item_id,
3744 ingestion_id,
3745 action,
3746 }: plan::AlterSourcePlan,
3747 ) -> Result<ExecuteResponse, AdapterError> {
3748 let cur_entry = self.catalog().get_entry(&item_id);
3749 let cur_source = cur_entry.source().expect("known to be source");
3750
3751 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3752 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3754 .expect("invalid create sql persisted to catalog")
3755 .into_element()
3756 .ast
3757 {
3758 Statement::CreateSource(stmt) => stmt,
3759 _ => unreachable!("proved type is source"),
3760 };
3761
3762 let catalog = coord.catalog().for_system_session();
3763
3764 mz_sql::names::resolve(&catalog, create_source_stmt)
3766 .map_err(|e| AdapterError::internal(err_cx, e))
3767 };
3768
3769 match action {
3770 plan::AlterSourceAction::AddSubsourceExports {
3771 subsources,
3772 options,
3773 } => {
3774 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3775
3776 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3777 text_columns: mut new_text_columns,
3778 exclude_columns: mut new_exclude_columns,
3779 ..
3780 } = options.try_into()?;
3781
3782 let (mut create_source_stmt, resolved_ids) =
3784 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3785
3786 let catalog = self.catalog();
3788 let curr_references: BTreeSet<_> = catalog
3789 .get_entry(&item_id)
3790 .used_by()
3791 .into_iter()
3792 .filter_map(|subsource| {
3793 catalog
3794 .get_entry(subsource)
3795 .subsource_details()
3796 .map(|(_id, reference, _details)| reference)
3797 })
3798 .collect();
3799
3800 let purification_err =
3803 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3804
3805 match &mut create_source_stmt.connection {
3809 CreateSourceConnection::Postgres {
3810 options: curr_options,
3811 ..
3812 } => {
3813 let mz_sql::plan::PgConfigOptionExtracted {
3814 mut text_columns, ..
3815 } = curr_options.clone().try_into()?;
3816
3817 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3820
3821 text_columns.retain(|column_qualified_reference| {
3823 mz_ore::soft_assert_eq_or_log!(
3824 column_qualified_reference.0.len(),
3825 4,
3826 "all TEXT COLUMNS values must be column-qualified references"
3827 );
3828 let mut table = column_qualified_reference.clone();
3829 table.0.truncate(3);
3830 curr_references.contains(&table)
3831 });
3832
3833 new_text_columns.extend(text_columns);
3835
3836 if !new_text_columns.is_empty() {
3838 new_text_columns.sort();
3839 let new_text_columns = new_text_columns
3840 .into_iter()
3841 .map(WithOptionValue::UnresolvedItemName)
3842 .collect();
3843
3844 curr_options.push(PgConfigOption {
3845 name: PgConfigOptionName::TextColumns,
3846 value: Some(WithOptionValue::Sequence(new_text_columns)),
3847 });
3848 }
3849 }
3850 CreateSourceConnection::MySql {
3851 options: curr_options,
3852 ..
3853 } => {
3854 let mz_sql::plan::MySqlConfigOptionExtracted {
3855 mut text_columns,
3856 mut exclude_columns,
3857 ..
3858 } = curr_options.clone().try_into()?;
3859
3860 curr_options.retain(|o| {
3863 !matches!(
3864 o.name,
3865 MySqlConfigOptionName::TextColumns
3866 | MySqlConfigOptionName::ExcludeColumns
3867 )
3868 });
3869
3870 let column_referenced =
3872 |column_qualified_reference: &UnresolvedItemName| {
3873 mz_ore::soft_assert_eq_or_log!(
3874 column_qualified_reference.0.len(),
3875 3,
3876 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3877 );
3878 let mut table = column_qualified_reference.clone();
3879 table.0.truncate(2);
3880 curr_references.contains(&table)
3881 };
3882 text_columns.retain(column_referenced);
3883 exclude_columns.retain(column_referenced);
3884
3885 new_text_columns.extend(text_columns);
3887 new_exclude_columns.extend(exclude_columns);
3888
3889 if !new_text_columns.is_empty() {
3891 new_text_columns.sort();
3892 let new_text_columns = new_text_columns
3893 .into_iter()
3894 .map(WithOptionValue::UnresolvedItemName)
3895 .collect();
3896
3897 curr_options.push(MySqlConfigOption {
3898 name: MySqlConfigOptionName::TextColumns,
3899 value: Some(WithOptionValue::Sequence(new_text_columns)),
3900 });
3901 }
3902 if !new_exclude_columns.is_empty() {
3904 new_exclude_columns.sort();
3905 let new_exclude_columns = new_exclude_columns
3906 .into_iter()
3907 .map(WithOptionValue::UnresolvedItemName)
3908 .collect();
3909
3910 curr_options.push(MySqlConfigOption {
3911 name: MySqlConfigOptionName::ExcludeColumns,
3912 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3913 });
3914 }
3915 }
3916 CreateSourceConnection::SqlServer {
3917 options: curr_options,
3918 ..
3919 } => {
3920 let mz_sql::plan::SqlServerConfigOptionExtracted {
3921 mut text_columns,
3922 mut exclude_columns,
3923 ..
3924 } = curr_options.clone().try_into()?;
3925
3926 curr_options.retain(|o| {
3929 !matches!(
3930 o.name,
3931 SqlServerConfigOptionName::TextColumns
3932 | SqlServerConfigOptionName::ExcludeColumns
3933 )
3934 });
3935
3936 let column_referenced =
3938 |column_qualified_reference: &UnresolvedItemName| {
3939 mz_ore::soft_assert_eq_or_log!(
3940 column_qualified_reference.0.len(),
3941 3,
3942 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3943 );
3944 let mut table = column_qualified_reference.clone();
3945 table.0.truncate(2);
3946 curr_references.contains(&table)
3947 };
3948 text_columns.retain(column_referenced);
3949 exclude_columns.retain(column_referenced);
3950
3951 new_text_columns.extend(text_columns);
3953 new_exclude_columns.extend(exclude_columns);
3954
3955 if !new_text_columns.is_empty() {
3957 new_text_columns.sort();
3958 let new_text_columns = new_text_columns
3959 .into_iter()
3960 .map(WithOptionValue::UnresolvedItemName)
3961 .collect();
3962
3963 curr_options.push(SqlServerConfigOption {
3964 name: SqlServerConfigOptionName::TextColumns,
3965 value: Some(WithOptionValue::Sequence(new_text_columns)),
3966 });
3967 }
3968 if !new_exclude_columns.is_empty() {
3970 new_exclude_columns.sort();
3971 let new_exclude_columns = new_exclude_columns
3972 .into_iter()
3973 .map(WithOptionValue::UnresolvedItemName)
3974 .collect();
3975
3976 curr_options.push(SqlServerConfigOption {
3977 name: SqlServerConfigOptionName::ExcludeColumns,
3978 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3979 });
3980 }
3981 }
3982 _ => return Err(purification_err()),
3983 };
3984
3985 let mut catalog = self.catalog().for_system_session();
3986 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
3987
3988 let plan = match mz_sql::plan::plan(
3990 None,
3991 &catalog,
3992 Statement::CreateSource(create_source_stmt),
3993 &Params::empty(),
3994 &resolved_ids,
3995 )
3996 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
3997 {
3998 Plan::CreateSource(plan) => plan,
3999 _ => unreachable!("create source plan is only valid response"),
4000 };
4001
4002 let source = Source::new(
4006 plan,
4007 cur_source.global_id,
4008 resolved_ids,
4009 cur_source.custom_logical_compaction_window,
4010 cur_source.is_retained_metrics_object,
4011 );
4012
4013 let desc = match &source.data_source {
4015 DataSourceDesc::Ingestion { desc, .. }
4016 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4017 desc.clone().into_inline_connection(self.catalog().state())
4018 }
4019 _ => unreachable!("already verified of type ingestion"),
4020 };
4021
4022 self.controller
4023 .storage
4024 .check_alter_ingestion_source_desc(ingestion_id, &desc)
4025 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4026
4027 let mut ops = vec![catalog::Op::UpdateItem {
4030 id: item_id,
4031 name: self.catalog.get_entry(&item_id).name().clone(),
4034 to_item: CatalogItem::Source(source),
4035 }];
4036
4037 let CreateSourceInner {
4038 ops: new_ops,
4039 sources: _,
4040 if_not_exists_ids,
4041 } = self.create_source_inner(session, subsources).await?;
4042
4043 ops.extend(new_ops.into_iter());
4044
4045 assert!(
4046 if_not_exists_ids.is_empty(),
4047 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4048 );
4049
4050 self.catalog_transact(Some(session), ops).await?;
4051 }
4052 plan::AlterSourceAction::RefreshReferences { references } => {
4053 self.catalog_transact(
4054 Some(session),
4055 vec![catalog::Op::UpdateSourceReferences {
4056 source_id: item_id,
4057 references: references.into(),
4058 }],
4059 )
4060 .await?;
4061 }
4062 }
4063
4064 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4065 }
4066
4067 #[instrument]
4068 pub(super) async fn sequence_alter_system_set(
4069 &mut self,
4070 session: &Session,
4071 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4072 ) -> Result<ExecuteResponse, AdapterError> {
4073 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4076 if NETWORK_POLICY.name().to_lowercase() == name.to_lowercase() {
4076 self.validate_alter_system_network_policy(session, &value)?;
4077 }
4078
4079 let op = match value {
4080 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4081 name: name.clone(),
4082 value: OwnedVarInput::SqlSet(values),
4083 },
4084 plan::VariableValue::Default => {
4085 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4086 }
4087 };
4088 self.catalog_transact(Some(session), vec![op]).await?;
4089
4090 session.add_notice(AdapterNotice::VarDefaultUpdated {
4091 role: None,
4092 var_name: Some(name),
4093 });
4094 Ok(ExecuteResponse::AlteredSystemConfiguration)
4095 }
4096
4097 #[instrument]
4098 pub(super) async fn sequence_alter_system_reset(
4099 &mut self,
4100 session: &Session,
4101 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4102 ) -> Result<ExecuteResponse, AdapterError> {
4103 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4104 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4105 self.catalog_transact(Some(session), vec![op]).await?;
4106 session.add_notice(AdapterNotice::VarDefaultUpdated {
4107 role: None,
4108 var_name: Some(name),
4109 });
4110 Ok(ExecuteResponse::AlteredSystemConfiguration)
4111 }
4112
4113 #[instrument]
4114 pub(super) async fn sequence_alter_system_reset_all(
4115 &mut self,
4116 session: &Session,
4117 _: plan::AlterSystemResetAllPlan,
4118 ) -> Result<ExecuteResponse, AdapterError> {
4119 self.is_user_allowed_to_alter_system(session, None)?;
4120 let op = catalog::Op::ResetAllSystemConfiguration;
4121 self.catalog_transact(Some(session), vec![op]).await?;
4122 session.add_notice(AdapterNotice::VarDefaultUpdated {
4123 role: None,
4124 var_name: None,
4125 });
4126 Ok(ExecuteResponse::AlteredSystemConfiguration)
4127 }
4128
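/// Only internal superusers may alter arbitrary system variables. External
/// superusers may alter user-modifiable variables that are visible to
/// them; regular users may alter none.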
    fn is_user_allowed_to_alter_system(
        &self,
        session: &Session,
        var_name: Option<&str>,
    ) -> Result<(), AdapterError> {
        match (session.user().kind(), var_name) {
            // Altering all variables at once is restricted to internal superusers.
            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
            (UserKind::Superuser, Some(name))
                if session.user().is_internal()
                    || self.catalog().system_config().user_modifiable(name) =>
            {
                // The variable must exist and be visible to this user.
                let var = self.catalog().system_config().get(name)?;
                var.visible(session.user(), self.catalog().system_config())?;
                Ok(())
            }
            // Regular users may not alter even user-modifiable variables; that
            // requires superuser privileges.
            (UserKind::Regular, Some(name))
                if self.catalog().system_config().user_modifiable(name) =>
            {
                Err(AdapterError::Unauthorized(
                    rbac::UnauthorizedError::Superuser {
                        action: format!("toggle the '{name}' system configuration parameter"),
                    },
                ))
            }
            _ => Err(AdapterError::Unauthorized(
                rbac::UnauthorizedError::MzSystem {
                    action: "alter system".into(),
                },
            )),
        }
    }

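    /// Validates that a new value for the `network_policy` system variable names an
    /// existing network policy and would not lock the current user out.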
    fn validate_alter_system_network_policy(
        &self,
        session: &Session,
        policy_value: &plan::VariableValue,
    ) -> Result<(), AdapterError> {
        let policy_name = match policy_value {
            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
            plan::VariableValue::Values(values) if values.len() == 1 => {
                values.iter().next().cloned()
            }
            plan::VariableValue::Values(values) => {
                tracing::warn!(?values, "can't set multiple network policies at once");
                None
            }
        };
        let maybe_network_policy = policy_name
            .as_ref()
            .and_then(|name| self.catalog.get_network_policy_by_name(name));
        let Some(network_policy) = maybe_network_policy else {
            return Err(AdapterError::PlanError(plan::PlanError::VarError(
                VarError::InvalidParameterValue {
                    name: NETWORK_POLICY.name(),
                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
                    reason: "no network policy with that name exists".to_string(),
                },
            )));
        };
        self.validate_alter_network_policy(session, &network_policy.rules)
    }

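    /// Validates that applying `policy_rules` would not lock the current session out:
    /// the session's client IP must still be permitted by the new rules.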
    fn validate_alter_network_policy(
        &self,
        session: &Session,
        policy_rules: &Vec<NetworkPolicyRule>,
    ) -> Result<(), AdapterError> {
        // Internal users are never subject to network policies, so any change is
        // safe for them.
        if session.user().is_internal() {
            return Ok(());
        }
        if let Some(ip) = session.meta().client_ip() {
            validate_ip_with_policy_rules(ip, policy_rules)
                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
        } else {
            // Without a client IP we cannot prove the session would retain access,
            // so reject the change outright.
            return Err(AdapterError::NetworkPolicyDenied(
                NetworkPolicyError::MissingIp,
            ));
        }
        Ok(())
    }

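    /// Sequences an `EXECUTE` statement by turning the named prepared statement into
    /// a new portal on the session, returning the portal's name.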
    #[instrument]
    pub(super) fn sequence_execute(
        &self,
        session: &mut Session,
        plan: plan::ExecutePlan,
    ) -> Result<String, AdapterError> {
        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
        let ps = session
            .get_prepared_statement_unverified(&plan.name)
            .expect("known to exist");
        let stmt = ps.stmt().cloned();
        let desc = ps.desc().clone();
        let state_revision = ps.state_revision;
        let logging = Arc::clone(ps.logging());
        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
    }

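    /// Sequences a `GRANT` of privileges by delegating to
    /// [`Self::sequence_update_privileges`].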
    #[instrument]
    pub(super) async fn sequence_grant_privileges(
        &mut self,
        session: &Session,
        plan::GrantPrivilegesPlan {
            update_privileges,
            grantees,
        }: plan::GrantPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            grantees,
            UpdatePrivilegeVariant::Grant,
        )
        .await
    }

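    /// Sequences a `REVOKE` of privileges by delegating to
    /// [`Self::sequence_update_privileges`].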
    #[instrument]
    pub(super) async fn sequence_revoke_privileges(
        &mut self,
        session: &Session,
        plan::RevokePrivilegesPlan {
            update_privileges,
            revokees,
        }: plan::RevokePrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            revokees,
            UpdatePrivilegeVariant::Revoke,
        )
        .await
    }

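    /// Shared implementation of `GRANT` and `REVOKE`: computes the catalog ops needed
    /// to move each (object, grantee) pair to the desired privilege state, skipping
    /// pairs that are already in that state, and applies them in one catalog
    /// transaction.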
    #[instrument]
    async fn sequence_update_privileges(
        &mut self,
        session: &Session,
        update_privileges: Vec<UpdatePrivilege>,
        grantees: Vec<RoleId>,
        variant: UpdatePrivilegeVariant,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
        let mut warnings = Vec::new();
        let catalog = self.catalog().for_session(session);

        for UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        } in update_privileges
        {
            let actual_object_type = catalog.get_system_object_type(&target_id);
            // For relations, warn about requested privileges that do not apply to
            // the object's actual type.
            if actual_object_type.is_relation() {
                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
                if !non_applicable_privileges.is_empty() {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
                        non_applicable_privileges,
                        object_description,
                    });
                }
            }

            if let SystemObjectId::Object(object_id) = &target_id {
                self.catalog()
                    .ensure_not_reserved_object(object_id, session.conn_id())?;
            }

            let privileges = self
                .catalog()
                .get_privileges(&target_id, session.conn_id())
                // Some object types do not support privileges at all.
                .ok_or(AdapterError::Unsupported(
                    "GRANTs/REVOKEs on an object type with no privileges",
                ))?;

            for grantee in &grantees {
                self.catalog().ensure_not_system_role(grantee)?;
                self.catalog().ensure_not_predefined_role(grantee)?;
                let existing_privilege = privileges
                    .get_acl_item(grantee, &grantor)
                    .map(Cow::Borrowed)
                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));

                match variant {
                    UpdatePrivilegeVariant::Grant
                        if !existing_privilege.acl_mode.contains(acl_mode) =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    UpdatePrivilegeVariant::Revoke
                        if !existing_privilege
                            .acl_mode
                            .intersection(acl_mode)
                            .is_empty() =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    // The privilege is already in the desired state; nothing to do.
                    _ => {}
                }
            }
        }

        if ops.is_empty() {
            session.add_notices(warnings);
            return Ok(variant.into());
        }

        let res = self
            .catalog_transact(Some(session), ops)
            .await
            .map(|_| match variant {
                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
            });
        if res.is_ok() {
            session.add_notices(warnings);
        }
        res
    }

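    /// Sequences an `ALTER DEFAULT PRIVILEGES` statement, updating the default
    /// privilege entries for every (object, ACL item) combination in the plan.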
    #[instrument]
    pub(super) async fn sequence_alter_default_privileges(
        &mut self,
        session: &Session,
        plan::AlterDefaultPrivilegesPlan {
            privilege_objects,
            privilege_acl_items,
            is_grant,
        }: plan::AlterDefaultPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
        let variant = if is_grant {
            UpdatePrivilegeVariant::Grant
        } else {
            UpdatePrivilegeVariant::Revoke
        };
        for privilege_object in &privilege_objects {
            self.catalog()
                .ensure_not_system_role(&privilege_object.role_id)?;
            self.catalog()
                .ensure_not_predefined_role(&privilege_object.role_id)?;
            if let Some(database_id) = privilege_object.database_id {
                self.catalog()
                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
            }
            if let Some(schema_id) = privilege_object.schema_id {
                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
                let schema_spec: SchemaSpecifier = schema_id.into();

                self.catalog().ensure_not_reserved_object(
                    &(database_spec, schema_spec).into(),
                    session.conn_id(),
                )?;
            }
            for privilege_acl_item in &privilege_acl_items {
                self.catalog()
                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
                self.catalog()
                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
                ops.push(catalog::Op::UpdateDefaultPrivilege {
                    privilege_object: privilege_object.clone(),
                    privilege_acl_item: privilege_acl_item.clone(),
                    variant,
                });
            }
        }

        self.catalog_transact(Some(session), ops).await?;
        Ok(ExecuteResponse::AlteredDefaultPrivileges)
    }

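    /// Sequences a `GRANT` of one or more roles to one or more members, emitting a
    /// notice instead of an op for memberships that already exist.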
    #[instrument]
    pub(super) async fn sequence_grant_role(
        &mut self,
        session: &Session,
        plan::GrantRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::GrantRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // Reserved and non-grantable roles are rejected even when the
                    // membership already exists.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::GrantRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        if ops.is_empty() {
            return Ok(ExecuteResponse::GrantedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::GrantedRole)
    }

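    /// Sequences a `REVOKE` of one or more roles from one or more members, emitting a
    /// notice instead of an op for memberships that do not exist.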
    #[instrument]
    pub(super) async fn sequence_revoke_role(
        &mut self,
        session: &Session,
        plan::RevokeRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::RevokeRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if !member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    // Reserved and non-grantable roles are rejected even when the
                    // membership is already absent.
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::RevokeRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        if ops.is_empty() {
            return Ok(ExecuteResponse::RevokedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::RevokedRole)
    }

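    /// Sequences an `ALTER ... OWNER TO` statement. Ownership changes cascade to
    /// dependent objects whose ownership must track the primary object, such as a
    /// relation's indexes and a managed cluster's replicas.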
    #[instrument]
    pub(super) async fn sequence_alter_owner(
        &mut self,
        session: &Session,
        plan::AlterOwnerPlan {
            id,
            object_type,
            new_owner,
        }: plan::AlterOwnerPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = vec![catalog::Op::UpdateOwner {
            id: id.clone(),
            new_owner,
        }];

        match &id {
            ObjectId::Item(global_id) => {
                let entry = self.catalog().get_entry(global_id);

                // Indexes cannot have their owner changed directly; they always
                // track their parent relation's owner.
                if entry.is_index() {
                    let name = self
                        .catalog()
                        .resolve_full_name(entry.name(), Some(session.conn_id()))
                        .to_string();
                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
                    return Ok(ExecuteResponse::AlteredObject(object_type));
                }

                // The ownership change cascades to any indexes on the object.
                let dependent_index_ops = entry
                    .used_by()
                    .into_iter()
                    .filter(|id| self.catalog().get_entry(id).is_index())
                    .map(|id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(*id),
                        new_owner,
                    });
                ops.extend(dependent_index_ops);

                // It also cascades to the source's progress collection, if any.
                let dependent_subsources =
                    entry
                        .progress_id()
                        .into_iter()
                        .map(|item_id| catalog::Op::UpdateOwner {
                            id: ObjectId::Item(item_id),
                            new_owner,
                        });
                ops.extend(dependent_subsources);
            }
            ObjectId::Cluster(cluster_id) => {
                let cluster = self.catalog().get_cluster(*cluster_id);
                // Changing a cluster's owner also changes the owner of its replicas.
                let managed_cluster_replica_ops =
                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
                        new_owner,
                    });
                ops.extend(managed_cluster_replica_ops);
            }
            _ => {}
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::AlteredObject(object_type))
    }

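    /// Sequences a `REASSIGN OWNED` statement, transferring ownership of every
    /// object owned by `old_roles` to `new_role`.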
    #[instrument]
    pub(super) async fn sequence_reassign_owned(
        &mut self,
        session: &Session,
        plan::ReassignOwnedPlan {
            old_roles,
            new_role,
            reassign_ids,
        }: plan::ReassignOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let ops = reassign_ids
            .into_iter()
            .map(|id| catalog::Op::UpdateOwner {
                id,
                new_owner: new_role,
            })
            .collect();

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::ReassignOwned)
    }

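    /// Executes the next queued DDL statement, if any. DDL that could not run
    /// immediately is serialized through this queue and resumed here.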
    #[instrument]
    pub(crate) async fn handle_deferred_statement(&mut self) {
        // If no DDL is queued, there is nothing to do.
        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
            return;
        };
        match ps {
            crate::coord::PlanStatement::Statement { stmt, params } => {
                self.handle_execute_inner(stmt, params, ctx).await;
            }
            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
                self.sequence_plan(ctx, plan, resolved_ids).await;
            }
        }
    }

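    /// Sequences an `ALTER TABLE ... ADD COLUMN` statement, allocating a new global
    /// ID for the new version of the table and recording the added column in the
    /// catalog.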
    #[instrument]
    #[allow(clippy::unused_async)]
    pub(super) async fn sequence_alter_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterTablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::AlterTablePlan {
            relation_id,
            column_name,
            column_type,
            raw_sql_type,
        } = plan;

        // Allocate a new global ID for the new version of the table.
        let id_ts = self.get_catalog_write_ts().await;
        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
        let ops = vec![catalog::Op::AlterAddColumn {
            id: relation_id,
            new_global_id,
            name: column_name,
            typ: column_type,
            sql: raw_sql_type,
        }];

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
    }

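    /// Begins applying a materialized view replacement. Rather than swapping the two
    /// views immediately, this installs a watch set that completes once the target
    /// view's write frontier has caught up to the replacement's upper; the catalog
    /// change then happens in
    /// [`Self::sequence_alter_materialized_view_apply_replacement_finish`].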
    #[instrument]
    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: AlterMaterializedViewApplyReplacementPlan,
    ) {
        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();

        // The watch set completes asynchronously, so remember enough state to
        // revalidate the plan when it fires. Tracking both the target and the
        // replacement ensures that dropping either invalidates the plan.
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([id, replacement_id]),
            None,
            None,
            ctx.session().role_metadata().clone(),
        );

        let target = self.catalog.get_entry(&id);
        let target_gid = target.latest_global_id();

        let replacement = self.catalog.get_entry(&replacement_id);
        let replacement_gid = replacement.latest_global_id();

        let target_upper = self
            .controller
            .storage_collections
            .collection_frontiers(target_gid)
            .expect("target MV exists")
            .write_frontier;
        let replacement_upper = self
            .controller
            .compute
            .collection_frontiers(replacement_gid, replacement.cluster_id())
            .expect("replacement MV exists")
            .write_frontier;

        info!(
            %id, %replacement_id, ?target_upper, ?replacement_upper,
            "preparing materialized view replacement application",
        );

        // An empty upper means the replacement has been sealed, so the target can
        // never catch up to it and the swap cannot be applied.
        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
                name: target.name().item.clone(),
            }));
            return;
        };

        // Watch sets fire once the tracked frontier is beyond the given timestamp,
        // so step back one tick to fire as soon as the target reaches the
        // replacement's upper.
        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);

        // Wait for the target MV to catch up to the replacement before swapping
        // the two in the catalog.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([target_gid]),
            replacement_upper_ts,
            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
                ctx: Some(ctx),
                otel_ctx: OpenTelemetryContext::obtain(),
                plan,
                plan_validity,
            }),
        )
        .expect("target collection exists");
    }

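    /// Completes a materialized view replacement once the watch set installed by
    /// [`Self::sequence_alter_materialized_view_apply_replacement_prepare`] has
    /// fired: re-checks plan validity, then applies the swap as a catalog
    /// transaction.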
4765 #[instrument]
4767 pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4768 &mut self,
4769 mut ctx: AlterMaterializedViewReadyContext,
4770 ) {
4771 ctx.otel_ctx.attach_as_parent();
4772
4773 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4774
4775 if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4780 ctx.retire(Err(err));
4781 return;
4782 }
4783
4784 info!(
4785 %id, %replacement_id,
4786 "finishing materialized view replacement application",
4787 );
4788
4789 let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4790 match self
4791 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4792 .await
4793 {
4794 Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4795 ObjectType::MaterializedView,
4796 ))),
4797 Err(err) => ctx.retire(Err(err)),
4798 }
4799 }
4800
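    /// Returns a statistics oracle for the given source collections at
    /// `query_as_of`, used during optimization to supply cardinality estimates.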
    pub(super) async fn statistics_oracle(
        &self,
        session: &Session,
        source_ids: &BTreeSet<GlobalId>,
        query_as_of: &Antichain<Timestamp>,
        is_oneshot: bool,
    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
        super::statistics_oracle(
            session,
            source_ids,
            query_as_of,
            is_oneshot,
            self.catalog().system_config(),
            self.controller.storage_collections.as_ref(),
        )
        .await
    }
}

impl Coordinator {
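    /// Processes the metainfo produced when optimizing a dataflow: emits optimizer
    /// notices to the session (when one is available), stores the rendered metainfo
    /// in the catalog, and, if notice persistence is enabled, returns the future that
    /// completes when the notices have been written to the builtin tables.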
    async fn process_dataflow_metainfo(
        &mut self,
        df_meta: DataflowMetainfo,
        export_id: GlobalId,
        ctx: Option<&mut ExecuteContext>,
        notice_ids: Vec<GlobalId>,
    ) -> Option<BuiltinTableAppendNotify> {
        // Emit raw optimizer notices to the session, when one is available.
        if let Some(ctx) = ctx {
            emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
        }

        // Render the notices into their catalog representation.
        let df_meta = self
            .catalog()
            .render_notices(df_meta, notice_ids, Some(export_id));

        // If notice persistence is enabled, also write the notices to the builtin
        // tables and hand back the future that completes when the write lands.
        if self.catalog().state().system_config().enable_mz_notices()
            && !df_meta.optimizer_notices.is_empty()
        {
            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
            self.catalog().state().pack_optimizer_notices(
                &mut builtin_table_updates,
                df_meta.optimizer_notices.iter(),
                Diff::ONE,
            );

            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            Some(
                self.builtin_table_update()
                    .execute(builtin_table_updates)
                    .await
                    .0,
            )
        } else {
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            None
        }
    }
}