1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::task::{self, JoinHandle, spawn};
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{assert_none, instrument};
36use mz_repr::adt::jsonb::Jsonb;
37use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
38use mz_repr::explain::ExprHumanizer;
39use mz_repr::explain::json::json_string;
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
43 RowIterator, Timestamp,
44};
45use mz_sql::ast::{
46 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
47 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
48 SqlServerConfigOptionName,
49};
50use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
51use mz_sql::catalog::{
52 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
53 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
54 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
55};
56use mz_sql::names::{
57 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
58 SchemaSpecifier, SystemObjectId,
59};
60use mz_sql::plan::{
61 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
62 StatementContext,
63};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use smallvec::SmallVec;
93use timely::progress::Antichain;
94use tokio::sync::{oneshot, watch};
95use tracing::{Instrument, Span, info, warn};
96
97use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
98use crate::command::{ExecuteResponse, Response};
99use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
100use crate::coord::sequencer::emit_optimizer_notices;
101use crate::coord::{
102 AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
103 Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
104 ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
105 PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
106 WatchSetResponse, validate_ip_with_policy_rules,
107};
108use crate::error::AdapterError;
109use crate::notice::{AdapterNotice, DroppedInUseIndex};
110use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
111use crate::optimize::{self, Optimize};
112use crate::session::{
113 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
114 WriteLocks, WriteOp,
115};
116use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
117use crate::{PeekResponseUnary, ReadHolds};
118
/// Future resolving to a timestamp for a query, or a storage error if one
/// cannot be determined. ("Rtr" presumably abbreviates "real-time recency" —
/// confirm against the callers that construct this future.)
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>;
121
// Submodules carrying the sequencing logic for specific statement kinds.
mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;
132
/// Unwraps the `Ok` value of `$expr`, or — on `Err` — retires `$ctx` with the
/// error (converted via `Into`) and returns from the enclosing function.
///
/// This is the sequencer's analogue of `?` for code paths that must report
/// failure through an `ExecuteContext`-like object rather than a `Result`.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

// Re-exported so sibling modules under `coord` can use the macro.
pub(super) use return_if_err;
145
/// Catalog operations plus session-impact bookkeeping produced while
/// sequencing a `DROP`.
struct DropOps {
    // Catalog operations to apply.
    ops: Vec<catalog::Op>,
    // Whether the session's active database was among the dropped objects.
    dropped_active_db: bool,
    // Whether the session's active cluster was among the dropped objects.
    dropped_active_cluster: bool,
    // Indexes that were dropped while still in use, for notice emission.
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
152
/// Intermediate output of [`Coordinator::create_source_inner`]: the catalog
/// ops to apply plus bookkeeping needed to finish `CREATE SOURCE` sequencing.
struct CreateSourceInner {
    // Catalog operations that create the sources (and record references).
    ops: Vec<catalog::Op>,
    // The in-memory `Source` objects, paired with their item IDs.
    sources: Vec<(CatalogItemId, Source)>,
    // IDs/names of plans marked `IF NOT EXISTS`, used to convert
    // "already exists" errors into notices.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
159
160impl Coordinator {
    /// Drives a staged (multi-step) statement to completion.
    ///
    /// Repeatedly advances `stage` until it yields a final response or hands
    /// work off to a spawned task. While a cancellable stage is running, a
    /// per-connection `watch` channel in `staged_cancellation` tracks whether
    /// the statement has been canceled.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        // Bail out early if the plan is no longer valid against the catalog.
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Install a fresh cancellation channel for this
                    // connection. If a previous channel existed and was
                    // already flipped to `true`, the statement was canceled
                    // before we got here: retire immediately.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // This stage cannot be canceled; drop any stale channel.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                // No session (internal execution): cancellation is moot.
                cancel_enabled = false
            };
            // Run the stage itself under the caller's span.
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                // Off-thread work whose result re-enters the coordinator as
                // an internal command message.
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                // Off-thread work whose result directly retires the context.
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                // Finished: retire with the response.
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                // Continue the loop with the next stage synchronously.
                StageResult::Immediate(stage) => *stage,
            }
        }
    }
226
227 fn handle_spawn<C, T, F>(
228 &self,
229 ctx: C,
230 handle: JoinHandle<Result<T, AdapterError>>,
231 cancel_enabled: bool,
232 f: F,
233 ) where
234 C: StagedContext + Send + 'static,
235 T: Send + 'static,
236 F: FnOnce(C, T) + Send + 'static,
237 {
238 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
239 .session()
240 .and_then(|session| self.staged_cancellation.get(session.conn_id()))
241 {
242 let mut rx = rx.clone();
243 Box::pin(async move {
244 let _ = rx.wait_for(|v| *v).await;
246 ()
247 })
248 } else {
249 Box::pin(future::pending())
250 };
251 spawn(|| "sequence_staged", async move {
252 tokio::select! {
253 res = handle => {
254 let next = return_if_err!(res, ctx);
255 f(ctx, next);
256 }
257 _ = rx, if cancel_enabled => {
258 ctx.retire(Err(AdapterError::Canceled));
259 }
260 }
261 });
262 }
263
    /// Builds the catalog ops and in-memory [`Source`] objects for a batch of
    /// `CREATE SOURCE` plan bundles, without applying anything.
    ///
    /// Validates cluster-replica restrictions, pre-reduces webhook validation
    /// expressions, and records available upstream references. Also collects
    /// the IDs of plans marked `IF NOT EXISTS` so the caller can turn
    /// "already exists" errors into notices.
    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        // Remember which plans were `IF NOT EXISTS`, keyed by item ID.
        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref desc)
                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            // Unless the dyncfg allows it, reject sources on
                            // clusters with more than one replica.
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    // The same multi-replica restriction applies to webhook
                    // sources.
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                            .get(self.catalog().system_config().dyncfgs());

                        if !enable_multi_replica_sources {
                            if cluster.replica_ids().len() > 1 {
                                return Err(AdapterError::Unsupported(
                                    "webhook sources in clusters with >1 replicas",
                                ));
                            }
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

            // Pre-reduce the webhook CHECK expression; if reduction fails,
            // count the failure in metrics and abort with an internal error.
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

            // If purification discovered upstream references, record them
            // alongside the source.
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            // Reference updates are ordered after the item creation.
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }
390
391 pub(crate) fn plan_subsource(
399 &self,
400 session: &Session,
401 params: &mz_sql::plan::Params,
402 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
403 item_id: CatalogItemId,
404 global_id: GlobalId,
405 ) -> Result<CreateSourcePlanBundle, AdapterError> {
406 let catalog = self.catalog().for_session(session);
407 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
408
409 let plan = self.plan_statement(
410 session,
411 Statement::CreateSubsource(subsource_stmt),
412 params,
413 &resolved_ids,
414 )?;
415 let plan = match plan {
416 Plan::CreateSource(plan) => plan,
417 _ => unreachable!(),
418 };
419 Ok(CreateSourcePlanBundle {
420 item_id,
421 global_id,
422 plan,
423 resolved_ids,
424 available_source_references: None,
425 })
426 }
427
    /// Builds an `ALTER SOURCE ... ADD SUBSOURCE` plan from purified
    /// subsource exports: plans a `CREATE SUBSOURCE` statement (with freshly
    /// allocated IDs) for each export and wraps them in an
    /// `AlterSourceAction::AddSubsourceExports`.
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        // Subsource statements are generated against a system-session
        // catalog with a zero plan context.
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        // Allocate one (item ID, global ID) pair per subsource statement.
        let ids = self
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip_eq(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }
480
481 pub(crate) fn plan_purified_alter_source_refresh_references(
483 &self,
484 _session: &Session,
485 _params: Params,
486 source_name: ResolvedItemName,
487 available_source_references: plan::SourceReferences,
488 ) -> Result<(Plan, ResolvedIds), AdapterError> {
489 let entry = self.catalog().get_entry(source_name.item_id());
490 let source = entry.source().ok_or_else(|| {
491 AdapterError::internal(
492 "plan alter source",
493 format!("expected Source found {entry:?}"),
494 )
495 })?;
496 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
497 references: available_source_references,
498 };
499
500 Ok((
501 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
502 item_id: entry.id(),
503 ingestion_id: source.global_id(),
504 action,
505 }),
506 ResolvedIds::empty(),
507 ))
508 }
509
    /// Assembles the full set of plans for a purified `CREATE SOURCE`:
    /// the optional progress subsource, the source itself, and one plan per
    /// purified subsource export.
    ///
    /// Returns a `Plan::CreateSources` containing all bundles, in that order.
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        // Capacity: subsources + the source + an optional progress subsource.
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        // Plan the progress subsource first and wire its name back into the
        // source statement as the `PROGRESS SUBSOURCE`.
        if let Some(progress_stmt) = progress_stmt {
            // Progress statements from purification must not already
            // reference a source.
            assert_none!(progress_stmt.of_source);
            let (item_id, global_id) = self.allocate_user_id().await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        // Select the source's WITH options that should propagate to its
        // subsources (currently only RETAIN HISTORY).
        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        // Plan the source statement itself.
        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
        };

        let (item_id, global_id) = self.allocate_user_id().await?;

        // Build the resolved name the subsources will use to refer back to
        // their parent source.
        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        // Subsource statements are generated against a system-session
        // catalog with a zero plan context.
        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        // Propagate the selected WITH options onto every subsource.
        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        // Allocate IDs for and plan each subsource.
        let ids = self
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }
622
    /// Sequences a batch of `CREATE SOURCE` plans: applies the catalog ops in
    /// a single DDL transaction, emits webhook-URL notices, and converts
    /// "already exists" errors into notices for `IF NOT EXISTS` plans.
    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
            .await;

        // NOTE(review): this loop runs before `transact_result` is inspected;
        // presumably `try_get_webhook_url` returns `None` when the
        // transaction did not apply — confirm.
        for (item_id, source) in &sources {
            if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
                if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
                    ctx.session()
                        .add_notice(AdapterNotice::WebhookSourceCreated { url });
                }
            }
        }

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            // `IF NOT EXISTS` collisions become notices rather than errors.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }
664
    /// Sequences `CREATE CONNECTION`.
    ///
    /// Allocates the connection's IDs and, for SSH connections, persists the
    /// key pair via the secrets controller. When `plan.validate` is set, the
    /// connection is validated on a background task that reports back through
    /// a `Message::CreateConnectionValidationReady`; otherwise the catalog
    /// write happens inline and the context is retired immediately.
    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let (connection_id, connection_gid) = match self.allocate_user_id().await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err)),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                // The PUBLIC KEY options must hold system-held key pairs;
                // reject explicitly specified values.
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                // Persist the key pair as a secret keyed by the connection ID.
                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

        if plan.validate {
            // Validate off the coordinator task; the outcome re-enters the
            // coordinator through `internal_cmd_tx`.
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                // Hand the validation result (and everything needed to finish
                // sequencing) back to the coordinator.
                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            // No validation requested: write the catalog entry inline.
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }
764
    /// Finishes `CREATE CONNECTION` by writing the connection to the catalog.
    ///
    /// Reached either directly (validation disabled) or from the
    /// `CreateConnectionValidationReady` handler after background validation.
    /// `IF NOT EXISTS` collisions become notices rather than errors.
    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

        let conn_id = ctx.session().conn_id().clone();
        let transact_result = self
            .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }
808
809 #[instrument]
810 pub(super) async fn sequence_create_database(
811 &mut self,
812 session: &Session,
813 plan: plan::CreateDatabasePlan,
814 ) -> Result<ExecuteResponse, AdapterError> {
815 let ops = vec![catalog::Op::CreateDatabase {
816 name: plan.name.clone(),
817 owner_id: *session.current_role_id(),
818 }];
819 match self.catalog_transact(Some(session), ops).await {
820 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
821 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
822 kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
823 })) if plan.if_not_exists => {
824 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
825 Ok(ExecuteResponse::CreatedDatabase)
826 }
827 Err(err) => Err(err),
828 }
829 }
830
831 #[instrument]
832 pub(super) async fn sequence_create_schema(
833 &mut self,
834 session: &Session,
835 plan: plan::CreateSchemaPlan,
836 ) -> Result<ExecuteResponse, AdapterError> {
837 let op = catalog::Op::CreateSchema {
838 database_id: plan.database_spec,
839 schema_name: plan.schema_name.clone(),
840 owner_id: *session.current_role_id(),
841 };
842 match self.catalog_transact(Some(session), vec![op]).await {
843 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
844 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
845 kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
846 })) if plan.if_not_exists => {
847 session.add_notice(AdapterNotice::SchemaAlreadyExists {
848 name: plan.schema_name,
849 });
850 Ok(ExecuteResponse::CreatedSchema)
851 }
852 Err(err) => Err(err),
853 }
854 }
855
856 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
858 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
859 if attributes.superuser.is_some()
860 || attributes.password.is_some()
861 || attributes.login.is_some()
862 {
863 return Err(AdapterError::UnavailableFeature {
864 feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
865 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
866 });
867 }
868 }
869 Ok(())
870 }
871
872 #[instrument]
873 pub(super) async fn sequence_create_role(
874 &mut self,
875 conn_id: Option<&ConnectionId>,
876 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
877 ) -> Result<ExecuteResponse, AdapterError> {
878 self.validate_role_attributes(&attributes.clone())?;
879 let op = catalog::Op::CreateRole { name, attributes };
880 self.catalog_transact_with_context(conn_id, None, vec![op])
881 .await
882 .map(|_| ExecuteResponse::CreatedRole)
883 }
884
885 #[instrument]
886 pub(super) async fn sequence_create_network_policy(
887 &mut self,
888 session: &Session,
889 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
890 ) -> Result<ExecuteResponse, AdapterError> {
891 let op = catalog::Op::CreateNetworkPolicy {
892 rules,
893 name,
894 owner_id: *session.current_role_id(),
895 };
896 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
897 .await
898 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
899 }
900
901 #[instrument]
902 pub(super) async fn sequence_alter_network_policy(
903 &mut self,
904 session: &Session,
905 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
906 ) -> Result<ExecuteResponse, AdapterError> {
907 let current_network_policy_name =
909 self.catalog().system_config().default_network_policy_name();
910 if current_network_policy_name == name {
912 self.validate_alter_network_policy(session, &rules)?;
913 }
914
915 let op = catalog::Op::AlterNetworkPolicy {
916 id,
917 rules,
918 name,
919 owner_id: *session.current_role_id(),
920 };
921 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
922 .await
923 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
924 }
925
926 #[instrument]
927 pub(super) async fn sequence_create_table(
928 &mut self,
929 ctx: &mut ExecuteContext,
930 plan: plan::CreateTablePlan,
931 resolved_ids: ResolvedIds,
932 ) -> Result<ExecuteResponse, AdapterError> {
933 let plan::CreateTablePlan {
934 name,
935 table,
936 if_not_exists,
937 } = plan;
938
939 let conn_id = if table.temporary {
940 Some(ctx.session().conn_id())
941 } else {
942 None
943 };
944 let (table_id, global_id) = self.allocate_user_id().await?;
945 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
946
947 let data_source = match table.data_source {
948 plan::TableDataSource::TableWrites { defaults } => {
949 TableDataSource::TableWrites { defaults }
950 }
951 plan::TableDataSource::DataSource {
952 desc: data_source_plan,
953 timeline,
954 } => match data_source_plan {
955 plan::DataSourceDesc::IngestionExport {
956 ingestion_id,
957 external_reference,
958 details,
959 data_config,
960 } => TableDataSource::DataSource {
961 desc: DataSourceDesc::IngestionExport {
962 ingestion_id,
963 external_reference,
964 details,
965 data_config,
966 },
967 timeline,
968 },
969 plan::DataSourceDesc::Webhook {
970 validate_using,
971 body_format,
972 headers,
973 cluster_id,
974 } => TableDataSource::DataSource {
975 desc: DataSourceDesc::Webhook {
976 validate_using,
977 body_format,
978 headers,
979 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
980 },
981 timeline,
982 },
983 o => {
984 unreachable!("CREATE TABLE data source got {:?}", o)
985 }
986 },
987 };
988
989 let is_webhook = if let TableDataSource::DataSource {
990 desc: DataSourceDesc::Webhook { .. },
991 timeline: _,
992 } = &data_source
993 {
994 true
995 } else {
996 false
997 };
998
999 let table = Table {
1000 create_sql: Some(table.create_sql),
1001 desc: table.desc,
1002 collections,
1003 conn_id: conn_id.cloned(),
1004 resolved_ids,
1005 custom_logical_compaction_window: table.compaction_window,
1006 is_retained_metrics_object: false,
1007 data_source,
1008 };
1009 let ops = vec![catalog::Op::CreateItem {
1010 id: table_id,
1011 name: name.clone(),
1012 item: CatalogItem::Table(table.clone()),
1013 owner_id: *ctx.session().current_role_id(),
1014 }];
1015
1016 let catalog_result = self
1017 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1018 .await;
1019
1020 if is_webhook {
1021 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1024 ctx.session()
1025 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1026 }
1027 }
1028
1029 match catalog_result {
1030 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1031 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1032 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1033 })) if if_not_exists => {
1034 ctx.session_mut()
1035 .add_notice(AdapterNotice::ObjectAlreadyExists {
1036 name: name.item,
1037 ty: "table",
1038 });
1039 Ok(ExecuteResponse::CreatedTable)
1040 }
1041 Err(err) => Err(err),
1042 }
1043 }
1044
    /// Sequences `CREATE SINK`: writes the sink to the catalog, then creates
    /// the storage export and installs the default read policy.
    ///
    /// `IF NOT EXISTS` collisions retire the context with a notice instead of
    /// an error.
    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
            commit_interval: sink.commit_interval,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                // `IF NOT EXISTS`: report a notice and retire successfully.
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        // The catalog write succeeded; creating the storage export must not
        // fail, so a failure here halts the process.
        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }
1113
1114 pub(super) fn validate_system_column_references(
1137 &self,
1138 uses_ambiguous_columns: bool,
1139 depends_on: &BTreeSet<GlobalId>,
1140 ) -> Result<(), AdapterError> {
1141 if uses_ambiguous_columns
1142 && depends_on
1143 .iter()
1144 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1145 {
1146 Err(AdapterError::AmbiguousSystemColumnReference)
1147 } else {
1148 Ok(())
1149 }
1150 }
1151
1152 #[instrument]
1153 pub(super) async fn sequence_create_type(
1154 &mut self,
1155 session: &Session,
1156 plan: plan::CreateTypePlan,
1157 resolved_ids: ResolvedIds,
1158 ) -> Result<ExecuteResponse, AdapterError> {
1159 let (item_id, global_id) = self.allocate_user_id().await?;
1160 plan.typ
1162 .inner
1163 .desc(&self.catalog().for_session(session))
1164 .map_err(AdapterError::from)?;
1165 let typ = Type {
1166 create_sql: Some(plan.typ.create_sql),
1167 global_id,
1168 details: CatalogTypeDetails {
1169 array_id: None,
1170 typ: plan.typ.inner,
1171 pg_metadata: None,
1172 },
1173 resolved_ids,
1174 };
1175 let op = catalog::Op::CreateItem {
1176 id: item_id,
1177 name: plan.name,
1178 item: CatalogItem::Type(typ),
1179 owner_id: *session.current_role_id(),
1180 };
1181 match self.catalog_transact(Some(session), vec![op]).await {
1182 Ok(()) => Ok(ExecuteResponse::CreatedType),
1183 Err(err) => Err(err),
1184 }
1185 }
1186
1187 #[instrument]
1188 pub(super) async fn sequence_comment_on(
1189 &mut self,
1190 session: &Session,
1191 plan: plan::CommentPlan,
1192 ) -> Result<ExecuteResponse, AdapterError> {
1193 let op = catalog::Op::Comment {
1194 object_id: plan.object_id,
1195 sub_component: plan.sub_component,
1196 comment: plan.comment,
1197 };
1198 self.catalog_transact(Some(session), vec![op]).await?;
1199 Ok(ExecuteResponse::Comment)
1200 }
1201
    /// Sequences a `DROP <object type> ...` statement.
    ///
    /// `drop_ids` is the full set of objects that will be dropped (including
    /// cascaded dependents); `referenced_ids` are the objects the user named
    /// directly. Anything in the difference is surfaced via a cascade notice.
    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        ctx: &mut ExecuteContext,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Collect human-readable descriptions of the objects being dropped
        // only because of CASCADE (i.e. not named in the statement).
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(ctx.session()),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            ctx.session()
                .add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        // Gather the global IDs whose cached optimized expressions must be
        // invalidated; this must happen before the catalog transaction
        // removes the entries.
        let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
            .iter()
            .filter_map(|id| match id {
                ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
                _ => None,
            })
            .flatten()
            .collect();

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(ctx.session(), drop_ids)?;

        self.catalog_transact_with_context(None, Some(ctx), ops)
            .await?;

        if !expr_cache_invalidate_ids.is_empty() {
            // NOTE(review): the returned future is dropped without being
            // awaited — this assumes `update_expression_cache` performs or
            // spawns its work eagerly; confirm that invalidation is
            // intentionally best-effort here.
            let _fut = self.catalog().update_expression_cache(
                Default::default(),
                Default::default(),
                expr_cache_invalidate_ids,
            );
        }

        fail::fail_point!("after_sequencer_drop_replica");

        // Emit notices for side effects the user may not have anticipated:
        // dropping the session's active database/cluster, or an in-use index.
        if dropped_active_db {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveDatabase {
                    name: ctx.session().vars().database().to_string(),
                });
        }
        if dropped_active_cluster {
            ctx.session()
                .add_notice(AdapterNotice::DroppedActiveCluster {
                    name: ctx.session().vars().cluster().to_string(),
                });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            ctx.session()
                .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }
1283
    /// Ensures that none of the roles in `dropped_roles` still own objects or
    /// appear as grantee/grantor of any privilege anywhere in the catalog.
    ///
    /// On failure returns `AdapterError::DependentObject` mapping each role
    /// name to the list of dependencies that block the drop.
    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        // Records, into `dependent_objects`, every privilege on `object_id`
        // whose grantee or grantor is one of the dropped roles.
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                // Dropped role is the grantee of this privilege.
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                // Dropped role is the grantor of this privilege.
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        // Check every catalog item for ownership or privileges held by a
        // dropped role.
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        // Check databases, and their schemas, the same way.
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        // Check clusters and their replicas (replicas only have owners, not
        // their own privilege maps).
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
            }
        }
        // System-wide privileges.
        privilege_check(
            self.catalog().system_privileges(),
            dropped_roles,
            &mut dependent_objects,
            &SystemObjectId::System,
            &catalog,
        );
        // Default privileges: a dropped role may be the creator of a default
        // privilege object or the grantee of one of its ACL items.
        for (default_privilege_object, default_privilege_acl_items) in
            self.catalog.default_privileges()
        {
            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!(
                        "default privileges on {}S created by {}",
                        default_privilege_object.object_type, role_name
                    ));
            }
            for default_privilege_acl_item in default_privilege_acl_items {
                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "default privileges on {}S granted to {}",
                            default_privilege_object.object_type, role_name
                        ));
                }
            }
        }

        if !dependent_objects.is_empty() {
            Err(AdapterError::DependentObject(dependent_objects))
        } else {
            Ok(())
        }
    }
1447
    /// Sequences a `DROP OWNED BY ...` statement: revokes the roles'
    /// privileges and default privileges and drops the objects they own, all
    /// in one catalog transaction.
    #[instrument]
    pub(super) async fn sequence_drop_owned(
        &mut self,
        session: &Session,
        plan: plan::DropOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Reserved (system) roles can never be targets of DROP OWNED.
        for role_id in &plan.role_ids {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let mut privilege_revokes = plan.privilege_revokes;

        // With RBAC on (and a non-superuser), the current role may only
        // revoke privileges whose grantor is in its role membership; the
        // rest are removed from the revoke set and reported as notices.
        let session_catalog = self.catalog().for_session(session);
        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
            && !session.is_superuser()
        {
            let role_membership =
                session_catalog.collect_role_membership(session.current_role_id());
            let invalid_revokes: BTreeSet<_> = privilege_revokes
                .extract_if(.., |(_, privilege)| {
                    !role_membership.contains(&privilege.grantor)
                })
                .map(|(object_id, _)| object_id)
                .collect();
            for invalid_revoke in invalid_revokes {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
                session.add_notice(AdapterNotice::CannotRevoke { object_description });
            }
        }

        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
            catalog::Op::UpdatePrivilege {
                target_id: object_id,
                privilege,
                variant: UpdatePrivilegeVariant::Revoke,
            }
        });
        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant: UpdatePrivilegeVariant::Revoke,
            },
        );
        // Build the drop ops (plus notice flags) shared with plain DROP.
        let DropOps {
            ops: drop_ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(session, plan.drop_ids)?;

        // Revokes are ordered before the drops in the transaction.
        let ops = privilege_revoke_ops
            .chain(default_privilege_revoke_ops)
            .chain(drop_ops.into_iter())
            .collect();

        self.catalog_transact(Some(session), ops).await?;

        // Warn about side effects on the current session.
        if dropped_active_db {
            session.add_notice(AdapterNotice::DroppedActiveDatabase {
                name: session.vars().database().to_string(),
            });
        }
        if dropped_active_cluster {
            session.add_notice(AdapterNotice::DroppedActiveCluster {
                name: session.vars().cluster().to_string(),
            });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
        }
        Ok(ExecuteResponse::DroppedOwned)
    }
1524
    /// Shared preparation for the `DROP` variants: analyzes `ids` and builds
    /// the full set of catalog ops (role revokes, default-privilege revokes,
    /// then the object drops themselves), plus the flags the caller uses to
    /// emit notices about dropped active databases/clusters and in-use
    /// indexes.
    fn sequence_drop_common(
        &self,
        session: &Session,
        ids: Vec<ObjectId>,
    ) -> Result<DropOps, AdapterError> {
        let mut dropped_active_db = false;
        let mut dropped_active_cluster = false;
        let mut dropped_in_use_indexes = Vec::new();
        let mut dropped_roles = BTreeMap::new();
        let mut dropped_databases = BTreeSet::new();
        let mut dropped_schemas = BTreeSet::new();
        let mut role_revokes = BTreeSet::new();
        let mut default_privilege_revokes = BTreeSet::new();

        let mut clusters_to_drop = BTreeSet::new();

        // First pass: classify each dropped object and gather per-kind info.
        let ids_set = ids.iter().collect::<BTreeSet<_>>();
        for id in &ids {
            match id {
                ObjectId::Database(id) => {
                    let name = self.catalog().get_database(id).name();
                    // Dropping the session's current database warrants a notice.
                    if name == session.vars().database() {
                        dropped_active_db = true;
                    }
                    dropped_databases.insert(id);
                }
                ObjectId::Schema((_, spec)) => {
                    if let SchemaSpecifier::Id(id) = spec {
                        dropped_schemas.insert(id);
                    }
                }
                ObjectId::Cluster(id) => {
                    clusters_to_drop.insert(*id);
                    if let Some(active_id) = self
                        .catalog()
                        .active_cluster(session)
                        .ok()
                        .map(|cluster| cluster.id())
                    {
                        if id == &active_id {
                            dropped_active_cluster = true;
                        }
                    }
                }
                ObjectId::Role(id) => {
                    let role = self.catalog().get_role(id);
                    let name = role.name();
                    dropped_roles.insert(*id, name);
                    // Revoke the dropped role's own memberships.
                    for (group_id, grantor_id) in &role.membership.map {
                        role_revokes.insert((*group_id, *id, *grantor_id));
                    }
                }
                ObjectId::Item(id) => {
                    // For indexes, find dependants that will keep running
                    // after the drop (not transient, not themselves dropped)
                    // so the caller can warn about dropping an in-use index.
                    if let Some(index) = self.catalog().get_entry(id).index() {
                        let humanizer = self.catalog().for_session(session);
                        let dependants = self
                            .controller
                            .compute
                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
                            .ok()
                            .into_iter()
                            .flatten()
                            .filter(|dependant_id| {
                                if dependant_id.is_transient() {
                                    return false;
                                }
                                let Some(dependent_id) = humanizer
                                    .try_get_item_by_global_id(dependant_id)
                                    .map(|item| item.id())
                                else {
                                    return false;
                                };
                                !ids_set.contains(&ObjectId::Item(dependent_id))
                            })
                            .flat_map(|dependant_id| {
                                humanizer.humanize_id(dependant_id)
                            })
                            .collect_vec();
                        if !dependants.is_empty() {
                            dropped_in_use_indexes.push(DroppedInUseIndex {
                                index_name: humanizer
                                    .humanize_id(index.global_id())
                                    .unwrap_or_else(|| id.to_string()),
                                dependant_objects: dependants,
                            });
                        }
                    }
                }
                _ => {}
            }
        }

        // Second pass: a replica of a managed cluster may only be dropped on
        // its own if it is internal, or if its whole cluster is going away.
        for id in &ids {
            match id {
                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                    if !clusters_to_drop.contains(cluster_id) {
                        let cluster = self.catalog.get_cluster(*cluster_id);
                        if cluster.is_managed() {
                            let replica =
                                cluster.replica(*replica_id).expect("Catalog out of sync");
                            if !replica.config.location.internal() {
                                coord_bail!("cannot drop replica of managed cluster");
                            }
                        }
                    }
                }
                _ => {}
            }
        }

        for role_id in dropped_roles.keys() {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }
        self.validate_dropped_role_ownership(session, &dropped_roles)?;
        // Also revoke memberships where a dropped role is the member of some
        // surviving user role.
        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
        for role in self.catalog().user_roles() {
            for dropped_role_id in
                dropped_role_ids.intersection(&role.membership.map.keys().collect())
            {
                role_revokes.insert((
                    **dropped_role_id,
                    role.id(),
                    *role
                        .membership
                        .map
                        .get(*dropped_role_id)
                        .expect("included in keys above"),
                ));
            }
        }

        // Default privileges scoped to a dropped database or schema must be
        // revoked along with it.
        for (default_privilege_object, default_privilege_acls) in
            self.catalog().default_privileges()
        {
            if matches!(
                &default_privilege_object.database_id,
                Some(database_id) if dropped_databases.contains(database_id),
            ) || matches!(
                &default_privilege_object.schema_id,
                Some(schema_id) if dropped_schemas.contains(schema_id),
            ) {
                for default_privilege_acl in default_privilege_acls {
                    default_privilege_revokes.insert((
                        default_privilege_object.clone(),
                        default_privilege_acl.clone(),
                    ));
                }
            }
        }

        // Op order: role revokes, then default-privilege revokes, then one
        // DropObjects op covering everything.
        let ops = role_revokes
            .into_iter()
            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            })
            .chain(default_privilege_revokes.into_iter().map(
                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                    privilege_object,
                    privilege_acl_item,
                    variant: UpdatePrivilegeVariant::Revoke,
                },
            ))
            .chain(iter::once(catalog::Op::DropObjects(
                ids.into_iter()
                    .map(DropObjectInfo::manual_drop_from_object_id)
                    .collect(),
            )))
            .collect();

        Ok(DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        })
    }
1726
1727 pub(super) fn sequence_explain_schema(
1728 &self,
1729 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1730 ) -> Result<ExecuteResponse, AdapterError> {
1731 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1732 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1733 })?;
1734
1735 let json_string = json_string(&json_value);
1736 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1737 Ok(Self::send_immediate_rows(row))
1738 }
1739
1740 pub(super) fn sequence_show_all_variables(
1741 &self,
1742 session: &Session,
1743 ) -> Result<ExecuteResponse, AdapterError> {
1744 let mut rows = viewable_variables(self.catalog().state(), session)
1745 .map(|v| (v.name(), v.value(), v.description()))
1746 .collect::<Vec<_>>();
1747 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1748
1749 let rows: Vec<_> = rows
1751 .into_iter()
1752 .map(|(name, val, desc)| {
1753 Row::pack_slice(&[
1754 Datum::String(name),
1755 Datum::String(&val),
1756 Datum::String(desc),
1757 ])
1758 })
1759 .collect();
1760 Ok(Self::send_immediate_rows(rows))
1761 }
1762
1763 pub(super) fn sequence_show_variable(
1764 &self,
1765 session: &Session,
1766 plan: plan::ShowVariablePlan,
1767 ) -> Result<ExecuteResponse, AdapterError> {
1768 if &plan.name == SCHEMA_ALIAS {
1769 let schemas = self.catalog.resolve_search_path(session);
1770 let schema = schemas.first();
1771 return match schema {
1772 Some((database_spec, schema_spec)) => {
1773 let schema_name = &self
1774 .catalog
1775 .get_schema(database_spec, schema_spec, session.conn_id())
1776 .name()
1777 .schema;
1778 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1779 Ok(Self::send_immediate_rows(row))
1780 }
1781 None => {
1782 if session.vars().current_object_missing_warnings() {
1783 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1784 search_path: session
1785 .vars()
1786 .search_path()
1787 .into_iter()
1788 .map(|schema| schema.to_string())
1789 .collect(),
1790 });
1791 }
1792 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1793 }
1794 };
1795 }
1796
1797 let variable = session
1798 .vars()
1799 .get(self.catalog().system_config(), &plan.name)
1800 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1801
1802 variable.visible(session.user(), self.catalog().system_config())?;
1805
1806 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1807 if variable.name() == vars::DATABASE.name()
1808 && matches!(
1809 self.catalog().resolve_database(&variable.value()),
1810 Err(CatalogError::UnknownDatabase(_))
1811 )
1812 && session.vars().current_object_missing_warnings()
1813 {
1814 let name = variable.value();
1815 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1816 } else if variable.name() == vars::CLUSTER.name()
1817 && matches!(
1818 self.catalog().resolve_cluster(&variable.value()),
1819 Err(CatalogError::UnknownCluster(_))
1820 )
1821 && session.vars().current_object_missing_warnings()
1822 {
1823 let name = variable.value();
1824 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1825 }
1826 Ok(Self::send_immediate_rows(row))
1827 }
1828
1829 #[instrument]
1830 pub(super) async fn sequence_inspect_shard(
1831 &self,
1832 session: &Session,
1833 plan: plan::InspectShardPlan,
1834 ) -> Result<ExecuteResponse, AdapterError> {
1835 if !session.user().is_internal() {
1838 return Err(AdapterError::Unauthorized(
1839 rbac::UnauthorizedError::MzSystem {
1840 action: "inspect".into(),
1841 },
1842 ));
1843 }
1844 let state = self
1845 .controller
1846 .storage
1847 .inspect_persist_state(plan.id)
1848 .await?;
1849 let jsonb = Jsonb::from_serde_json(state)?;
1850 Ok(Self::send_immediate_rows(jsonb.into_row()))
1851 }
1852
    /// Sequences `SET [LOCAL] <variable> = <value>` (or `SET ... = DEFAULT`,
    /// which resets), emitting deprecation/validity notices as appropriate.
    #[instrument]
    pub(super) fn sequence_set_variable(
        &self,
        session: &mut Session,
        plan: plan::SetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (name, local) = (plan.name, plan.local);
        // Isolation level and cluster may not change once the current
        // transaction contains operations.
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }

        let vars = session.vars_mut();
        // `DEFAULT` maps to a reset; explicit values go through `set`.
        let values = match plan.value {
            plan::VariableValue::Default => None,
            plan::VariableValue::Values(values) => Some(values),
        };

        match values {
            Some(values) => {
                vars.set(
                    self.catalog().system_config(),
                    &name,
                    VarInput::SqlSet(&values),
                    local,
                )?;

                let vars = session.vars();

                // Deprecation notices for legacy variable usage.
                // NOTE(review): `values[0]` assumes a SqlSet value list is
                // never empty — confirm upstream planning guarantees this.
                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if name == vars::CLUSTER.name()
                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
                {
                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
                }

                // Warn when the new database/cluster does not actually exist,
                // or when an unimplemented isolation level was requested.
                if name.as_str() == vars::DATABASE.name()
                    && matches!(
                        self.catalog().resolve_database(vars.database()),
                        Err(CatalogError::UnknownDatabase(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.database().to_string();
                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
                } else if name.as_str() == vars::CLUSTER.name()
                    && matches!(
                        self.catalog().resolve_cluster(vars.cluster()),
                        Err(CatalogError::UnknownCluster(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.cluster().to_string();
                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
                    let v = values.into_first().to_lowercase();
                    if v == IsolationLevel::ReadUncommitted.as_str()
                        || v == IsolationLevel::ReadCommitted.as_str()
                        || v == IsolationLevel::RepeatableRead.as_str()
                    {
                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
                            isolation_level: v,
                        });
                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
                        session.add_notice(AdapterNotice::StrongSessionSerializable);
                    }
                }
            }
            None => vars.reset(self.catalog().system_config(), &name, local)?,
        }

        Ok(ExecuteResponse::SetVariable { name, reset: false })
    }
1932
1933 pub(super) fn sequence_reset_variable(
1934 &self,
1935 session: &mut Session,
1936 plan: plan::ResetVariablePlan,
1937 ) -> Result<ExecuteResponse, AdapterError> {
1938 let name = plan.name;
1939 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1940 self.validate_set_isolation_level(session)?;
1941 }
1942 if &name == vars::CLUSTER.name() {
1943 self.validate_set_cluster(session)?;
1944 }
1945 session
1946 .vars_mut()
1947 .reset(self.catalog().system_config(), &name, false)?;
1948 Ok(ExecuteResponse::SetVariable { name, reset: true })
1949 }
1950
1951 pub(super) fn sequence_set_transaction(
1952 &self,
1953 session: &mut Session,
1954 plan: plan::SetTransactionPlan,
1955 ) -> Result<ExecuteResponse, AdapterError> {
1956 for mode in plan.modes {
1958 match mode {
1959 TransactionMode::AccessMode(_) => {
1960 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1961 }
1962 TransactionMode::IsolationLevel(isolation_level) => {
1963 self.validate_set_isolation_level(session)?;
1964
1965 session.vars_mut().set(
1966 self.catalog().system_config(),
1967 TRANSACTION_ISOLATION_VAR_NAME,
1968 VarInput::Flat(&isolation_level.to_ast_string_stable()),
1969 plan.local,
1970 )?
1971 }
1972 }
1973 }
1974 Ok(ExecuteResponse::SetVariable {
1975 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1976 reset: false,
1977 })
1978 }
1979
1980 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1981 if session.transaction().contains_ops() {
1982 Err(AdapterError::InvalidSetIsolationLevel)
1983 } else {
1984 Ok(())
1985 }
1986 }
1987
1988 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1989 if session.transaction().contains_ops() {
1990 Err(AdapterError::InvalidSetCluster)
1991 } else {
1992 Ok(())
1993 }
1994 }
1995
    /// Sequences `COMMIT`/`ROLLBACK`, routing the transaction's accumulated
    /// ops to the appropriate machinery (group-commit for writes, the strict
    /// serializable read queue, or single-statement execution) and retiring
    /// `ctx` on every path.
    #[instrument]
    pub(super) async fn sequence_end_transaction(
        &mut self,
        mut ctx: ExecuteContext,
        mut action: EndTransactionAction,
    ) {
        // A COMMIT of a failed transaction is downgraded to a rollback.
        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
            (&action, ctx.session().transaction())
        {
            action = EndTransactionAction::Rollback;
        }
        // Provisional success response; may be replaced by an error below.
        let response = match action {
            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
                params: BTreeMap::new(),
            }),
            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
                params: BTreeMap::new(),
            }),
        };

        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;

        let (response, action) = match result {
            // Empty write set: nothing to submit, finish immediately.
            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
                (response, action)
            }
            // Non-empty writes: validate the held write locks, coalesce rows
            // per table, and hand off to group commit (which retires `ctx`).
            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
                let validated_locks = match write_lock_guards {
                    None => None,
                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
                        Ok(locks) => Some(locks),
                        Err(missing) => {
                            tracing::error!(?missing, "programming error, missing write locks");
                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
                        }
                    },
                };

                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
                for WriteOp { id, rows } in writes {
                    let total_rows = collected_writes.entry(id).or_default();
                    total_rows.push(rows);
                }

                self.submit_write(PendingWriteTxn::User {
                    span: Span::current(),
                    writes: collected_writes,
                    write_locks: validated_locks,
                    pending_txn: PendingTxn {
                        ctx,
                        response,
                        action,
                    },
                });
                return;
            }
            // Linearization-requiring reads under strict serializable go
            // through the strict serializable read queue, which completes the
            // transaction asynchronously.
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrictSerializable =>
            {
                let conn_id = ctx.session().conn_id().clone();
                let pending_read_txn = PendingReadTxn {
                    txn: PendingRead::Read {
                        txn: PendingTxn {
                            ctx,
                            response,
                            action,
                        },
                    },
                    timestamp_context: determination.timestamp_context,
                    created: Instant::now(),
                    num_requeues: 0,
                    otel_ctx: OpenTelemetryContext::obtain(),
                };
                self.strict_serializable_reads_tx
                    .send((conn_id, pending_read_txn))
                    .expect("sending to strict_serializable_reads_tx cannot fail");
                return;
            }
            // Under strong session serializable, record the read timestamp in
            // the session's per-timeline oracle instead of linearizing.
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrongSessionSerializable =>
            {
                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
                    ctx.session_mut()
                        .ensure_timestamp_oracle(timeline.clone())
                        .apply_write(*ts);
                }
                (response, action)
            }
            // Deferred single-statement transactions are re-dispatched as an
            // internal command (which takes over `ctx`).
            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
                self.internal_cmd_tx
                    .send(Message::ExecuteSingleStatementTransaction {
                        ctx,
                        otel_ctx: OpenTelemetryContext::obtain(),
                        stmt,
                        params,
                    })
                    .expect("must send");
                return;
            }
            Ok((_, _)) => (response, action),
            Err(err) => (Err(err), EndTransactionAction::Rollback),
        };
        // Apply deferred session-variable changes and report them to the
        // client alongside the commit/rollback result.
        let changed = ctx.session_mut().vars_mut().end_transaction(action);
        let response = response.map(|mut r| {
            r.extend_params(changed);
            ExecuteResponse::from(r)
        });

        ctx.retire(response);
    }
2125
    /// Clears the session's transaction and, on commit, validates and applies
    /// its ops. Returns the ops (and any write lock guards) for the caller to
    /// finish, or `(None, None)` for rollbacks and empty transactions.
    #[instrument]
    async fn sequence_end_transaction_inner(
        &mut self,
        ctx: &mut ExecuteContext,
        action: EndTransactionAction,
    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
        let txn = self.clear_transaction(ctx.session_mut()).await;

        if let EndTransactionAction::Commit = action {
            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
                match &mut ops {
                    TransactionOps::Writes(writes) => {
                        // Every write target must still exist in the catalog;
                        // otherwise commit fails with UnknownItem.
                        for WriteOp { id, .. } in &mut writes.iter() {
                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
                                AdapterError::Catalog(mz_catalog::memory::error::Error {
                                    kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
                                })
                            })?;
                        }

                        // Drop no-op writes so only real row changes are
                        // submitted downstream.
                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
                    }
                    TransactionOps::DDL {
                        ops,
                        state: _,
                        side_effects,
                        revision,
                        snapshot: _,
                    } => {
                        // The DDL was planned against a specific catalog
                        // revision; a mismatch means a concurrent DDL won the
                        // race and this transaction must abort.
                        if *revision != self.catalog().transient_revision() {
                            return Err(AdapterError::DDLTransactionRace);
                        }
                        // Take ownership of the accumulated ops/side effects
                        // (leaving empty defaults behind) and commit them,
                        // running each side effect after the transaction.
                        let ops = std::mem::take(ops);
                        let side_effects = std::mem::take(side_effects);
                        self.catalog_transact_with_side_effects(
                            Some(ctx),
                            ops,
                            move |a, mut ctx| {
                                Box::pin(async move {
                                    for side_effect in side_effects {
                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
                                    }
                                })
                            },
                        )
                        .await?;
                    }
                    _ => (),
                }
                return Ok((Some(ops), write_lock_guards));
            }
        }

        Ok((None, None))
    }
2185
2186 pub(super) async fn sequence_side_effecting_func(
2187 &mut self,
2188 ctx: ExecuteContext,
2189 plan: SideEffectingFunc,
2190 ) {
2191 match plan {
2192 SideEffectingFunc::PgCancelBackend { connection_id } => {
2193 if ctx.session().conn_id().unhandled() == connection_id {
2194 ctx.retire(Err(AdapterError::Canceled));
2198 return;
2199 }
2200
2201 let res = if let Some((id_handle, _conn_meta)) =
2202 self.active_conns.get_key_value(&connection_id)
2203 {
2204 self.handle_privileged_cancel(id_handle.clone()).await;
2206 Datum::True
2207 } else {
2208 Datum::False
2209 };
2210 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2211 }
2212 }
2213 }
2214
2215 pub(crate) async fn execute_side_effecting_func(
2224 &mut self,
2225 plan: SideEffectingFunc,
2226 conn_id: ConnectionId,
2227 current_role: RoleId,
2228 ) -> Result<ExecuteResponse, AdapterError> {
2229 match plan {
2230 SideEffectingFunc::PgCancelBackend { connection_id } => {
2231 if conn_id.unhandled() == connection_id {
2232 return Err(AdapterError::Canceled);
2236 }
2237
2238 if let Some((_id_handle, conn_meta)) =
2241 self.active_conns.get_key_value(&connection_id)
2242 {
2243 let target_role = *conn_meta.authenticated_role_id();
2244 let role_membership = self
2245 .catalog()
2246 .state()
2247 .collect_role_membership(¤t_role);
2248 if !role_membership.contains(&target_role) {
2249 let target_role_name = self
2250 .catalog()
2251 .try_get_role(&target_role)
2252 .map(|role| role.name().to_string())
2253 .unwrap_or_else(|| target_role.to_string());
2254 return Err(AdapterError::Unauthorized(
2255 rbac::UnauthorizedError::RoleMembership {
2256 role_names: vec![target_role_name],
2257 },
2258 ));
2259 }
2260
2261 let id_handle = self
2263 .active_conns
2264 .get_key_value(&connection_id)
2265 .map(|(id, _)| id.clone())
2266 .expect("checked above");
2267 self.handle_privileged_cancel(id_handle).await;
2268 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2269 } else {
2270 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2272 }
2273 }
2274 }
2275 }
2276
2277 pub(crate) async fn determine_real_time_recent_timestamp(
2281 &self,
2282 source_ids: impl Iterator<Item = GlobalId>,
2283 real_time_recency_timeout: Duration,
2284 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2285 let item_ids = source_ids
2286 .map(|gid| {
2287 self.catalog
2288 .try_resolve_item_id(&gid)
2289 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2290 })
2291 .collect::<Result<Vec<_>, _>>()?;
2292
2293 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2299 if to_visit.is_empty() {
2302 return Ok(None);
2303 }
2304
2305 let mut timestamp_objects = BTreeSet::new();
2306
2307 while let Some(id) = to_visit.pop_front() {
2308 timestamp_objects.insert(id);
2309 to_visit.extend(
2310 self.catalog()
2311 .get_entry(&id)
2312 .uses()
2313 .into_iter()
2314 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2315 );
2316 }
2317 let timestamp_objects = timestamp_objects
2318 .into_iter()
2319 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2320 .collect();
2321
2322 let r = self
2323 .controller
2324 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2325 .await?;
2326
2327 Ok(Some(r))
2328 }
2329
2330 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2333 &self,
2334 session: &Session,
2335 source_ids: impl Iterator<Item = GlobalId>,
2336 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2337 let vars = session.vars();
2338
2339 if vars.real_time_recency()
2340 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2341 && !session.contains_read_timestamp()
2342 {
2343 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2344 .await
2345 } else {
2346 Ok(None)
2347 }
2348 }
2349
2350 #[instrument]
2351 pub(super) async fn sequence_explain_plan(
2352 &mut self,
2353 ctx: ExecuteContext,
2354 plan: plan::ExplainPlanPlan,
2355 target_cluster: TargetCluster,
2356 ) {
2357 match &plan.explainee {
2358 plan::Explainee::Statement(stmt) => match stmt {
2359 plan::ExplaineeStatement::CreateView { .. } => {
2360 self.explain_create_view(ctx, plan).await;
2361 }
2362 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2363 self.explain_create_materialized_view(ctx, plan).await;
2364 }
2365 plan::ExplaineeStatement::CreateIndex { .. } => {
2366 self.explain_create_index(ctx, plan).await;
2367 }
2368 plan::ExplaineeStatement::Select { .. } => {
2369 self.explain_peek(ctx, plan, target_cluster).await;
2370 }
2371 plan::ExplaineeStatement::Subscribe { .. } => {
2372 self.explain_subscribe(ctx, plan, target_cluster).await;
2373 }
2374 },
2375 plan::Explainee::View(_) => {
2376 let result = self.explain_view(&ctx, plan);
2377 ctx.retire(result);
2378 }
2379 plan::Explainee::MaterializedView(_) => {
2380 let result = self.explain_materialized_view(&ctx, plan);
2381 ctx.retire(result);
2382 }
2383 plan::Explainee::Index(_) => {
2384 let result = self.explain_index(&ctx, plan);
2385 ctx.retire(result);
2386 }
2387 plan::Explainee::ReplanView(_) => {
2388 self.explain_replan_view(ctx, plan).await;
2389 }
2390 plan::Explainee::ReplanMaterializedView(_) => {
2391 self.explain_replan_materialized_view(ctx, plan).await;
2392 }
2393 plan::Explainee::ReplanIndex(_) => {
2394 self.explain_replan_index(ctx, plan).await;
2395 }
2396 };
2397 }
2398
2399 pub(super) async fn sequence_explain_pushdown(
2400 &mut self,
2401 ctx: ExecuteContext,
2402 plan: plan::ExplainPushdownPlan,
2403 target_cluster: TargetCluster,
2404 ) {
2405 match plan.explainee {
2406 Explainee::Statement(ExplaineeStatement::Select {
2407 broken: false,
2408 plan,
2409 desc: _,
2410 }) => {
2411 let stage = return_if_err!(
2412 self.peek_validate(
2413 ctx.session(),
2414 plan,
2415 target_cluster,
2416 None,
2417 ExplainContext::Pushdown,
2418 Some(ctx.session().vars().max_query_result_size()),
2419 ),
2420 ctx
2421 );
2422 self.sequence_staged(ctx, Span::current(), stage).await;
2423 }
2424 Explainee::MaterializedView(item_id) => {
2425 self.explain_pushdown_materialized_view(ctx, item_id).await;
2426 }
2427 _ => {
2428 ctx.retire(Err(AdapterError::Unsupported(
2429 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2430 )));
2431 }
2432 };
2433 }
2434
2435 async fn execute_explain_pushdown_with_read_holds(
2437 &self,
2438 ctx: ExecuteContext,
2439 as_of: Antichain<Timestamp>,
2440 mz_now: ResultSpec<'static>,
2441 read_holds: Option<ReadHolds<Timestamp>>,
2442 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2443 ) {
2444 let fut = self
2445 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2446 .await;
2447 task::spawn(|| "render explain pushdown", async move {
2448 let _read_holds = read_holds;
2450 let res = fut.await;
2451 ctx.retire(res);
2452 });
2453 }
2454
    /// Builds the future that computes the `EXPLAIN FILTER PUSHDOWN` response
    /// for the given `imports` at time `as_of`.
    ///
    /// The `use<I>` precise-capture bound keeps the borrows of `self` and
    /// `session` out of the returned future's type, so the future can be
    /// polled after those borrows end (e.g. on a spawned task, as in
    /// `execute_explain_pushdown_with_read_holds`).
    async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
        &self,
        session: &Session,
        as_of: Antichain<Timestamp>,
        mz_now: ResultSpec<'static>,
        imports: I,
    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
        // Thin delegation: the shared implementation lives in the parent
        // module and takes the catalog and storage-collections handles.
        super::explain_pushdown_future_inner(
            session,
            &self.catalog,
            &self.controller.storage_collections,
            as_of,
            mz_now,
            imports,
        )
        .await
    }
2474
2475 #[instrument]
2476 pub(super) async fn sequence_insert(
2477 &mut self,
2478 mut ctx: ExecuteContext,
2479 plan: plan::InsertPlan,
2480 ) {
2481 if !ctx.session_mut().transaction().allows_writes() {
2489 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2490 return;
2491 }
2492
2493 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2507 let expr = return_if_err!(
2510 plan.values
2511 .clone()
2512 .lower(self.catalog().system_config(), None),
2513 ctx
2514 );
2515 OptimizedMirRelationExpr(expr)
2516 } else {
2517 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2519
2520 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2522
2523 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2525 };
2526
2527 match optimized_mir.into_inner() {
2528 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2529 let catalog = self.owned_catalog();
2530 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2531 let result =
2532 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2533 ctx.retire(result);
2534 });
2535 }
2536 _ => {
2538 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2539 Some(table) => {
2540 let desc = table.relation_desc_latest().expect("table has a desc");
2542 desc.arity()
2543 }
2544 None => {
2545 ctx.retire(Err(AdapterError::Catalog(
2546 mz_catalog::memory::error::Error {
2547 kind: ErrorKind::Sql(CatalogError::UnknownItem(
2548 plan.id.to_string(),
2549 )),
2550 },
2551 )));
2552 return;
2553 }
2554 };
2555
2556 let finishing = RowSetFinishing {
2557 order_by: vec![],
2558 limit: None,
2559 offset: 0,
2560 project: (0..desc_arity).collect(),
2561 };
2562
2563 let read_then_write_plan = plan::ReadThenWritePlan {
2564 id: plan.id,
2565 selection: plan.values,
2566 finishing,
2567 assignments: BTreeMap::new(),
2568 kind: MutationKind::Insert,
2569 returning: plan.returning,
2570 };
2571
2572 self.sequence_read_then_write(ctx, read_then_write_plan)
2573 .await;
2574 }
2575 }
2576 }
2577
    /// Sequences a read-then-write plan (`UPDATE`, `DELETE`, or non-constant
    /// `INSERT`): acquires per-object write locks, validates that the read
    /// side is safe to use in a write, issues a peek against the selection,
    /// and then — on a background task — converts the peeked rows into diffs
    /// and sends them to the table.
    #[instrument]
    pub(super) async fn sequence_read_then_write(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::ReadThenWritePlan,
    ) {
        // Everything the selection reads, plus the mutation target itself,
        // needs a write lock.
        let mut source_ids: BTreeSet<_> = plan
            .selection
            .depends_on()
            .into_iter()
            .map(|gid| self.catalog().resolve_item_id(&gid))
            .collect();
        source_ids.insert(plan.id);

        // Acquire write locks unless the session already holds some (e.g. a
        // prior statement in the same transaction).
        if ctx.session().transaction().write_locks().is_none() {
            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());

            // Opportunistically grab whatever locks are immediately available.
            for id in &source_ids {
                if let Some(lock) = self.try_grant_object_write_lock(*id) {
                    write_locks.insert_lock(*id, lock);
                }
            }

            // All-or-nothing: if any lock is missing, defer the whole plan
            // until the missing lock can be granted, rather than holding a
            // partial set.
            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
                Ok(locks) => locks,
                Err(missing) => {
                    let role_metadata = ctx.session().role_metadata().clone();
                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
                    let plan = DeferredPlan {
                        ctx,
                        plan: Plan::ReadThenWrite(plan),
                        validity: PlanValidity::new(
                            self.catalog.transient_revision(),
                            source_ids.clone(),
                            None,
                            None,
                            role_metadata,
                        ),
                        requires_locks: source_ids,
                    };
                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
                }
            };

            ctx.session_mut()
                .try_grant_write_locks(write_locks)
                .expect("session has already been granted write locks");
        }

        let plan::ReadThenWritePlan {
            id,
            kind,
            selection,
            mut assignments,
            finishing,
            mut returning,
        } = plan;

        // The target's relation desc is needed later for constraint checks on
        // the produced rows.
        let desc = match self.catalog().try_get_entry(&id) {
            Some(table) => {
                table
                    .relation_desc_latest()
                    .expect("table has a desc")
                    .into_owned()
            }
            None => {
                ctx.retire(Err(AdapterError::Catalog(
                    mz_catalog::memory::error::Error {
                        kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
                    },
                )));
                return;
            }
        };

        // mz_now() is not allowed anywhere in a write statement: the write's
        // logical time is not available while evaluating these expressions.
        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
            || assignments.values().any(|e| e.contains_temporal())
            || returning.iter().any(|e| e.contains_temporal());
        if contains_temporal {
            ctx.retire(Err(AdapterError::Unsupported(
                "calls to mz_now in write statements",
            )));
            return;
        }

        // Recursively checks that every (transitive) dependency of the read
        // side is acceptable in a write: user views/funcs/tables are allowed,
        // sources/secrets/connections and most system objects are not, and
        // any view using mz_now() is rejected.
        fn validate_read_dependencies(
            catalog: &Catalog,
            id: &CatalogItemId,
        ) -> Result<(), AdapterError> {
            use CatalogItemType::*;
            use mz_catalog::memory::objects;
            let mut ids_to_check = Vec::new();
            let valid = match catalog.try_get_entry(id) {
                Some(entry) => {
                    // Views and materialized views may hide temporal calls.
                    if let CatalogItem::View(objects::View { optimized_expr, .. })
                    | CatalogItem::MaterializedView(objects::MaterializedView {
                        optimized_expr,
                        ..
                    }) = entry.item()
                    {
                        if optimized_expr.contains_temporal() {
                            return Err(AdapterError::Unsupported(
                                "calls to mz_now in write statements",
                            ));
                        }
                    }
                    match entry.item().typ() {
                        typ @ (Func | View | MaterializedView | ContinualTask) => {
                            // Recurse into this entry's own dependencies.
                            ids_to_check.extend(entry.uses());
                            let valid_id = id.is_user() || matches!(typ, Func);
                            valid_id
                        }
                        Source | Secret | Connection => false,
                        // Sinks and indexes can never appear in a query's
                        // dependency set.
                        Sink | Index => unreachable!(),
                        Table => {
                            if !id.is_user() {
                                false
                            } else {
                                // Tables that are source exports are read-only
                                // from the adapter's point of view.
                                entry.source_export_details().is_none()
                            }
                        }
                        Type => true,
                    }
                }
                None => false,
            };
            if !valid {
                // Build a human-readable description of the offending object
                // for the error message.
                let (object_name, object_type) = match catalog.try_get_entry(id) {
                    Some(entry) => {
                        let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
                        let object_type = match entry.item().typ() {
                            Source => "source",
                            Secret => "secret",
                            Connection => "connection",
                            Table => {
                                if !id.is_user() {
                                    "system table"
                                } else {
                                    "source-export table"
                                }
                            }
                            View => "system view",
                            MaterializedView => "system materialized view",
                            ContinualTask => "system task",
                            _ => "invalid dependency",
                        };
                        (object_name, object_type.to_string())
                    }
                    None => (id.to_string(), "unknown".to_string()),
                };
                return Err(AdapterError::InvalidTableMutationSelection {
                    object_name,
                    object_type,
                });
            }
            for id in ids_to_check {
                validate_read_dependencies(catalog, &id)?;
            }
            Ok(())
        }

        for gid in selection.depends_on() {
            let item_id = self.catalog().resolve_item_id(&gid);
            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
                ctx.retire(Err(err));
                return;
            }
        }

        // Split the context: the session is lent to an internal peek context
        // whose response arrives on `peek_rx`; the original transmitter/extra
        // are reassembled into a context on the background task below.
        let (peek_tx, peek_rx) = oneshot::channel();
        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
        let (tx, _, session, extra) = ctx.into_parts();
        let peek_ctx = ExecuteContext::from_parts(
            peek_client_tx,
            self.internal_cmd_tx.clone(),
            session,
            Default::default(),
        );

        // Read phase: peek the selection at the freshest table write time.
        self.sequence_peek(
            peek_ctx,
            plan::SelectPlan {
                select: None,
                source: selection,
                when: QueryWhen::FreshestTableWrite,
                finishing,
                copy_to: None,
            },
            TargetCluster::Active,
            None,
        )
        .await;

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
        let catalog = self.owned_catalog();
        let max_result_size = self.catalog().system_config().max_result_size();

        // Write phase runs off the coordinator main task so the peek's rows
        // can be consumed without blocking other sequencing.
        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
            // Wait for the peek's response and recover the session from it.
            let (peek_response, session) = match peek_rx.await {
                Ok(Response {
                    result: Ok(resp),
                    session,
                    otel_ctx,
                }) => {
                    otel_ctx.attach_as_parent();
                    (resp, session)
                }
                Ok(Response {
                    result: Err(e),
                    session,
                    otel_ctx,
                }) => {
                    let ctx =
                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
                    otel_ctx.attach_as_parent();
                    ctx.retire(Err(e));
                    return;
                }
                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
            };
            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
            let mut timeout_dur = *ctx.session().vars().statement_timeout();

            // A zero statement_timeout means "no timeout".
            if timeout_dur == Duration::ZERO {
                timeout_dur = Duration::MAX;
            }

            // Prepare (e.g. resolve unmaterializable functions in) the
            // assignment and RETURNING expressions before evaluation.
            let style = ExprPrepOneShot {
                logical_time: EvalTime::NotAvailable, session: ctx.session(),
                catalog_state: catalog.state(),
            };
            for expr in assignments.values_mut().chain(returning.iter_mut()) {
                return_if_err!(style.prep_scalar_expr(expr), ctx);
            }

            // Converts a batch of peeked rows into (row, diff) pairs plus
            // their byte size, applying UPDATE assignments and checking the
            // table's column constraints on every inserted row.
            let make_diffs = move |mut rows: Box<dyn RowIterator>|
                -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
                let arena = RowArena::new();
                let mut diffs = Vec::new();
                let mut datum_vec = mz_repr::DatumVec::new();

                while let Some(row) = rows.next() {
                    if !assignments.is_empty() {
                        assert!(
                            matches!(kind, MutationKind::Update),
                            "only updates support assignments"
                        );
                        // Evaluate all assignment expressions against the old
                        // row first, then splice the new values in.
                        let mut datums = datum_vec.borrow_with(row);
                        let mut updates = vec![];
                        for (idx, expr) in &assignments {
                            let updated = match expr.eval(&datums, &arena) {
                                Ok(updated) => updated,
                                Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
                            };
                            updates.push((*idx, updated));
                        }
                        for (idx, new_value) in updates {
                            datums[idx] = new_value;
                        }
                        let updated = Row::pack_slice(&datums);
                        diffs.push((updated, Diff::ONE));
                    }
                    match kind {
                        // Updates retract the old row (the insertion of the
                        // updated row was pushed above); deletes only retract.
                        MutationKind::Update | MutationKind::Delete => {
                            diffs.push((row.to_owned(), Diff::MINUS_ONE))
                        }
                        MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
                    }
                }

                let mut byte_size: u64 = 0;
                for (row, diff) in &diffs {
                    byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
                    // Constraints only apply to rows being inserted.
                    if diff.is_positive() {
                        for (idx, datum) in row.iter().enumerate() {
                            desc.constraints_met(idx, &datum)?;
                        }
                    }
                }
                Ok((diffs, byte_size))
            };

            // Drain the peek response into diffs, enforcing max_result_size
            // and the statement timeout for the streaming case.
            let diffs = match peek_response {
                ExecuteResponse::SendingRowsStreaming {
                    rows: mut rows_stream,
                    ..
                } => {
                    let mut byte_size: u64 = 0;
                    let mut diffs = Vec::new();
                    let result = loop {
                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
                            Ok(Some(res)) => match res {
                                PeekResponseUnary::Rows(new_rows) => {
                                    match make_diffs(new_rows) {
                                        Ok((mut new_diffs, new_byte_size)) => {
                                            byte_size = byte_size.saturating_add(new_byte_size);
                                            if byte_size > max_result_size {
                                                break Err(AdapterError::ResultSize(format!(
                                                    "result exceeds max size of {max_result_size}"
                                                )));
                                            }
                                            diffs.append(&mut new_diffs)
                                        }
                                        Err(e) => break Err(e),
                                    };
                                }
                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
                                PeekResponseUnary::Error(e) => {
                                    break Err(AdapterError::Unstructured(anyhow!(e)));
                                }
                            },
                            // Stream exhausted: all rows collected.
                            Ok(None) => break Ok(diffs),
                            // Timeout elapsed: cancel the outstanding peeks
                            // before reporting the statement timeout.
                            Err(_) => {
                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
                                    conn_id: ctx.session().conn_id().clone(),
                                });
                                if let Err(e) = result {
                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                                }
                                break Err(AdapterError::StatementTimeout);
                            }
                        }
                    };

                    result
                }
                ExecuteResponse::SendingRowsImmediate { rows } => {
                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
                }
                resp => Err(AdapterError::Unstructured(anyhow!(
                    "unexpected peek response: {resp:?}"
                ))),
            };

            // Evaluate RETURNING expressions over the inserted (positive-diff)
            // rows; the first evaluation error wins and poisons `diffs`.
            let mut returning_rows = Vec::new();
            let mut diff_err: Option<AdapterError> = None;
            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
                let arena = RowArena::new();
                for (row, diff) in diffs {
                    if !diff.is_positive() {
                        continue;
                    }
                    let mut returning_row = Row::with_capacity(returning.len());
                    let mut packer = returning_row.packer();
                    for expr in &returning {
                        let datums: Vec<_> = row.iter().collect();
                        match expr.eval(&datums, &arena) {
                            Ok(datum) => {
                                packer.push(datum);
                            }
                            Err(err) => {
                                diff_err = Some(err.into());
                                break;
                            }
                        }
                    }
                    // Positive diffs are >= 1 by the check above, so the
                    // NonZeroI64 conversion cannot fail.
                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
                    let diff = match NonZeroUsize::try_from(diff) {
                        Ok(diff) => diff,
                        Err(err) => {
                            diff_err = Some(err.into());
                            break;
                        }
                    };
                    returning_rows.push((returning_row, diff));
                    if diff_err.is_some() {
                        break;
                    }
                }
            }
            let diffs = if let Some(err) = diff_err {
                Err(err)
            } else {
                diffs
            };

            // If the read acquired a timestamp that must be linearized (strict
            // serializable), hand the context to the linearization pipeline
            // and wait to get it back before applying the write.
            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
            if let Some(timestamp_context) = timestamp_context {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let conn_id = ctx.session().conn_id().clone();
                let pending_read_txn = PendingReadTxn {
                    txn: PendingRead::ReadThenWrite { ctx, tx },
                    timestamp_context,
                    created: Instant::now(),
                    num_requeues: 0,
                    otel_ctx: OpenTelemetryContext::obtain(),
                };
                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
                if let Err(e) = result {
                    warn!(
                        "strict_serializable_reads_tx dropped before we could send: {:?}",
                        e
                    );
                    return;
                }
                let result = rx.await;
                ctx = match result {
                    Ok(Some(ctx)) => ctx,
                    // The pending read was retired elsewhere (e.g. canceled);
                    // nothing left to do here.
                    Ok(None) => {
                        return;
                    }
                    Err(e) => {
                        warn!(
                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
                            e
                        );
                        return;
                    }
                };
            }

            // Finally, stage the diffs (and RETURNING rows) onto the session's
            // transaction for commit.
            match diffs {
                Ok(diffs) => {
                    let result = Self::send_diffs(
                        ctx.session_mut(),
                        plan::SendDiffsPlan {
                            id,
                            updates: diffs,
                            kind,
                            returning: returning_rows,
                            max_result_size,
                        },
                    );
                    ctx.retire(result);
                }
                Err(e) => {
                    ctx.retire(Err(e));
                }
            }
        });
    }
3070
3071 #[instrument]
3072 pub(super) async fn sequence_alter_item_rename(
3073 &mut self,
3074 ctx: &mut ExecuteContext,
3075 plan: plan::AlterItemRenamePlan,
3076 ) -> Result<ExecuteResponse, AdapterError> {
3077 let op = catalog::Op::RenameItem {
3078 id: plan.id,
3079 current_full_name: plan.current_full_name,
3080 to_name: plan.to_name,
3081 };
3082 match self
3083 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3084 .await
3085 {
3086 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3087 Err(err) => Err(err),
3088 }
3089 }
3090
3091 #[instrument]
3092 pub(super) async fn sequence_alter_retain_history(
3093 &mut self,
3094 ctx: &mut ExecuteContext,
3095 plan: plan::AlterRetainHistoryPlan,
3096 ) -> Result<ExecuteResponse, AdapterError> {
3097 let ops = vec![catalog::Op::AlterRetainHistory {
3098 id: plan.id,
3099 value: plan.value,
3100 window: plan.window,
3101 }];
3102 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3103 Box::pin(async move {
3104 let catalog_item = coord.catalog().get_entry(&plan.id).item();
3105 let cluster = match catalog_item {
3106 CatalogItem::Table(_)
3107 | CatalogItem::MaterializedView(_)
3108 | CatalogItem::Source(_)
3109 | CatalogItem::ContinualTask(_) => None,
3110 CatalogItem::Index(index) => Some(index.cluster_id),
3111 CatalogItem::Log(_)
3112 | CatalogItem::View(_)
3113 | CatalogItem::Sink(_)
3114 | CatalogItem::Type(_)
3115 | CatalogItem::Func(_)
3116 | CatalogItem::Secret(_)
3117 | CatalogItem::Connection(_) => unreachable!(),
3118 };
3119 match cluster {
3120 Some(cluster) => {
3121 coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3122 }
3123 None => {
3124 coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3125 }
3126 }
3127 })
3128 })
3129 .await?;
3130 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3131 }
3132
3133 #[instrument]
3134 pub(super) async fn sequence_alter_source_timestamp_interval(
3135 &mut self,
3136 ctx: &mut ExecuteContext,
3137 plan: plan::AlterSourceTimestampIntervalPlan,
3138 ) -> Result<ExecuteResponse, AdapterError> {
3139 let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3140 id: plan.id,
3141 value: plan.value,
3142 interval: plan.interval,
3143 }];
3144 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3145 Box::pin(async move {
3146 let source = coord
3147 .catalog()
3148 .get_entry(&plan.id)
3149 .source()
3150 .expect("known to be source");
3151 let (global_id, desc) = match &source.data_source {
3152 DataSourceDesc::Ingestion { desc, .. }
3153 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3154 (source.global_id, desc.clone())
3155 }
3156 _ => return,
3157 };
3158 let desc = desc.into_inline_connection(coord.catalog().state());
3159 coord
3160 .controller
3161 .storage
3162 .alter_ingestion_source_desc(BTreeMap::from([(global_id, desc)]))
3163 .await
3164 .unwrap_or_terminate("cannot fail to alter ingestion source desc");
3165 })
3166 })
3167 .await?;
3168 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3169 }
3170
3171 #[instrument]
3172 pub(super) async fn sequence_alter_schema_rename(
3173 &mut self,
3174 ctx: &mut ExecuteContext,
3175 plan: plan::AlterSchemaRenamePlan,
3176 ) -> Result<ExecuteResponse, AdapterError> {
3177 let (database_spec, schema_spec) = plan.cur_schema_spec;
3178 let op = catalog::Op::RenameSchema {
3179 database_spec,
3180 schema_spec,
3181 new_name: plan.new_schema_name,
3182 check_reserved_names: true,
3183 };
3184 match self
3185 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3186 .await
3187 {
3188 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3189 Err(err) => Err(err),
3190 }
3191 }
3192
3193 #[instrument]
3194 pub(super) async fn sequence_alter_schema_swap(
3195 &mut self,
3196 ctx: &mut ExecuteContext,
3197 plan: plan::AlterSchemaSwapPlan,
3198 ) -> Result<ExecuteResponse, AdapterError> {
3199 let plan::AlterSchemaSwapPlan {
3200 schema_a_spec: (schema_a_db, schema_a),
3201 schema_a_name,
3202 schema_b_spec: (schema_b_db, schema_b),
3203 schema_b_name,
3204 name_temp,
3205 } = plan;
3206
3207 let op_a = catalog::Op::RenameSchema {
3208 database_spec: schema_a_db,
3209 schema_spec: schema_a,
3210 new_name: name_temp,
3211 check_reserved_names: false,
3212 };
3213 let op_b = catalog::Op::RenameSchema {
3214 database_spec: schema_b_db,
3215 schema_spec: schema_b,
3216 new_name: schema_a_name,
3217 check_reserved_names: false,
3218 };
3219 let op_c = catalog::Op::RenameSchema {
3220 database_spec: schema_a_db,
3221 schema_spec: schema_a,
3222 new_name: schema_b_name,
3223 check_reserved_names: false,
3224 };
3225
3226 match self
3227 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3228 Box::pin(async {})
3229 })
3230 .await
3231 {
3232 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3233 Err(err) => Err(err),
3234 }
3235 }
3236
    /// Sequences `ALTER ROLE`, either updating the role's attributes or its
    /// role-default session variables, committing the change in a catalog
    /// transaction and emitting any accumulated notices on success.
    #[instrument]
    pub(super) async fn sequence_alter_role(
        &mut self,
        session: &Session,
        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let role = catalog.get_role(&id);

        // Notices are collected here and only delivered after the catalog
        // transaction succeeds.
        let mut notices = vec![];

        // Start from the role's current attributes/vars and modify in place.
        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
        let mut vars = role.vars().clone();

        // Forwarded to the catalog op when the plan carries NOPASSWORD.
        let mut nopassword = false;

        match option {
            PlannedAlterRoleOption::Attributes(attrs) => {
                self.validate_role_attributes(&attrs.clone().into())?;

                // Each attribute is only overwritten when the plan sets it.
                if let Some(inherit) = attrs.inherit {
                    attributes.inherit = inherit;
                }

                if let Some(password) = attrs.password {
                    attributes.password = Some(password);
                    // Pin the iteration count in effect at the time the
                    // password is set.
                    attributes.scram_iterations =
                        Some(self.catalog().system_config().scram_iterations())
                }

                if let Some(superuser) = attrs.superuser {
                    attributes.superuser = Some(superuser);
                }

                if let Some(login) = attrs.login {
                    attributes.login = Some(login);
                }

                if attrs.nopassword.unwrap_or(false) {
                    nopassword = true;
                }

                if let Some(notice) = self.should_emit_rbac_notice(session) {
                    notices.push(notice);
                }
            }
            PlannedAlterRoleOption::Variable(variable) => {
                // The variable must exist and be visible to this user.
                let session_var = session.vars().inspect(variable.name())?;
                session_var.visible(session.user(), catalog.system_vars())?;

                // Emit deprecation/usage notices for legacy variable names.
                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if let PlannedRoleVariable::Set {
                    name,
                    value: VariableValue::Values(vals),
                } = &variable
                {
                    // NOTE(review): indexes `vals[0]` — assumes the planner
                    // guarantees at least one value here; confirm.
                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
                        notices.push(AdapterNotice::IntrospectionClusterUsage);
                    }
                }

                let var_name = match variable {
                    PlannedRoleVariable::Set { name, value } => {
                        match &value {
                            // SET ... TO DEFAULT behaves like RESET.
                            VariableValue::Default => {
                                vars.remove(&name);
                            }
                            VariableValue::Values(vals) => {
                                let var = match &vals[..] {
                                    [val] => OwnedVarInput::Flat(val.clone()),
                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
                                };
                                // Validate the value before storing it as the
                                // role default.
                                session_var.check(var.borrow())?;

                                vars.insert(name.clone(), var);
                            }
                        };
                        name
                    }
                    PlannedRoleVariable::Reset { name } => {
                        vars.remove(&name);
                        name
                    }
                };

                notices.push(AdapterNotice::VarDefaultUpdated {
                    role: Some(name.clone()),
                    var_name: Some(var_name),
                });
            }
        }

        let op = catalog::Op::AlterRole {
            id,
            name,
            attributes,
            nopassword,
            vars: RoleVars { map: vars },
        };
        let response = self
            .catalog_transact(Some(session), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredRole)?;

        // Only reached when the transaction committed.
        session.add_notices(notices);

        Ok(response)
    }
3360
    /// Prepares `ALTER SINK`: pins a read frontier on the collection the sink
    /// reads from and installs a storage watch set whose `AlterSinkReady`
    /// response carries everything needed to resume the alteration once the
    /// sink's export reaches `read_ts`.
    #[instrument]
    pub(super) async fn sequence_alter_sink_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterSinkPlan,
    ) {
        // Hold back compaction of the sink's input while the alter is pending.
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from_iter([plan.sink.from]),
            compute_ids: BTreeMap::new(),
        };
        let read_hold = self.acquire_read_holds(&id_bundle);

        // An empty read frontier means the input collection is unreadable.
        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
            return;
        };

        let otel_ctx = OpenTelemetryContext::obtain();
        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);

        // Re-checked at finish time to detect concurrent catalog changes to
        // the sink, its input, or its cluster.
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([plan.item_id, from_item_id]),
            Some(plan.in_cluster),
            None,
            ctx.session().role_metadata().clone(),
        );

        info!(
            "preparing alter sink for {}: frontiers={:?} export={:?}",
            plan.global_id,
            self.controller
                .storage_collections
                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
            self.controller.storage.export(plan.global_id)
        );

        // Park the execute context inside the watch-set response; it is
        // retired when the watch set fires (or the plan is invalidated).
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([plan.global_id]),
            read_ts,
            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
                ctx: Some(ctx),
                otel_ctx,
                plan,
                plan_validity,
                read_hold,
            }),
        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
    }
3417
    /// Completes an `ALTER SINK` prepared by `sequence_alter_sink_prepare`:
    /// re-validates the plan, rewrites the sink's `CREATE SINK` statement to
    /// the new input/version, commits the catalog update, and finally alters
    /// the export in the storage controller.
    #[instrument]
    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
        ctx.otel_ctx.attach_as_parent();

        let plan::AlterSinkPlan {
            item_id,
            global_id,
            sink: sink_plan,
            with_snapshot,
            in_cluster,
        } = ctx.plan.clone();

        // The catalog may have changed since prepare time; bail if any of the
        // objects the plan depends on were altered or dropped.
        match ctx.plan_validity.check(self.catalog()) {
            Ok(()) => {}
            Err(err) => {
                ctx.retire(Err(err));
                return;
            }
        }

        let entry = self.catalog().get_entry(&item_id);
        let CatalogItem::Sink(old_sink) = entry.item() else {
            panic!("invalid item kind for `AlterSinkPlan`");
        };

        // Optimistic concurrency check: the plan must be exactly one version
        // ahead of the sink currently in the catalog.
        if sink_plan.version != old_sink.version + 1 {
            ctx.retire(Err(AdapterError::ChangedPlan(
                "sink was altered concurrently".into(),
            )));
            return;
        }

        info!(
            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
            self.controller
                .storage_collections
                .collections_frontiers(vec![global_id, sink_plan.from]),
            self.controller.storage.export(global_id),
        );

        // The watch set installed at prepare time only fires once the export's
        // write frontier has passed the held read time, so the as-of from the
        // read hold must now be strictly behind the write frontier.
        let write_frontier = &self
            .controller
            .storage
            .export(global_id)
            .expect("sink known to exist")
            .write_frontier;
        let as_of = ctx.read_hold.least_valid_read();
        assert!(
            write_frontier.iter().all(|t| as_of.less_than(t)),
            "{:?} should be strictly less than {:?}",
            &*as_of,
            &**write_frontier
        );

        // Rebuild the sink's CREATE SQL from the stored statement, bumping the
        // VERSION option to the plan's version.
        let create_sql = &old_sink.create_sql;
        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
            unreachable!("invalid statement kind for sink");
        };

        stmt.with_options
            .retain(|o| o.name != CreateSinkOptionName::Version);
        stmt.with_options.push(CreateSinkOption {
            name: CreateSinkOptionName::Version,
            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
                sink_plan.version.to_string(),
            ))),
        });

        let conn_catalog = self.catalog().for_system_session();
        let (mut stmt, resolved_ids) =
            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");

        // Point the statement's FROM clause at the (possibly new) input item.
        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
        stmt.from = ResolvedItemName::Item {
            id: from_entry.id(),
            qualifiers: from_entry.name.qualifiers.clone(),
            full_name,
            print_id: true,
            version: from_entry.version,
        };

        // Assemble the replacement catalog item from the plan plus the
        // rewritten statement.
        let new_sink = Sink {
            create_sql: stmt.to_ast_string_stable(),
            global_id,
            from: sink_plan.from,
            connection: sink_plan.connection.clone(),
            envelope: sink_plan.envelope,
            version: sink_plan.version,
            with_snapshot,
            resolved_ids: resolved_ids.clone(),
            cluster_id: in_cluster,
            commit_interval: sink_plan.commit_interval,
        };

        let ops = vec![catalog::Op::UpdateItem {
            id: item_id,
            name: entry.name().clone(),
            to_item: CatalogItem::Sink(new_sink),
        }];

        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => {}
            Err(err) => {
                ctx.retire(Err(err));
                return;
            }
        }

        // Build the storage-level description of the altered sink, inlining
        // connection details for the controller.
        let storage_sink_desc = StorageSinkDesc {
            from: sink_plan.from,
            from_desc: from_entry
                .relation_desc()
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink_plan
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink_plan.envelope,
            as_of,
            with_snapshot,
            version: sink_plan.version,
            from_storage_metadata: (),
            to_storage_metadata: (),
            commit_interval: sink_plan.commit_interval,
        };

        self.controller
            .storage
            .alter_export(
                global_id,
                ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: in_cluster,
                },
            )
            .await
            .unwrap_or_terminate("cannot fail to alter source desc");

        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
    }
3581
3582 #[instrument]
3583 pub(super) async fn sequence_alter_connection(
3584 &mut self,
3585 ctx: ExecuteContext,
3586 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3587 ) {
3588 match action {
3589 AlterConnectionAction::RotateKeys => {
3590 self.sequence_rotate_keys(ctx, id).await;
3591 }
3592 AlterConnectionAction::AlterOptions {
3593 set_options,
3594 drop_options,
3595 validate,
3596 } => {
3597 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3598 .await
3599 }
3600 }
3601 }
3602
    /// Implements `ALTER CONNECTION ... SET/DROP (...)`.
    ///
    /// Rebuilds the connection's persisted `CREATE CONNECTION` statement with
    /// the requested option changes and replans it. The new definition is then
    /// either validated in a spawned task (whose completion message retires
    /// `ctx`) or, when `validate` is false, committed to the catalog directly.
    #[instrument]
    async fn sequence_alter_connection_options(
        &mut self,
        mut ctx: ExecuteContext,
        id: CatalogItemId,
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
        drop_options: BTreeSet<ConnectionOptionName>,
        validate: bool,
    ) {
        let cur_entry = self.catalog().get_entry(&id);
        let cur_conn = cur_entry.connection().expect("known to be connection");
        let connection_gid = cur_conn.global_id();

        // All fallible work happens in this closure so a single `match` below
        // can retire the context on error.
        let inner = || -> Result<Connection, AdapterError> {
            // Parse the catalog-persisted `CREATE CONNECTION` SQL.
            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateConnection(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = self.catalog().for_system_session();

            let (mut create_conn_stmt, resolved_ids) =
                mz_sql::names::resolve(&catalog, create_conn_stmt)
                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;

            // Remove every option that is being SET (re-added with its new
            // value just below) or DROPped.
            create_conn_stmt
                .values
                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));

            // Append the new values of the SET options.
            create_conn_stmt.values.extend(
                set_options
                    .into_iter()
                    .map(|(name, value)| ConnectionOption { name, value }),
            );

            // Replan the modified statement; the item itself is marked
            // unresolvable so planning does not resolve the connection to its
            // own old definition.
            let mut catalog = self.catalog().for_system_session();
            catalog.mark_id_unresolvable_for_replanning(id);

            let plan = match mz_sql::plan::plan(
                None,
                &catalog,
                Statement::CreateConnection(create_conn_stmt),
                &Params::empty(),
                &resolved_ids,
            )
            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
            {
                Plan::CreateConnection(plan) => plan,
                _ => unreachable!("create source plan is only valid response"),
            };

            // Re-parse and re-resolve the planned statement's SQL to compute
            // the connection's new dependency set.
            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateConnection(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = self.catalog().for_system_session();

            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;

            Ok(Connection {
                create_sql: plan.connection.create_sql,
                global_id: cur_conn.global_id,
                details: plan.connection.details,
                resolved_ids: new_deps,
            })
        };

        let conn = match inner() {
            Ok(conn) => conn,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        if validate {
            // Validation runs in a spawned task (it awaits the connection
            // itself); the outcome is sent back to the coordinator as an
            // `AlterConnectionValidationReady` message, which owns `ctx` and
            // retires it.
            let connection = conn
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();
            let current_storage_parameters = self.controller.storage.config().clone();

            task::spawn(
                || format!("validate_alter_connection:{conn_id}"),
                async move {
                    let resolved_ids = conn.resolved_ids.clone();
                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
                    // On success the *unvalidated* `conn` value is forwarded
                    // so the finish stage can commit it.
                    let result = match connection.validate(id, &current_storage_parameters).await {
                        Ok(()) => Ok(conn),
                        Err(err) => Err(err.into()),
                    };

                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
                        AlterConnectionValidationReady {
                            ctx,
                            result,
                            connection_id: id,
                            connection_gid,
                            plan_validity: PlanValidity::new(
                                transient_revision,
                                dependency_ids.clone(),
                                None,
                                None,
                                role_metadata,
                            ),
                            otel_ctx,
                            resolved_ids,
                        },
                    ));
                    if let Err(e) = result {
                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                    }
                },
            );
        } else {
            // No validation requested: commit immediately and retire with the
            // result.
            let result = self
                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
                .await;
            ctx.retire(result);
        }
    }
3749
3750 #[instrument]
3751 pub(crate) async fn sequence_alter_connection_stage_finish(
3752 &mut self,
3753 session: &Session,
3754 id: CatalogItemId,
3755 connection: Connection,
3756 ) -> Result<ExecuteResponse, AdapterError> {
3757 match self.catalog.get_entry(&id).item() {
3758 CatalogItem::Connection(curr_conn) => {
3759 curr_conn
3760 .details
3761 .to_connection()
3762 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3763 .map_err(StorageError::from)?;
3764 }
3765 _ => unreachable!("known to be a connection"),
3766 };
3767
3768 let ops = vec![catalog::Op::UpdateItem {
3769 id,
3770 name: self.catalog.get_entry(&id).name().clone(),
3771 to_item: CatalogItem::Connection(connection.clone()),
3772 }];
3773
3774 self.catalog_transact(Some(session), ops).await?;
3775
3776 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3783 }
3784
    /// Sequences `ALTER SOURCE`.
    ///
    /// Two actions are supported: `AddSubsourceExports` rewrites the source's
    /// persisted `CREATE SOURCE` statement (merging TEXT/EXCLUDE COLUMNS
    /// options), replans it, and creates the requested subsources;
    /// `RefreshReferences` replaces the source's recorded upstream references.
    #[instrument]
    pub(super) async fn sequence_alter_source(
        &mut self,
        session: &Session,
        plan::AlterSourcePlan {
            item_id,
            ingestion_id,
            action,
        }: plan::AlterSourcePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let cur_entry = self.catalog().get_entry(&item_id);
        let cur_source = cur_entry.source().expect("known to be source");

        // Helper: parse a catalog-persisted `CREATE SOURCE` statement and
        // resolve its names, returning the statement plus its dependency ids.
        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateSource(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = coord.catalog().for_system_session();

            mz_sql::names::resolve(&catalog, create_source_stmt)
                .map_err(|e| AdapterError::internal(err_cx, e))
        };

        match action {
            plan::AlterSourceAction::AddSubsourceExports {
                subsources,
                options,
            } => {
                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

                // TEXT/EXCLUDE COLUMNS requested for the *new* subsources;
                // the existing statement's surviving options are merged into
                // these below.
                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                    text_columns: mut new_text_columns,
                    exclude_columns: mut new_exclude_columns,
                    ..
                } = options.try_into()?;

                let (mut create_source_stmt, resolved_ids) =
                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

                // Upstream references of every subsource that currently
                // depends on this source; used to prune column options whose
                // table is no longer referenced.
                let catalog = self.catalog();
                let curr_references: BTreeSet<_> = catalog
                    .get_entry(&item_id)
                    .used_by()
                    .into_iter()
                    .filter_map(|subsource| {
                        catalog
                            .get_entry(subsource)
                            .subsource_details()
                            .map(|(_id, reference, _details)| reference)
                    })
                    .collect();

                let purification_err =
                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

                // Per connection kind: extract current TEXT/EXCLUDE COLUMNS,
                // drop stale entries, merge in the new ones, and write the
                // merged lists back into the statement's options.
                match &mut create_source_stmt.connection {
                    CreateSourceConnection::Postgres {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::PgConfigOptionExtracted {
                            mut text_columns, ..
                        } = curr_options.clone().try_into()?;

                        // Remove the option; it is re-added (merged) below.
                        curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));

                        // Keep only text columns whose table (reference minus
                        // the trailing column name) is still referenced.
                        text_columns.retain(|column_qualified_reference| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                4,
                                "all TEXT COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(3);
                            curr_references.contains(&table)
                        });

                        new_text_columns.extend(text_columns);

                        if !new_text_columns.is_empty() {
                            // Sorted — presumably for deterministic SQL output.
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(PgConfigOption {
                                name: PgConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::MySql {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::MySqlConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        // Remove both options; re-added (merged) below.
                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                MySqlConfigOptionName::TextColumns
                                    | MySqlConfigOptionName::ExcludeColumns
                            )
                        });

                        // MySQL references are 3 parts (column-qualified), so
                        // the table reference is the first 2 parts.
                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::SqlServer {
                        options: curr_options,
                        ..
                    } => {
                        // Mirrors the MySQL arm, with SQL Server option types.
                        let mz_sql::plan::SqlServerConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                SqlServerConfigOptionName::TextColumns
                                    | SqlServerConfigOptionName::ExcludeColumns
                            )
                        });

                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    // Other connection kinds do not support adding subsources.
                    _ => return Err(purification_err()),
                };

                // Replan the rewritten statement, hiding the item itself from
                // name resolution so it is not resolved to its old definition.
                let mut catalog = self.catalog().for_system_session();
                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

                let plan = match mz_sql::plan::plan(
                    None,
                    &catalog,
                    Statement::CreateSource(create_source_stmt),
                    &Params::empty(),
                    &resolved_ids,
                )
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
                {
                    Plan::CreateSource(plan) => plan,
                    _ => unreachable!("create source plan is only valid response"),
                };

                // Rebuild the in-memory source, carrying over the existing
                // global id, compaction window, and retained-metrics flag.
                let source = Source::new(
                    plan,
                    cur_source.global_id,
                    resolved_ids,
                    cur_source.custom_logical_compaction_window,
                    cur_source.is_retained_metrics_object,
                );

                let desc = match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                        desc.clone().into_inline_connection(self.catalog().state())
                    }
                    _ => unreachable!("already verified of type ingestion"),
                };

                // Ask the storage controller whether the updated ingestion
                // description is a legal alteration *before* committing.
                self.controller
                    .storage
                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

                let mut ops = vec![catalog::Op::UpdateItem {
                    id: item_id,
                    name: self.catalog.get_entry(&item_id).name().clone(),
                    to_item: CatalogItem::Source(source),
                }];

                // Ops that create the new subsources themselves.
                let CreateSourceInner {
                    ops: new_ops,
                    sources: _,
                    if_not_exists_ids,
                } = self.create_source_inner(session, subsources).await?;

                ops.extend(new_ops.into_iter());

                assert!(
                    if_not_exists_ids.is_empty(),
                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
                );

                self.catalog_transact(Some(session), ops).await?;
            }
            plan::AlterSourceAction::RefreshReferences { references } => {
                self.catalog_transact(
                    Some(session),
                    vec![catalog::Op::UpdateSourceReferences {
                        source_id: item_id,
                        references: references.into(),
                    }],
                )
                .await?;
            }
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
    }
4113
4114 #[instrument]
4115 pub(super) async fn sequence_alter_system_set(
4116 &mut self,
4117 session: &Session,
4118 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4119 ) -> Result<ExecuteResponse, AdapterError> {
4120 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4121 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4123 self.validate_alter_system_network_policy(session, &value)?;
4124 }
4125
4126 let op = match value {
4127 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4128 name: name.clone(),
4129 value: OwnedVarInput::SqlSet(values),
4130 },
4131 plan::VariableValue::Default => {
4132 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4133 }
4134 };
4135 self.catalog_transact(Some(session), vec![op]).await?;
4136
4137 session.add_notice(AdapterNotice::VarDefaultUpdated {
4138 role: None,
4139 var_name: Some(name),
4140 });
4141 Ok(ExecuteResponse::AlteredSystemConfiguration)
4142 }
4143
4144 #[instrument]
4145 pub(super) async fn sequence_alter_system_reset(
4146 &mut self,
4147 session: &Session,
4148 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4149 ) -> Result<ExecuteResponse, AdapterError> {
4150 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4151 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4152 self.catalog_transact(Some(session), vec![op]).await?;
4153 session.add_notice(AdapterNotice::VarDefaultUpdated {
4154 role: None,
4155 var_name: Some(name),
4156 });
4157 Ok(ExecuteResponse::AlteredSystemConfiguration)
4158 }
4159
4160 #[instrument]
4161 pub(super) async fn sequence_alter_system_reset_all(
4162 &mut self,
4163 session: &Session,
4164 _: plan::AlterSystemResetAllPlan,
4165 ) -> Result<ExecuteResponse, AdapterError> {
4166 self.is_user_allowed_to_alter_system(session, None)?;
4167 let op = catalog::Op::ResetAllSystemConfiguration;
4168 self.catalog_transact(Some(session), vec![op]).await?;
4169 session.add_notice(AdapterNotice::VarDefaultUpdated {
4170 role: None,
4171 var_name: None,
4172 });
4173 Ok(ExecuteResponse::AlteredSystemConfiguration)
4174 }
4175
    /// Checks whether the session's user may run `ALTER SYSTEM` against the
    /// named variable (`var_name: Some(..)`) or against all variables
    /// (`None`, i.e. `RESET ALL`).
    ///
    /// Arm order matters: a failing guard falls through to the catch-all
    /// error arm at the bottom.
    fn is_user_allowed_to_alter_system(
        &self,
        session: &Session,
        var_name: Option<&str>,
    ) -> Result<(), AdapterError> {
        match (session.user().kind(), var_name) {
            // Only internal superusers may reset *all* variables.
            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
            // Superusers may alter a specific variable when they are internal
            // or the variable is user-modifiable — and only if the variable
            // exists and is visible to them.
            (UserKind::Superuser, Some(name))
                if session.user().is_internal()
                    || self.catalog().system_config().user_modifiable(name) =>
            {
                // `get` also surfaces unknown-variable errors.
                let var = self.catalog().system_config().get(name)?;
                var.visible(session.user(), self.catalog().system_config())?;
                Ok(())
            }
            // Regular users asking for a modifiable variable get a
            // "superuser required" error...
            (UserKind::Regular, Some(name))
                if self.catalog().system_config().user_modifiable(name) =>
            {
                Err(AdapterError::Unauthorized(
                    rbac::UnauthorizedError::Superuser {
                        action: format!("toggle the '{name}' system configuration parameter"),
                    },
                ))
            }
            // ...everything else is rejected with an `MzSystem`
            // authorization error.
            _ => Err(AdapterError::Unauthorized(
                rbac::UnauthorizedError::MzSystem {
                    action: "alter system".into(),
                },
            )),
        }
    }
4214
4215 fn validate_alter_system_network_policy(
4216 &self,
4217 session: &Session,
4218 policy_value: &plan::VariableValue,
4219 ) -> Result<(), AdapterError> {
4220 let policy_name = match &policy_value {
4221 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4223 plan::VariableValue::Values(values) if values.len() == 1 => {
4224 values.iter().next().cloned()
4225 }
4226 plan::VariableValue::Values(values) => {
4227 tracing::warn!(?values, "can't set multiple network policies at once");
4228 None
4229 }
4230 };
4231 let maybe_network_policy = policy_name
4232 .as_ref()
4233 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4234 let Some(network_policy) = maybe_network_policy else {
4235 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4236 VarError::InvalidParameterValue {
4237 name: NETWORK_POLICY.name(),
4238 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4239 reason: "no network policy with such name exists".to_string(),
4240 },
4241 )));
4242 };
4243 self.validate_alter_network_policy(session, &network_policy.rules)
4244 }
4245
4246 fn validate_alter_network_policy(
4251 &self,
4252 session: &Session,
4253 policy_rules: &Vec<NetworkPolicyRule>,
4254 ) -> Result<(), AdapterError> {
4255 if session.user().is_internal() {
4258 return Ok(());
4259 }
4260 if let Some(ip) = session.meta().client_ip() {
4261 validate_ip_with_policy_rules(ip, policy_rules)
4262 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4263 } else {
4264 return Err(AdapterError::NetworkPolicyDenied(
4267 NetworkPolicyError::MissingIp,
4268 ));
4269 }
4270 Ok(())
4271 }
4272
4273 #[instrument]
4275 pub(super) fn sequence_execute(
4276 &self,
4277 session: &mut Session,
4278 plan: plan::ExecutePlan,
4279 ) -> Result<String, AdapterError> {
4280 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4282 let ps = session
4283 .get_prepared_statement_unverified(&plan.name)
4284 .expect("known to exist");
4285 let stmt = ps.stmt().cloned();
4286 let desc = ps.desc().clone();
4287 let state_revision = ps.state_revision;
4288 let logging = Arc::clone(ps.logging());
4289 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4290 }
4291
4292 #[instrument]
4293 pub(super) async fn sequence_grant_privileges(
4294 &mut self,
4295 session: &Session,
4296 plan::GrantPrivilegesPlan {
4297 update_privileges,
4298 grantees,
4299 }: plan::GrantPrivilegesPlan,
4300 ) -> Result<ExecuteResponse, AdapterError> {
4301 self.sequence_update_privileges(
4302 session,
4303 update_privileges,
4304 grantees,
4305 UpdatePrivilegeVariant::Grant,
4306 )
4307 .await
4308 }
4309
4310 #[instrument]
4311 pub(super) async fn sequence_revoke_privileges(
4312 &mut self,
4313 session: &Session,
4314 plan::RevokePrivilegesPlan {
4315 update_privileges,
4316 revokees,
4317 }: plan::RevokePrivilegesPlan,
4318 ) -> Result<ExecuteResponse, AdapterError> {
4319 self.sequence_update_privileges(
4320 session,
4321 update_privileges,
4322 revokees,
4323 UpdatePrivilegeVariant::Revoke,
4324 )
4325 .await
4326 }
4327
4328 #[instrument]
4329 async fn sequence_update_privileges(
4330 &mut self,
4331 session: &Session,
4332 update_privileges: Vec<UpdatePrivilege>,
4333 grantees: Vec<RoleId>,
4334 variant: UpdatePrivilegeVariant,
4335 ) -> Result<ExecuteResponse, AdapterError> {
4336 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4337 let mut warnings = Vec::new();
4338 let catalog = self.catalog().for_session(session);
4339
4340 for UpdatePrivilege {
4341 acl_mode,
4342 target_id,
4343 grantor,
4344 } in update_privileges
4345 {
4346 let actual_object_type = catalog.get_system_object_type(&target_id);
4347 if actual_object_type.is_relation() {
4350 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4351 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4352 if !non_applicable_privileges.is_empty() {
4353 let object_description =
4354 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4355 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4356 non_applicable_privileges,
4357 object_description,
4358 })
4359 }
4360 }
4361
4362 if let SystemObjectId::Object(object_id) = &target_id {
4363 self.catalog()
4364 .ensure_not_reserved_object(object_id, session.conn_id())?;
4365 }
4366
4367 let privileges = self
4368 .catalog()
4369 .get_privileges(&target_id, session.conn_id())
4370 .ok_or(AdapterError::Unsupported(
4373 "GRANTs/REVOKEs on an object type with no privileges",
4374 ))?;
4375
4376 for grantee in &grantees {
4377 self.catalog().ensure_not_system_role(grantee)?;
4378 self.catalog().ensure_not_predefined_role(grantee)?;
4379 let existing_privilege = privileges
4380 .get_acl_item(grantee, &grantor)
4381 .map(Cow::Borrowed)
4382 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4383
4384 match variant {
4385 UpdatePrivilegeVariant::Grant
4386 if !existing_privilege.acl_mode.contains(acl_mode) =>
4387 {
4388 ops.push(catalog::Op::UpdatePrivilege {
4389 target_id: target_id.clone(),
4390 privilege: MzAclItem {
4391 grantee: *grantee,
4392 grantor,
4393 acl_mode,
4394 },
4395 variant,
4396 });
4397 }
4398 UpdatePrivilegeVariant::Revoke
4399 if !existing_privilege
4400 .acl_mode
4401 .intersection(acl_mode)
4402 .is_empty() =>
4403 {
4404 ops.push(catalog::Op::UpdatePrivilege {
4405 target_id: target_id.clone(),
4406 privilege: MzAclItem {
4407 grantee: *grantee,
4408 grantor,
4409 acl_mode,
4410 },
4411 variant,
4412 });
4413 }
4414 _ => {}
4416 }
4417 }
4418 }
4419
4420 if ops.is_empty() {
4421 session.add_notices(warnings);
4422 return Ok(variant.into());
4423 }
4424
4425 let res = self
4426 .catalog_transact(Some(session), ops)
4427 .await
4428 .map(|_| match variant {
4429 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4430 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4431 });
4432 if res.is_ok() {
4433 session.add_notices(warnings);
4434 }
4435 res
4436 }
4437
4438 #[instrument]
4439 pub(super) async fn sequence_alter_default_privileges(
4440 &mut self,
4441 session: &Session,
4442 plan::AlterDefaultPrivilegesPlan {
4443 privilege_objects,
4444 privilege_acl_items,
4445 is_grant,
4446 }: plan::AlterDefaultPrivilegesPlan,
4447 ) -> Result<ExecuteResponse, AdapterError> {
4448 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4449 let variant = if is_grant {
4450 UpdatePrivilegeVariant::Grant
4451 } else {
4452 UpdatePrivilegeVariant::Revoke
4453 };
4454 for privilege_object in &privilege_objects {
4455 self.catalog()
4456 .ensure_not_system_role(&privilege_object.role_id)?;
4457 self.catalog()
4458 .ensure_not_predefined_role(&privilege_object.role_id)?;
4459 if let Some(database_id) = privilege_object.database_id {
4460 self.catalog()
4461 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4462 }
4463 if let Some(schema_id) = privilege_object.schema_id {
4464 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4465 let schema_spec: SchemaSpecifier = schema_id.into();
4466
4467 self.catalog().ensure_not_reserved_object(
4468 &(database_spec, schema_spec).into(),
4469 session.conn_id(),
4470 )?;
4471 }
4472 for privilege_acl_item in &privilege_acl_items {
4473 self.catalog()
4474 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4475 self.catalog()
4476 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4477 ops.push(catalog::Op::UpdateDefaultPrivilege {
4478 privilege_object: privilege_object.clone(),
4479 privilege_acl_item: privilege_acl_item.clone(),
4480 variant,
4481 })
4482 }
4483 }
4484
4485 self.catalog_transact(Some(session), ops).await?;
4486 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4487 }
4488
4489 #[instrument]
4490 pub(super) async fn sequence_grant_role(
4491 &mut self,
4492 session: &Session,
4493 plan::GrantRolePlan {
4494 role_ids,
4495 member_ids,
4496 grantor_id,
4497 }: plan::GrantRolePlan,
4498 ) -> Result<ExecuteResponse, AdapterError> {
4499 let catalog = self.catalog();
4500 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4501 for role_id in role_ids {
4502 for member_id in &member_ids {
4503 let member_membership: BTreeSet<_> =
4504 catalog.get_role(member_id).membership().keys().collect();
4505 if member_membership.contains(&role_id) {
4506 let role_name = catalog.get_role(&role_id).name().to_string();
4507 let member_name = catalog.get_role(member_id).name().to_string();
4508 catalog.ensure_not_reserved_role(member_id)?;
4510 catalog.ensure_grantable_role(&role_id)?;
4511 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4512 role_name,
4513 member_name,
4514 });
4515 } else {
4516 ops.push(catalog::Op::GrantRole {
4517 role_id,
4518 member_id: *member_id,
4519 grantor_id,
4520 });
4521 }
4522 }
4523 }
4524
4525 if ops.is_empty() {
4526 return Ok(ExecuteResponse::GrantedRole);
4527 }
4528
4529 self.catalog_transact(Some(session), ops)
4530 .await
4531 .map(|_| ExecuteResponse::GrantedRole)
4532 }
4533
4534 #[instrument]
4535 pub(super) async fn sequence_revoke_role(
4536 &mut self,
4537 session: &Session,
4538 plan::RevokeRolePlan {
4539 role_ids,
4540 member_ids,
4541 grantor_id,
4542 }: plan::RevokeRolePlan,
4543 ) -> Result<ExecuteResponse, AdapterError> {
4544 let catalog = self.catalog();
4545 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4546 for role_id in role_ids {
4547 for member_id in &member_ids {
4548 let member_membership: BTreeSet<_> =
4549 catalog.get_role(member_id).membership().keys().collect();
4550 if !member_membership.contains(&role_id) {
4551 let role_name = catalog.get_role(&role_id).name().to_string();
4552 let member_name = catalog.get_role(member_id).name().to_string();
4553 catalog.ensure_not_reserved_role(member_id)?;
4555 catalog.ensure_grantable_role(&role_id)?;
4556 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4557 role_name,
4558 member_name,
4559 });
4560 } else {
4561 ops.push(catalog::Op::RevokeRole {
4562 role_id,
4563 member_id: *member_id,
4564 grantor_id,
4565 });
4566 }
4567 }
4568 }
4569
4570 if ops.is_empty() {
4571 return Ok(ExecuteResponse::RevokedRole);
4572 }
4573
4574 self.catalog_transact(Some(session), ops)
4575 .await
4576 .map(|_| ExecuteResponse::RevokedRole)
4577 }
4578
    /// Sequences `ALTER ... OWNER TO <role>`.
    ///
    /// Beyond the target object itself, ownership updates are added for
    /// dependent indexes, the item's progress collection, and (for clusters)
    /// all replicas — presumably so their ownership stays consistent with the
    /// parent's.
    #[instrument]
    pub(super) async fn sequence_alter_owner(
        &mut self,
        session: &Session,
        plan::AlterOwnerPlan {
            id,
            object_type,
            new_owner,
        }: plan::AlterOwnerPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = vec![catalog::Op::UpdateOwner {
            id: id.clone(),
            new_owner,
        }];

        match &id {
            ObjectId::Item(global_id) => {
                // NOTE(review): despite the binding name, this appears to be
                // the item's catalog id (`get_entry` takes item ids elsewhere
                // in this file) — confirm.
                let entry = self.catalog().get_entry(global_id);

                // Altering an index's owner directly is a no-op: a notice is
                // emitted and no catalog transaction runs.
                if entry.is_index() {
                    let name = self
                        .catalog()
                        .resolve_full_name(entry.name(), Some(session.conn_id()))
                        .to_string();
                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
                    return Ok(ExecuteResponse::AlteredObject(object_type));
                }

                // Indexes on this item follow its owner.
                let dependent_index_ops = entry
                    .used_by()
                    .into_iter()
                    .filter(|id| self.catalog().get_entry(id).is_index())
                    .map(|id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(*id),
                        new_owner,
                    });
                ops.extend(dependent_index_ops);

                // The item's progress collection (if any) follows too.
                let dependent_subsources =
                    entry
                        .progress_id()
                        .into_iter()
                        .map(|item_id| catalog::Op::UpdateOwner {
                            id: ObjectId::Item(item_id),
                            new_owner,
                        });
                ops.extend(dependent_subsources);
            }
            ObjectId::Cluster(cluster_id) => {
                let cluster = self.catalog().get_cluster(*cluster_id);
                // The cluster's replicas change owner along with it.
                let managed_cluster_replica_ops =
                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
                        new_owner,
                    });
                ops.extend(managed_cluster_replica_ops);
            }
            // Other object kinds have no dependents to cascade to.
            _ => {}
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::AlteredObject(object_type))
    }
4647
4648 #[instrument]
4649 pub(super) async fn sequence_reassign_owned(
4650 &mut self,
4651 session: &Session,
4652 plan::ReassignOwnedPlan {
4653 old_roles,
4654 new_role,
4655 reassign_ids,
4656 }: plan::ReassignOwnedPlan,
4657 ) -> Result<ExecuteResponse, AdapterError> {
4658 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4659 self.catalog().ensure_not_reserved_role(role_id)?;
4660 }
4661
4662 let ops = reassign_ids
4663 .into_iter()
4664 .map(|id| catalog::Op::UpdateOwner {
4665 id,
4666 new_owner: new_role,
4667 })
4668 .collect();
4669
4670 self.catalog_transact(Some(session), ops)
4671 .await
4672 .map(|_| ExecuteResponse::ReassignOwned)
4673 }
4674
4675 #[instrument]
4676 pub(crate) async fn handle_deferred_statement(&mut self) {
4677 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4681 return;
4682 };
4683 match ps {
4684 crate::coord::PlanStatement::Statement { stmt, params } => {
4685 self.handle_execute_inner(stmt, params, ctx).await;
4686 }
4687 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4688 self.sequence_plan(ctx, plan, resolved_ids).await;
4689 }
4690 }
4691 }
4692
4693 #[instrument]
4694 #[allow(clippy::unused_async)]
4696 pub(super) async fn sequence_alter_table(
4697 &mut self,
4698 ctx: &mut ExecuteContext,
4699 plan: plan::AlterTablePlan,
4700 ) -> Result<ExecuteResponse, AdapterError> {
4701 let plan::AlterTablePlan {
4702 relation_id,
4703 column_name,
4704 column_type,
4705 raw_sql_type,
4706 } = plan;
4707
4708 let (_, new_global_id) = self.allocate_user_id().await?;
4710 let ops = vec![catalog::Op::AlterAddColumn {
4711 id: relation_id,
4712 new_global_id,
4713 name: column_name,
4714 typ: column_type,
4715 sql: raw_sql_type,
4716 }];
4717
4718 self.catalog_transact_with_context(None, Some(ctx), ops)
4719 .await?;
4720
4721 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4722 }
4723
    /// First half of applying a materialized view replacement: record what must
    /// hold for the swap to be valid, then park the request in a storage watch
    /// set until the target collection's frontier catches up to the
    /// replacement's, at which point `..._finish` runs.
    #[instrument]
    pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: AlterMaterializedViewApplyReplacementPlan,
    ) {
        // Clone: the original `plan` is stashed in the watch-set context below.
        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();

        // Snapshot validity conditions (catalog revision, both items, the
        // session's role) so the deferred finish step can detect concurrent
        // DDL or role changes and abort.
        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([id, replacement_id]),
            None,
            None,
            ctx.session().role_metadata().clone(),
        );

        let target = self.catalog.get_entry(&id);
        let target_gid = target.latest_global_id();

        let replacement = self.catalog.get_entry(&replacement_id);
        let replacement_gid = replacement.latest_global_id();

        // The target's frontier comes from storage; the replacement's from the
        // compute instance it is installed on.
        let target_upper = self
            .controller
            .storage_collections
            .collection_frontiers(target_gid)
            .expect("target MV exists")
            .write_frontier;
        let replacement_upper = self
            .controller
            .compute
            .collection_frontiers(replacement_gid, replacement.cluster_id())
            .expect("replacement MV exists")
            .write_frontier;

        info!(
            %id, %replacement_id, ?target_upper, ?replacement_upper,
            "preparing materialized view replacement application",
        );

        // An empty replacement frontier means the replacement has stopped
        // advancing (sealed); the swap can no longer be performed, so fail the
        // statement immediately.
        let Some(replacement_upper_ts) = replacement_upper.into_option() else {
            ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
                name: target.name().item.clone(),
            }));
            return;
        };

        // Step back one tick from the replacement's upper, saturating at the
        // minimum timestamp — presumably the last time the replacement has
        // fully written; TODO(review): confirm intended off-by-one semantics.
        let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);

        // Defer completion: once the target's storage frontier reaches
        // `replacement_upper_ts`, the watch set fires
        // `AlterMaterializedViewReady`, which resumes in `..._finish` with the
        // stashed context, plan, and validity snapshot.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([target_gid]),
            replacement_upper_ts,
            WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
                ctx: Some(ctx),
                otel_ctx: OpenTelemetryContext::obtain(),
                plan,
                plan_validity,
            }),
        )
        .expect("target collection exists");
    }
4810
    /// Second half of applying a materialized view replacement, invoked when
    /// the watch set installed by `..._prepare` fires: re-validate the plan,
    /// then commit the swap as a catalog transaction and retire the original
    /// execute context with the outcome.
    #[instrument]
    pub async fn sequence_alter_materialized_view_apply_replacement_finish(
        &mut self,
        mut ctx: AlterMaterializedViewReadyContext,
    ) {
        // Re-attach the tracing span captured when the operation was deferred.
        ctx.otel_ctx.attach_as_parent();

        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;

        // The catalog may have changed while we waited; abort if the validity
        // snapshot taken in the prepare step no longer holds.
        if let Err(err) = ctx.plan_validity.check(self.catalog()) {
            ctx.retire(Err(err));
            return;
        }

        info!(
            %id, %replacement_id,
            "finishing materialized view replacement application",
        );

        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
                ObjectType::MaterializedView,
            ))),
            Err(err) => ctx.retire(Err(err)),
        }
    }
4846
4847 pub(super) async fn statistics_oracle(
4848 &self,
4849 session: &Session,
4850 source_ids: &BTreeSet<GlobalId>,
4851 query_as_of: &Antichain<Timestamp>,
4852 is_oneshot: bool,
4853 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4854 super::statistics_oracle(
4855 session,
4856 source_ids,
4857 query_as_of,
4858 is_oneshot,
4859 self.catalog().system_config(),
4860 self.controller.storage_collections.as_ref(),
4861 )
4862 .await
4863 }
4864}
4865
4866impl Coordinator {
4867 async fn process_dataflow_metainfo(
4869 &mut self,
4870 df_meta: DataflowMetainfo,
4871 export_id: GlobalId,
4872 ctx: Option<&mut ExecuteContext>,
4873 notice_ids: Vec<GlobalId>,
4874 ) -> Option<BuiltinTableAppendNotify> {
4875 if let Some(ctx) = ctx {
4877 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4878 }
4879
4880 let df_meta = self
4882 .catalog()
4883 .render_notices(df_meta, notice_ids, Some(export_id));
4884
4885 if self.catalog().state().system_config().enable_mz_notices()
4888 && !df_meta.optimizer_notices.is_empty()
4889 {
4890 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4891 self.catalog().state().pack_optimizer_notices(
4892 &mut builtin_table_updates,
4893 df_meta.optimizer_notices.iter(),
4894 Diff::ONE,
4895 );
4896
4897 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4899
4900 Some(
4901 self.builtin_table_update()
4902 .execute(builtin_table_updates)
4903 .await
4904 .0,
4905 )
4906 } else {
4907 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4909
4910 None
4911 }
4912 }
4913}