1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::future::OreFutureExt;
34use mz_ore::task::{self, JoinHandle, spawn};
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{assert_none, instrument, soft_assert_or_log};
37use mz_repr::adt::jsonb::Jsonb;
38use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
39use mz_repr::explain::ExprHumanizer;
40use mz_repr::explain::json::json_string;
41use mz_repr::role_id::RoleId;
42use mz_repr::{
43 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
44 RowIterator, Timestamp,
45};
46use mz_sql::ast::{
47 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
48 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
49 SqlServerConfigOptionName,
50};
51use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
52use mz_sql::catalog::{
53 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
54 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
55 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
56};
57use mz_sql::names::{
58 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
59 SchemaSpecifier, SystemObjectId,
60};
61use mz_sql::plan::{
62 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
63 StatementContext,
64};
65use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
66use mz_storage_types::sinks::StorageSinkDesc;
67use mz_storage_types::sources::GenericSourceConnection;
68use mz_sql::plan::{
70 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
71 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
72 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
73};
74use mz_sql::session::metadata::SessionMetadata;
75use mz_sql::session::user::UserKind;
76use mz_sql::session::vars::{
77 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
78 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
79};
80use mz_sql::{plan, rbac};
81use mz_sql_parser::ast::display::AstDisplay;
82use mz_sql_parser::ast::{
83 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
84 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
85 WithOptionValue,
86};
87use mz_ssh_util::keys::SshKeyPairSet;
88use mz_storage_client::controller::ExportDescription;
89use mz_storage_types::AlterCompatible;
90use mz_storage_types::connections::inline::IntoInlineConnection;
91use mz_storage_types::controller::StorageError;
92use mz_transform::dataflow::DataflowMetainfo;
93use smallvec::SmallVec;
94use timely::progress::Antichain;
95use tokio::sync::{oneshot, watch};
96use tracing::{Instrument, Span, info, warn};
97
98use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
99use crate::command::{ExecuteResponse, Response};
100use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
101use crate::coord::sequencer::emit_optimizer_notices;
102use crate::coord::{
103 AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
104 Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
105 ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
106 PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
107 WatchSetResponse, validate_ip_with_policy_rules,
108};
109use crate::error::AdapterError;
110use crate::notice::{AdapterNotice, DroppedInUseIndex};
111use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
112use crate::optimize::{self, Optimize};
113use crate::session::{
114 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
115 WriteLocks, WriteOp,
116};
117use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
118use crate::{PeekResponseUnary, ReadHolds};
119
120type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError>>;
122
123mod cluster;
124mod copy_from;
125mod create_continual_task;
126mod create_index;
127mod create_materialized_view;
128mod create_view;
129mod explain_timestamp;
130mod peek;
131mod secret;
132mod subscribe;
133
/// Unwraps a `Result` inside a function that reports its outcome through a context object.
///
/// Evaluates to the `Ok` payload of `$expr`. On `Err`, the error is converted with `.into()`,
/// handed to `$ctx.retire(..)`, and the enclosing function returns whatever `retire` produced.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Err(err) => return $ctx.retire(Err(err.into())),
            Ok(value) => value,
        }
    };
}
144
145pub(super) use return_if_err;
146
147struct DropOps {
148 ops: Vec<catalog::Op>,
149 dropped_active_db: bool,
150 dropped_active_cluster: bool,
151 dropped_in_use_indexes: Vec<DroppedInUseIndex>,
152}
153
154struct CreateSourceInner {
156 ops: Vec<catalog::Op>,
157 sources: Vec<(CatalogItemId, Source)>,
158 if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
159}
160
161impl Coordinator {
162 pub(crate) async fn sequence_staged<S>(
167 &mut self,
168 mut ctx: S::Ctx,
169 parent_span: Span,
170 mut stage: S,
171 ) where
172 S: Staged + 'static,
173 S::Ctx: Send + 'static,
174 {
175 return_if_err!(stage.validity().check(self.catalog()), ctx);
176 loop {
177 let mut cancel_enabled = stage.cancel_enabled();
178 if let Some(session) = ctx.session() {
179 if cancel_enabled {
180 if let Some((_prev_tx, prev_rx)) = self
183 .staged_cancellation
184 .insert(session.conn_id().clone(), watch::channel(false))
185 {
186 let was_canceled = *prev_rx.borrow();
187 if was_canceled {
188 ctx.retire(Err(AdapterError::Canceled));
189 return;
190 }
191 }
192 } else {
193 self.staged_cancellation.remove(session.conn_id());
196 }
197 } else {
198 cancel_enabled = false
199 };
200 let next = stage
201 .stage(self, &mut ctx)
202 .instrument(parent_span.clone())
203 .await;
204 let res = return_if_err!(next, ctx);
205 stage = match res {
206 StageResult::Handle(handle) => {
207 let internal_cmd_tx = self.internal_cmd_tx.clone();
208 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
209 let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
210 });
211 return;
212 }
213 StageResult::HandleRetire(handle) => {
214 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
215 ctx.retire(Ok(resp));
216 });
217 return;
218 }
219 StageResult::Response(resp) => {
220 ctx.retire(Ok(resp));
221 return;
222 }
223 StageResult::Immediate(stage) => *stage,
224 }
225 }
226 }
227
228 fn handle_spawn<C, T, F>(
229 &self,
230 ctx: C,
231 handle: JoinHandle<Result<T, AdapterError>>,
232 cancel_enabled: bool,
233 f: F,
234 ) where
235 C: StagedContext + Send + 'static,
236 T: Send + 'static,
237 F: FnOnce(C, T) + Send + 'static,
238 {
239 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
240 .session()
241 .and_then(|session| self.staged_cancellation.get(session.conn_id()))
242 {
243 let mut rx = rx.clone();
244 Box::pin(async move {
245 let _ = rx.wait_for(|v| *v).await;
247 ()
248 })
249 } else {
250 Box::pin(future::pending())
251 };
252 spawn(|| "sequence_staged", async move {
253 tokio::select! {
254 res = handle => {
255 let next = return_if_err!(res, ctx);
256 f(ctx, next);
257 }
258 _ = rx, if cancel_enabled => {
259 ctx.retire(Err(AdapterError::Canceled));
260 }
261 }
262 });
263 }
264
265 async fn create_source_inner(
266 &self,
267 session: &Session,
268 plans: Vec<plan::CreateSourcePlanBundle>,
269 ) -> Result<CreateSourceInner, AdapterError> {
270 let mut ops = vec![];
271 let mut sources = vec![];
272
273 let if_not_exists_ids = plans
274 .iter()
275 .filter_map(
276 |plan::CreateSourcePlanBundle {
277 item_id,
278 global_id: _,
279 plan,
280 resolved_ids: _,
281 available_source_references: _,
282 }| {
283 if plan.if_not_exists {
284 Some((*item_id, plan.name.clone()))
285 } else {
286 None
287 }
288 },
289 )
290 .collect::<BTreeMap<_, _>>();
291
292 for plan::CreateSourcePlanBundle {
293 item_id,
294 global_id,
295 mut plan,
296 resolved_ids,
297 available_source_references,
298 } in plans
299 {
300 let name = plan.name.clone();
301
302 match plan.source.data_source {
303 plan::DataSourceDesc::Ingestion(ref desc)
304 | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
305 let cluster_id = plan
306 .in_cluster
307 .expect("ingestion plans must specify cluster");
308 match desc.connection {
309 GenericSourceConnection::Postgres(_)
310 | GenericSourceConnection::MySql(_)
311 | GenericSourceConnection::SqlServer(_)
312 | GenericSourceConnection::Kafka(_)
313 | GenericSourceConnection::LoadGenerator(_) => {
314 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
315 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
316 .get(self.catalog().system_config().dyncfgs());
317
318 if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
319 {
320 return Err(AdapterError::Unsupported(
321 "sources in clusters with >1 replicas",
322 ));
323 }
324 }
325 }
326 }
327 }
328 plan::DataSourceDesc::Webhook { .. } => {
329 let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
330 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
331 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
332 .get(self.catalog().system_config().dyncfgs());
333
334 if !enable_multi_replica_sources {
335 if cluster.replica_ids().len() > 1 {
336 return Err(AdapterError::Unsupported(
337 "webhook sources in clusters with >1 replicas",
338 ));
339 }
340 }
341 }
342 }
343 plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
344 }
345
346 if let mz_sql::plan::DataSourceDesc::Webhook {
348 validate_using: Some(validate),
349 ..
350 } = &mut plan.source.data_source
351 {
352 if let Err(reason) = validate.reduce_expression().await {
353 self.metrics
354 .webhook_validation_reduce_failures
355 .with_label_values(&[reason])
356 .inc();
357 return Err(AdapterError::Internal(format!(
358 "failed to reduce check expression, {reason}"
359 )));
360 }
361 }
362
363 let mut reference_ops = vec![];
366 if let Some(references) = &available_source_references {
367 reference_ops.push(catalog::Op::UpdateSourceReferences {
368 source_id: item_id,
369 references: references.clone().into(),
370 });
371 }
372
373 let source = Source::new(plan, global_id, resolved_ids, None, false);
374 ops.push(catalog::Op::CreateItem {
375 id: item_id,
376 name,
377 item: CatalogItem::Source(source.clone()),
378 owner_id: *session.current_role_id(),
379 });
380 sources.push((item_id, source));
381 ops.extend(reference_ops);
383 }
384
385 Ok(CreateSourceInner {
386 ops,
387 sources,
388 if_not_exists_ids,
389 })
390 }
391
392 pub(crate) fn plan_subsource(
400 &self,
401 session: &Session,
402 params: &mz_sql::plan::Params,
403 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
404 item_id: CatalogItemId,
405 global_id: GlobalId,
406 ) -> Result<CreateSourcePlanBundle, AdapterError> {
407 let catalog = self.catalog().for_session(session);
408 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
409
410 let plan = self.plan_statement(
411 session,
412 Statement::CreateSubsource(subsource_stmt),
413 params,
414 &resolved_ids,
415 )?;
416 let plan = match plan {
417 Plan::CreateSource(plan) => plan,
418 _ => unreachable!(),
419 };
420 Ok(CreateSourcePlanBundle {
421 item_id,
422 global_id,
423 plan,
424 resolved_ids,
425 available_source_references: None,
426 })
427 }
428
429 pub(crate) async fn plan_purified_alter_source_add_subsource(
431 &mut self,
432 session: &Session,
433 params: Params,
434 source_name: ResolvedItemName,
435 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
436 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
437 ) -> Result<(Plan, ResolvedIds), AdapterError> {
438 let mut subsource_plans = Vec::with_capacity(subsources.len());
439
440 let conn_catalog = self.catalog().for_system_session();
442 let pcx = plan::PlanContext::zero();
443 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
444
445 let entry = self.catalog().get_entry(source_name.item_id());
446 let source = entry.source().ok_or_else(|| {
447 AdapterError::internal(
448 "plan alter source",
449 format!("expected Source found {entry:?}"),
450 )
451 })?;
452
453 let item_id = entry.id();
454 let ingestion_id = source.global_id();
455 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
456
457 let ids = self
458 .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
459 .await?;
460 for (subsource_stmt, (item_id, global_id)) in
461 subsource_stmts.into_iter().zip_eq(ids.into_iter())
462 {
463 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
464 subsource_plans.push(s);
465 }
466
467 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
468 subsources: subsource_plans,
469 options,
470 };
471
472 Ok((
473 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
474 item_id,
475 ingestion_id,
476 action,
477 }),
478 ResolvedIds::empty(),
479 ))
480 }
481
482 pub(crate) fn plan_purified_alter_source_refresh_references(
484 &self,
485 _session: &Session,
486 _params: Params,
487 source_name: ResolvedItemName,
488 available_source_references: plan::SourceReferences,
489 ) -> Result<(Plan, ResolvedIds), AdapterError> {
490 let entry = self.catalog().get_entry(source_name.item_id());
491 let source = entry.source().ok_or_else(|| {
492 AdapterError::internal(
493 "plan alter source",
494 format!("expected Source found {entry:?}"),
495 )
496 })?;
497 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
498 references: available_source_references,
499 };
500
501 Ok((
502 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
503 item_id: entry.id(),
504 ingestion_id: source.global_id(),
505 action,
506 }),
507 ResolvedIds::empty(),
508 ))
509 }
510
511 pub(crate) async fn plan_purified_create_source(
515 &mut self,
516 ctx: &ExecuteContext,
517 params: Params,
518 progress_stmt: Option<CreateSubsourceStatement<Aug>>,
519 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
520 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
521 available_source_references: plan::SourceReferences,
522 ) -> Result<(Plan, ResolvedIds), AdapterError> {
523 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
524
525 if let Some(progress_stmt) = progress_stmt {
527 assert_none!(progress_stmt.of_source);
532 let (item_id, global_id) = self.allocate_user_id().await?;
533 let progress_plan =
534 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
535 let progress_full_name = self
536 .catalog()
537 .resolve_full_name(&progress_plan.plan.name, None);
538 let progress_subsource = ResolvedItemName::Item {
539 id: progress_plan.item_id,
540 qualifiers: progress_plan.plan.name.qualifiers.clone(),
541 full_name: progress_full_name,
542 print_id: true,
543 version: RelationVersionSelector::Latest,
544 };
545
546 create_source_plans.push(progress_plan);
547
548 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
549 }
550
551 let catalog = self.catalog().for_session(ctx.session());
552 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
553
554 let propagated_with_options: Vec<_> = source_stmt
555 .with_options
556 .iter()
557 .filter_map(|opt| match opt.name {
558 CreateSourceOptionName::TimestampInterval => None,
559 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
560 name: CreateSubsourceOptionName::RetainHistory,
561 value: opt.value.clone(),
562 }),
563 })
564 .collect();
565
566 let source_plan = match self.plan_statement(
568 ctx.session(),
569 Statement::CreateSource(source_stmt),
570 ¶ms,
571 &resolved_ids,
572 )? {
573 Plan::CreateSource(plan) => plan,
574 p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
575 };
576
577 let (item_id, global_id) = self.allocate_user_id().await?;
578
579 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
580 let of_source = ResolvedItemName::Item {
581 id: item_id,
582 qualifiers: source_plan.name.qualifiers.clone(),
583 full_name: source_full_name,
584 print_id: true,
585 version: RelationVersionSelector::Latest,
586 };
587
588 let conn_catalog = self.catalog().for_system_session();
590 let pcx = plan::PlanContext::zero();
591 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
592
593 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
594
595 for subsource_stmt in subsource_stmts.iter_mut() {
596 subsource_stmt
597 .with_options
598 .extend(propagated_with_options.iter().cloned())
599 }
600
601 create_source_plans.push(CreateSourcePlanBundle {
602 item_id,
603 global_id,
604 plan: source_plan,
605 resolved_ids: resolved_ids.clone(),
606 available_source_references: Some(available_source_references),
607 });
608
609 let ids = self
611 .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
612 .await?;
613 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
614 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
615 create_source_plans.push(plan);
616 }
617
618 Ok((
619 Plan::CreateSources(create_source_plans),
620 ResolvedIds::empty(),
621 ))
622 }
623
624 #[instrument]
625 pub(super) async fn sequence_create_source(
626 &mut self,
627 ctx: &mut ExecuteContext,
628 plans: Vec<plan::CreateSourcePlanBundle>,
629 ) -> Result<ExecuteResponse, AdapterError> {
630 let CreateSourceInner {
631 ops,
632 sources,
633 if_not_exists_ids,
634 } = self.create_source_inner(ctx.session(), plans).await?;
635
636 let transact_result = self
637 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
638 .await;
639
640 for (item_id, source) in &sources {
642 if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
643 if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
644 ctx.session()
645 .add_notice(AdapterNotice::WebhookSourceCreated { url });
646 }
647 }
648 }
649
650 match transact_result {
651 Ok(()) => Ok(ExecuteResponse::CreatedSource),
652 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
653 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
654 })) if if_not_exists_ids.contains_key(&id) => {
655 ctx.session()
656 .add_notice(AdapterNotice::ObjectAlreadyExists {
657 name: if_not_exists_ids[&id].item.clone(),
658 ty: "source",
659 });
660 Ok(ExecuteResponse::CreatedSource)
661 }
662 Err(err) => Err(err),
663 }
664 }
665
666 #[instrument]
667 pub(super) async fn sequence_create_connection(
668 &mut self,
669 mut ctx: ExecuteContext,
670 plan: plan::CreateConnectionPlan,
671 resolved_ids: ResolvedIds,
672 ) {
673 let (connection_id, connection_gid) = match self.allocate_user_id().await {
674 Ok(item_id) => item_id,
675 Err(err) => return ctx.retire(Err(err)),
676 };
677
678 match &plan.connection.details {
679 ConnectionDetails::Ssh { key_1, key_2, .. } => {
680 let key_1 = match key_1.as_key_pair() {
681 Some(key_1) => key_1.clone(),
682 None => {
683 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
684 "the PUBLIC KEY 1 option cannot be explicitly specified"
685 ))));
686 }
687 };
688
689 let key_2 = match key_2.as_key_pair() {
690 Some(key_2) => key_2.clone(),
691 None => {
692 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
693 "the PUBLIC KEY 2 option cannot be explicitly specified"
694 ))));
695 }
696 };
697
698 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
699 let secret = key_set.to_bytes();
700 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
701 return ctx.retire(Err(err.into()));
702 }
703 self.caching_secrets_reader.invalidate(connection_id);
704 }
705 _ => (),
706 };
707
708 if plan.validate {
709 let internal_cmd_tx = self.internal_cmd_tx.clone();
710 let transient_revision = self.catalog().transient_revision();
711 let conn_id = ctx.session().conn_id().clone();
712 let otel_ctx = OpenTelemetryContext::obtain();
713 let role_metadata = ctx.session().role_metadata().clone();
714
715 let connection = plan
716 .connection
717 .details
718 .to_connection()
719 .into_inline_connection(self.catalog().state());
720
721 let current_storage_parameters = self.controller.storage.config().clone();
722 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
723 let result = match std::panic::AssertUnwindSafe(
724 connection.validate(connection_id, ¤t_storage_parameters),
725 )
726 .ore_catch_unwind()
727 .await
728 {
729 Ok(Ok(())) => Ok(plan),
730 Ok(Err(err)) => Err(err.into()),
731 Err(_panic) => {
732 tracing::error!("connection validation panicked");
733 Err(AdapterError::Internal(
734 "connection validation panicked".into(),
735 ))
736 }
737 };
738
739 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
741 CreateConnectionValidationReady {
742 ctx,
743 result,
744 connection_id,
745 connection_gid,
746 plan_validity: PlanValidity::new(
747 transient_revision,
748 resolved_ids.items().copied().collect(),
749 None,
750 None,
751 role_metadata,
752 ),
753 otel_ctx,
754 resolved_ids: resolved_ids.clone(),
755 },
756 ));
757 if let Err(e) = result {
758 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
759 }
760 });
761 } else {
762 let result = self
763 .sequence_create_connection_stage_finish(
764 &mut ctx,
765 connection_id,
766 connection_gid,
767 plan,
768 resolved_ids,
769 )
770 .await;
771 ctx.retire(result);
772 }
773 }
774
775 #[instrument]
776 pub(crate) async fn sequence_create_connection_stage_finish(
777 &mut self,
778 ctx: &mut ExecuteContext,
779 connection_id: CatalogItemId,
780 connection_gid: GlobalId,
781 plan: plan::CreateConnectionPlan,
782 resolved_ids: ResolvedIds,
783 ) -> Result<ExecuteResponse, AdapterError> {
784 let ops = vec![catalog::Op::CreateItem {
785 id: connection_id,
786 name: plan.name.clone(),
787 item: CatalogItem::Connection(Connection {
788 create_sql: plan.connection.create_sql,
789 global_id: connection_gid,
790 details: plan.connection.details.clone(),
791 resolved_ids,
792 }),
793 owner_id: *ctx.session().current_role_id(),
794 }];
795
796 let conn_id = ctx.session().conn_id().clone();
799 let transact_result = self
800 .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
801 .await;
802
803 match transact_result {
804 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
805 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
806 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
807 })) if plan.if_not_exists => {
808 if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
811 if let Err(e) = self.secrets_controller.delete(connection_id).await {
812 tracing::warn!(
813 "Dropping SSH secret for existing connection has encountered an error: {}",
814 e
815 );
816 } else {
817 self.caching_secrets_reader.invalidate(connection_id);
818 }
819 }
820 ctx.session()
821 .add_notice(AdapterNotice::ObjectAlreadyExists {
822 name: plan.name.item,
823 ty: "connection",
824 });
825 Ok(ExecuteResponse::CreatedConnection)
826 }
827 Err(err) => {
828 if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
831 if let Err(e) = self.secrets_controller.delete(connection_id).await {
832 tracing::warn!(
833 "Dropping SSH secret for failed connection has encountered an error: {}",
834 e
835 );
836 } else {
837 self.caching_secrets_reader.invalidate(connection_id);
838 }
839 }
840 Err(err)
841 }
842 }
843 }
844
845 #[instrument]
846 pub(super) async fn sequence_create_database(
847 &mut self,
848 session: &Session,
849 plan: plan::CreateDatabasePlan,
850 ) -> Result<ExecuteResponse, AdapterError> {
851 let ops = vec![catalog::Op::CreateDatabase {
852 name: plan.name.clone(),
853 owner_id: *session.current_role_id(),
854 }];
855 match self.catalog_transact(Some(session), ops).await {
856 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
857 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
858 kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
859 })) if plan.if_not_exists => {
860 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
861 Ok(ExecuteResponse::CreatedDatabase)
862 }
863 Err(err) => Err(err),
864 }
865 }
866
867 #[instrument]
868 pub(super) async fn sequence_create_schema(
869 &mut self,
870 session: &Session,
871 plan: plan::CreateSchemaPlan,
872 ) -> Result<ExecuteResponse, AdapterError> {
873 let op = catalog::Op::CreateSchema {
874 database_id: plan.database_spec,
875 schema_name: plan.schema_name.clone(),
876 owner_id: *session.current_role_id(),
877 };
878 match self.catalog_transact(Some(session), vec![op]).await {
879 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
880 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
881 kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
882 })) if plan.if_not_exists => {
883 session.add_notice(AdapterNotice::SchemaAlreadyExists {
884 name: plan.schema_name,
885 });
886 Ok(ExecuteResponse::CreatedSchema)
887 }
888 Err(err) => Err(err),
889 }
890 }
891
892 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
894 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
895 if attributes.superuser.is_some() || attributes.password.is_some() {
896 return Err(AdapterError::UnavailableFeature {
897 feature: "SUPERUSER and PASSWORD attributes".to_string(),
898 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
899 });
900 }
901 }
902 Ok(())
903 }
904
905 #[instrument]
906 pub(super) async fn sequence_create_role(
907 &mut self,
908 conn_id: Option<&ConnectionId>,
909 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
910 ) -> Result<ExecuteResponse, AdapterError> {
911 self.validate_role_attributes(&attributes.clone())?;
912 let op = catalog::Op::CreateRole { name, attributes };
913 self.catalog_transact_with_context(conn_id, None, vec![op])
914 .await
915 .map(|_| ExecuteResponse::CreatedRole)
916 }
917
918 #[instrument]
919 pub(super) async fn sequence_create_network_policy(
920 &mut self,
921 session: &Session,
922 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
923 ) -> Result<ExecuteResponse, AdapterError> {
924 let op = catalog::Op::CreateNetworkPolicy {
925 rules,
926 name,
927 owner_id: *session.current_role_id(),
928 };
929 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
930 .await
931 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
932 }
933
934 #[instrument]
935 pub(super) async fn sequence_alter_network_policy(
936 &mut self,
937 session: &Session,
938 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
939 ) -> Result<ExecuteResponse, AdapterError> {
940 let current_network_policy_name =
942 self.catalog().system_config().default_network_policy_name();
943 if current_network_policy_name == name {
945 self.validate_alter_network_policy(session, &rules)?;
946 }
947
948 let op = catalog::Op::AlterNetworkPolicy {
949 id,
950 rules,
951 name,
952 owner_id: *session.current_role_id(),
953 };
954 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
955 .await
956 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
957 }
958
959 #[instrument]
960 pub(super) async fn sequence_create_table(
961 &mut self,
962 ctx: &mut ExecuteContext,
963 plan: plan::CreateTablePlan,
964 resolved_ids: ResolvedIds,
965 ) -> Result<ExecuteResponse, AdapterError> {
966 let plan::CreateTablePlan {
967 name,
968 table,
969 if_not_exists,
970 } = plan;
971
972 let conn_id = if table.temporary {
973 Some(ctx.session().conn_id())
974 } else {
975 None
976 };
977 let (table_id, global_id) = self.allocate_user_id().await?;
978 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
979
980 let data_source = match table.data_source {
981 plan::TableDataSource::TableWrites { defaults } => {
982 TableDataSource::TableWrites { defaults }
983 }
984 plan::TableDataSource::DataSource {
985 desc: data_source_plan,
986 timeline,
987 } => match data_source_plan {
988 plan::DataSourceDesc::IngestionExport {
989 ingestion_id,
990 external_reference,
991 details,
992 data_config,
993 } => TableDataSource::DataSource {
994 desc: DataSourceDesc::IngestionExport {
995 ingestion_id,
996 external_reference,
997 details,
998 data_config,
999 },
1000 timeline,
1001 },
1002 plan::DataSourceDesc::Webhook {
1003 validate_using,
1004 body_format,
1005 headers,
1006 cluster_id,
1007 } => TableDataSource::DataSource {
1008 desc: DataSourceDesc::Webhook {
1009 validate_using,
1010 body_format,
1011 headers,
1012 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
1013 },
1014 timeline,
1015 },
1016 o => {
1017 unreachable!("CREATE TABLE data source got {:?}", o)
1018 }
1019 },
1020 };
1021
1022 let is_webhook = if let TableDataSource::DataSource {
1023 desc: DataSourceDesc::Webhook { .. },
1024 timeline: _,
1025 } = &data_source
1026 {
1027 true
1028 } else {
1029 false
1030 };
1031
1032 let table = Table {
1033 create_sql: Some(table.create_sql),
1034 desc: table.desc,
1035 collections,
1036 conn_id: conn_id.cloned(),
1037 resolved_ids,
1038 custom_logical_compaction_window: table.compaction_window,
1039 is_retained_metrics_object: false,
1040 data_source,
1041 };
1042 let ops = vec![catalog::Op::CreateItem {
1043 id: table_id,
1044 name: name.clone(),
1045 item: CatalogItem::Table(table.clone()),
1046 owner_id: *ctx.session().current_role_id(),
1047 }];
1048
1049 let catalog_result = self
1050 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1051 .await;
1052
1053 if is_webhook {
1054 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1057 ctx.session()
1058 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1059 }
1060 }
1061
1062 match catalog_result {
1063 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1064 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1065 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1066 })) if if_not_exists => {
1067 ctx.session_mut()
1068 .add_notice(AdapterNotice::ObjectAlreadyExists {
1069 name: name.item,
1070 ty: "table",
1071 });
1072 Ok(ExecuteResponse::CreatedTable)
1073 }
1074 Err(err) => Err(err),
1075 }
1076 }
1077
1078 #[instrument]
1079 pub(super) async fn sequence_create_sink(
1080 &mut self,
1081 ctx: ExecuteContext,
1082 plan: plan::CreateSinkPlan,
1083 resolved_ids: ResolvedIds,
1084 ) {
1085 let plan::CreateSinkPlan {
1086 name,
1087 sink,
1088 with_snapshot,
1089 if_not_exists,
1090 in_cluster,
1091 } = plan;
1092
1093 let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
1095
1096 let catalog_sink = Sink {
1097 create_sql: sink.create_sql,
1098 global_id,
1099 from: sink.from,
1100 connection: sink.connection,
1101 envelope: sink.envelope,
1102 version: sink.version,
1103 with_snapshot,
1104 resolved_ids,
1105 cluster_id: in_cluster,
1106 commit_interval: sink.commit_interval,
1107 };
1108
1109 let ops = vec![catalog::Op::CreateItem {
1110 id: item_id,
1111 name: name.clone(),
1112 item: CatalogItem::Sink(catalog_sink.clone()),
1113 owner_id: *ctx.session().current_role_id(),
1114 }];
1115
1116 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1117
1118 match result {
1119 Ok(()) => {}
1120 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1121 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1122 })) if if_not_exists => {
1123 ctx.session()
1124 .add_notice(AdapterNotice::ObjectAlreadyExists {
1125 name: name.item,
1126 ty: "sink",
1127 });
1128 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1129 return;
1130 }
1131 Err(e) => {
1132 ctx.retire(Err(e));
1133 return;
1134 }
1135 };
1136
1137 self.create_storage_export(global_id, &catalog_sink)
1138 .await
1139 .unwrap_or_terminate("cannot fail to create exports");
1140
1141 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1142 .await;
1143
1144 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1145 }
1146
1147 pub(super) fn validate_system_column_references(
1170 &self,
1171 uses_ambiguous_columns: bool,
1172 depends_on: &BTreeSet<GlobalId>,
1173 ) -> Result<(), AdapterError> {
1174 if uses_ambiguous_columns
1175 && depends_on
1176 .iter()
1177 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1178 {
1179 Err(AdapterError::AmbiguousSystemColumnReference)
1180 } else {
1181 Ok(())
1182 }
1183 }
1184
1185 #[instrument]
1186 pub(super) async fn sequence_create_type(
1187 &mut self,
1188 session: &Session,
1189 plan: plan::CreateTypePlan,
1190 resolved_ids: ResolvedIds,
1191 ) -> Result<ExecuteResponse, AdapterError> {
1192 let (item_id, global_id) = self.allocate_user_id().await?;
1193 plan.typ
1195 .inner
1196 .desc(&self.catalog().for_session(session))
1197 .map_err(AdapterError::from)?;
1198 let typ = Type {
1199 create_sql: Some(plan.typ.create_sql),
1200 global_id,
1201 details: CatalogTypeDetails {
1202 array_id: None,
1203 typ: plan.typ.inner,
1204 pg_metadata: None,
1205 },
1206 resolved_ids,
1207 };
1208 let op = catalog::Op::CreateItem {
1209 id: item_id,
1210 name: plan.name,
1211 item: CatalogItem::Type(typ),
1212 owner_id: *session.current_role_id(),
1213 };
1214 match self.catalog_transact(Some(session), vec![op]).await {
1215 Ok(()) => Ok(ExecuteResponse::CreatedType),
1216 Err(err) => Err(err),
1217 }
1218 }
1219
1220 #[instrument]
1221 pub(super) async fn sequence_comment_on(
1222 &mut self,
1223 session: &Session,
1224 plan: plan::CommentPlan,
1225 ) -> Result<ExecuteResponse, AdapterError> {
1226 let op = catalog::Op::Comment {
1227 object_id: plan.object_id,
1228 sub_component: plan.sub_component,
1229 comment: plan.comment,
1230 };
1231 self.catalog_transact(Some(session), vec![op]).await?;
1232 Ok(ExecuteResponse::Comment)
1233 }
1234
1235 #[instrument]
1236 pub(super) async fn sequence_drop_objects(
1237 &mut self,
1238 ctx: &mut ExecuteContext,
1239 plan::DropObjectsPlan {
1240 drop_ids,
1241 object_type,
1242 referenced_ids,
1243 }: plan::DropObjectsPlan,
1244 ) -> Result<ExecuteResponse, AdapterError> {
1245 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1246 let mut objects = Vec::new();
1247 for obj_id in &drop_ids {
1248 if !referenced_ids_hashset.contains(obj_id) {
1249 let object_info = ErrorMessageObjectDescription::from_id(
1250 obj_id,
1251 &self.catalog().for_session(ctx.session()),
1252 )
1253 .to_string();
1254 objects.push(object_info);
1255 }
1256 }
1257
1258 if !objects.is_empty() {
1259 ctx.session()
1260 .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1261 }
1262
1263 let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
1265 .iter()
1266 .filter_map(|id| match id {
1267 ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
1268 _ => None,
1269 })
1270 .flatten()
1271 .collect();
1272
1273 let DropOps {
1274 ops,
1275 dropped_active_db,
1276 dropped_active_cluster,
1277 dropped_in_use_indexes,
1278 } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1279
1280 self.catalog_transact_with_context(None, Some(ctx), ops)
1281 .await?;
1282
1283 if !expr_cache_invalidate_ids.is_empty() {
1285 let _fut = self.catalog().update_expression_cache(
1286 Default::default(),
1287 Default::default(),
1288 expr_cache_invalidate_ids,
1289 );
1290 }
1291
1292 fail::fail_point!("after_sequencer_drop_replica");
1293
1294 if dropped_active_db {
1295 ctx.session()
1296 .add_notice(AdapterNotice::DroppedActiveDatabase {
1297 name: ctx.session().vars().database().to_string(),
1298 });
1299 }
1300 if dropped_active_cluster {
1301 ctx.session()
1302 .add_notice(AdapterNotice::DroppedActiveCluster {
1303 name: ctx.session().vars().cluster().to_string(),
1304 });
1305 }
1306 for dropped_in_use_index in dropped_in_use_indexes {
1307 ctx.session()
1308 .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1309 self.metrics
1310 .optimization_notices
1311 .with_label_values(&["DroppedInUseIndex"])
1312 .inc_by(1);
1313 }
1314 Ok(ExecuteResponse::DroppedObject(object_type))
1315 }
1316
1317 fn validate_dropped_role_ownership(
1318 &self,
1319 session: &Session,
1320 dropped_roles: &BTreeMap<RoleId, &str>,
1321 ) -> Result<(), AdapterError> {
1322 fn privilege_check(
1323 privileges: &PrivilegeMap,
1324 dropped_roles: &BTreeMap<RoleId, &str>,
1325 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1326 object_id: &SystemObjectId,
1327 catalog: &ConnCatalog,
1328 ) {
1329 for privilege in privileges.all_values() {
1330 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1331 let grantor_name = catalog.get_role(&privilege.grantor).name();
1332 let object_description =
1333 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1334 dependent_objects
1335 .entry(role_name.to_string())
1336 .or_default()
1337 .push(format!(
1338 "privileges on {object_description} granted by {grantor_name}",
1339 ));
1340 }
1341 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1342 let grantee_name = catalog.get_role(&privilege.grantee).name();
1343 let object_description =
1344 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1345 dependent_objects
1346 .entry(role_name.to_string())
1347 .or_default()
1348 .push(format!(
1349 "privileges granted on {object_description} to {grantee_name}"
1350 ));
1351 }
1352 }
1353 }
1354
1355 let catalog = self.catalog().for_session(session);
1356 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1357 for entry in self.catalog.entries() {
1358 let id = SystemObjectId::Object(entry.id().into());
1359 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1360 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1361 dependent_objects
1362 .entry(role_name.to_string())
1363 .or_default()
1364 .push(format!("owner of {object_description}"));
1365 }
1366 privilege_check(
1367 entry.privileges(),
1368 dropped_roles,
1369 &mut dependent_objects,
1370 &id,
1371 &catalog,
1372 );
1373 }
1374 for database in self.catalog.databases() {
1375 let database_id = SystemObjectId::Object(database.id().into());
1376 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1377 let object_description =
1378 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1379 dependent_objects
1380 .entry(role_name.to_string())
1381 .or_default()
1382 .push(format!("owner of {object_description}"));
1383 }
1384 privilege_check(
1385 &database.privileges,
1386 dropped_roles,
1387 &mut dependent_objects,
1388 &database_id,
1389 &catalog,
1390 );
1391 for schema in database.schemas_by_id.values() {
1392 let schema_id = SystemObjectId::Object(
1393 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1394 );
1395 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1396 let object_description =
1397 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1398 dependent_objects
1399 .entry(role_name.to_string())
1400 .or_default()
1401 .push(format!("owner of {object_description}"));
1402 }
1403 privilege_check(
1404 &schema.privileges,
1405 dropped_roles,
1406 &mut dependent_objects,
1407 &schema_id,
1408 &catalog,
1409 );
1410 }
1411 }
1412 for cluster in self.catalog.clusters() {
1413 let cluster_id = SystemObjectId::Object(cluster.id().into());
1414 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1415 let object_description =
1416 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1417 dependent_objects
1418 .entry(role_name.to_string())
1419 .or_default()
1420 .push(format!("owner of {object_description}"));
1421 }
1422 privilege_check(
1423 &cluster.privileges,
1424 dropped_roles,
1425 &mut dependent_objects,
1426 &cluster_id,
1427 &catalog,
1428 );
1429 for replica in cluster.replicas() {
1430 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1431 let replica_id =
1432 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1433 let object_description =
1434 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1435 dependent_objects
1436 .entry(role_name.to_string())
1437 .or_default()
1438 .push(format!("owner of {object_description}"));
1439 }
1440 }
1441 }
1442 privilege_check(
1443 self.catalog().system_privileges(),
1444 dropped_roles,
1445 &mut dependent_objects,
1446 &SystemObjectId::System,
1447 &catalog,
1448 );
1449 for (default_privilege_object, default_privilege_acl_items) in
1450 self.catalog.default_privileges()
1451 {
1452 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1453 dependent_objects
1454 .entry(role_name.to_string())
1455 .or_default()
1456 .push(format!(
1457 "default privileges on {}S created by {}",
1458 default_privilege_object.object_type, role_name
1459 ));
1460 }
1461 for default_privilege_acl_item in default_privilege_acl_items {
1462 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1463 dependent_objects
1464 .entry(role_name.to_string())
1465 .or_default()
1466 .push(format!(
1467 "default privileges on {}S granted to {}",
1468 default_privilege_object.object_type, role_name
1469 ));
1470 }
1471 }
1472 }
1473
1474 if !dependent_objects.is_empty() {
1475 Err(AdapterError::DependentObject(dependent_objects))
1476 } else {
1477 Ok(())
1478 }
1479 }
1480
1481 #[instrument]
1482 pub(super) async fn sequence_drop_owned(
1483 &mut self,
1484 session: &Session,
1485 plan: plan::DropOwnedPlan,
1486 ) -> Result<ExecuteResponse, AdapterError> {
1487 for role_id in &plan.role_ids {
1488 self.catalog().ensure_not_reserved_role(role_id)?;
1489 }
1490
1491 let mut privilege_revokes = plan.privilege_revokes;
1492
1493 let session_catalog = self.catalog().for_session(session);
1495 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1496 && !session.is_superuser()
1497 {
1498 let role_membership =
1500 session_catalog.collect_role_membership(session.current_role_id());
1501 let invalid_revokes: BTreeSet<_> = privilege_revokes
1502 .extract_if(.., |(_, privilege)| {
1503 !role_membership.contains(&privilege.grantor)
1504 })
1505 .map(|(object_id, _)| object_id)
1506 .collect();
1507 for invalid_revoke in invalid_revokes {
1508 let object_description =
1509 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1510 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1511 }
1512 }
1513
1514 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1515 catalog::Op::UpdatePrivilege {
1516 target_id: object_id,
1517 privilege,
1518 variant: UpdatePrivilegeVariant::Revoke,
1519 }
1520 });
1521 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1522 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1523 privilege_object,
1524 privilege_acl_item,
1525 variant: UpdatePrivilegeVariant::Revoke,
1526 },
1527 );
1528 let DropOps {
1529 ops: drop_ops,
1530 dropped_active_db,
1531 dropped_active_cluster,
1532 dropped_in_use_indexes,
1533 } = self.sequence_drop_common(session, plan.drop_ids)?;
1534
1535 let ops = privilege_revoke_ops
1536 .chain(default_privilege_revoke_ops)
1537 .chain(drop_ops.into_iter())
1538 .collect();
1539
1540 self.catalog_transact(Some(session), ops).await?;
1541
1542 if dropped_active_db {
1543 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1544 name: session.vars().database().to_string(),
1545 });
1546 }
1547 if dropped_active_cluster {
1548 session.add_notice(AdapterNotice::DroppedActiveCluster {
1549 name: session.vars().cluster().to_string(),
1550 });
1551 }
1552 for dropped_in_use_index in dropped_in_use_indexes {
1553 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1554 }
1555 Ok(ExecuteResponse::DroppedOwned)
1556 }
1557
1558 fn sequence_drop_common(
1559 &self,
1560 session: &Session,
1561 ids: Vec<ObjectId>,
1562 ) -> Result<DropOps, AdapterError> {
1563 let mut dropped_active_db = false;
1564 let mut dropped_active_cluster = false;
1565 let mut dropped_in_use_indexes = Vec::new();
1566 let mut dropped_roles = BTreeMap::new();
1567 let mut dropped_databases = BTreeSet::new();
1568 let mut dropped_schemas = BTreeSet::new();
1569 let mut role_revokes = BTreeSet::new();
1573 let mut default_privilege_revokes = BTreeSet::new();
1576
1577 let mut clusters_to_drop = BTreeSet::new();
1579
1580 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1581 for id in &ids {
1582 match id {
1583 ObjectId::Database(id) => {
1584 let name = self.catalog().get_database(id).name();
1585 if name == session.vars().database() {
1586 dropped_active_db = true;
1587 }
1588 dropped_databases.insert(id);
1589 }
1590 ObjectId::Schema((_, spec)) => {
1591 if let SchemaSpecifier::Id(id) = spec {
1592 dropped_schemas.insert(id);
1593 }
1594 }
1595 ObjectId::Cluster(id) => {
1596 clusters_to_drop.insert(*id);
1597 if let Some(active_id) = self
1598 .catalog()
1599 .active_cluster(session)
1600 .ok()
1601 .map(|cluster| cluster.id())
1602 {
1603 if id == &active_id {
1604 dropped_active_cluster = true;
1605 }
1606 }
1607 }
1608 ObjectId::Role(id) => {
1609 let role = self.catalog().get_role(id);
1610 let name = role.name();
1611 dropped_roles.insert(*id, name);
1612 for (group_id, grantor_id) in &role.membership.map {
1614 role_revokes.insert((*group_id, *id, *grantor_id));
1615 }
1616 }
1617 ObjectId::Item(id) => {
1618 if let Some(index) = self.catalog().get_entry(id).index() {
1619 let humanizer = self.catalog().for_session(session);
1620 let dependants = self
1621 .controller
1622 .compute
1623 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1624 .ok()
1625 .into_iter()
1626 .flatten()
1627 .filter(|dependant_id| {
1628 if dependant_id.is_transient() {
1635 return false;
1636 }
1637 let Some(dependent_id) = humanizer
1639 .try_get_item_by_global_id(dependant_id)
1640 .map(|item| item.id())
1641 else {
1642 return false;
1643 };
1644 !ids_set.contains(&ObjectId::Item(dependent_id))
1647 })
1648 .flat_map(|dependant_id| {
1649 humanizer.humanize_id(dependant_id)
1653 })
1654 .collect_vec();
1655 if !dependants.is_empty() {
1656 dropped_in_use_indexes.push(DroppedInUseIndex {
1657 index_name: humanizer
1658 .humanize_id(index.global_id())
1659 .unwrap_or_else(|| id.to_string()),
1660 dependant_objects: dependants,
1661 });
1662 }
1663 }
1664 }
1665 _ => {}
1666 }
1667 }
1668
1669 for id in &ids {
1670 match id {
1671 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1675 if !clusters_to_drop.contains(cluster_id) {
1676 let cluster = self.catalog.get_cluster(*cluster_id);
1677 if cluster.is_managed() {
1678 let replica =
1679 cluster.replica(*replica_id).expect("Catalog out of sync");
1680 if !replica.config.location.internal() {
1681 coord_bail!("cannot drop replica of managed cluster");
1682 }
1683 }
1684 }
1685 }
1686 _ => {}
1687 }
1688 }
1689
1690 for role_id in dropped_roles.keys() {
1691 self.catalog().ensure_not_reserved_role(role_id)?;
1692 }
1693 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1694 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1696 for role in self.catalog().user_roles() {
1697 for dropped_role_id in
1698 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1699 {
1700 role_revokes.insert((
1701 **dropped_role_id,
1702 role.id(),
1703 *role
1704 .membership
1705 .map
1706 .get(*dropped_role_id)
1707 .expect("included in keys above"),
1708 ));
1709 }
1710 }
1711
1712 for (default_privilege_object, default_privilege_acls) in
1713 self.catalog().default_privileges()
1714 {
1715 if matches!(
1716 &default_privilege_object.database_id,
1717 Some(database_id) if dropped_databases.contains(database_id),
1718 ) || matches!(
1719 &default_privilege_object.schema_id,
1720 Some(schema_id) if dropped_schemas.contains(schema_id),
1721 ) {
1722 for default_privilege_acl in default_privilege_acls {
1723 default_privilege_revokes.insert((
1724 default_privilege_object.clone(),
1725 default_privilege_acl.clone(),
1726 ));
1727 }
1728 }
1729 }
1730
1731 let ops = role_revokes
1732 .into_iter()
1733 .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1734 role_id,
1735 member_id,
1736 grantor_id,
1737 })
1738 .chain(default_privilege_revokes.into_iter().map(
1739 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1740 privilege_object,
1741 privilege_acl_item,
1742 variant: UpdatePrivilegeVariant::Revoke,
1743 },
1744 ))
1745 .chain(iter::once(catalog::Op::DropObjects(
1746 ids.into_iter()
1747 .map(DropObjectInfo::manual_drop_from_object_id)
1748 .collect(),
1749 )))
1750 .collect();
1751
1752 Ok(DropOps {
1753 ops,
1754 dropped_active_db,
1755 dropped_active_cluster,
1756 dropped_in_use_indexes,
1757 })
1758 }
1759
1760 pub(super) fn sequence_explain_schema(
1761 &self,
1762 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1763 ) -> Result<ExecuteResponse, AdapterError> {
1764 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1765 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1766 })?;
1767
1768 let json_string = json_string(&json_value);
1769 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1770 Ok(Self::send_immediate_rows(row))
1771 }
1772
1773 pub(super) fn sequence_show_all_variables(
1774 &self,
1775 session: &Session,
1776 ) -> Result<ExecuteResponse, AdapterError> {
1777 let mut rows = viewable_variables(self.catalog().state(), session)
1778 .map(|v| (v.name(), v.value(), v.description()))
1779 .collect::<Vec<_>>();
1780 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1781
1782 let rows: Vec<_> = rows
1784 .into_iter()
1785 .map(|(name, val, desc)| {
1786 Row::pack_slice(&[
1787 Datum::String(name),
1788 Datum::String(&val),
1789 Datum::String(desc),
1790 ])
1791 })
1792 .collect();
1793 Ok(Self::send_immediate_rows(rows))
1794 }
1795
1796 pub(super) fn sequence_show_variable(
1797 &self,
1798 session: &Session,
1799 plan: plan::ShowVariablePlan,
1800 ) -> Result<ExecuteResponse, AdapterError> {
1801 if &plan.name == SCHEMA_ALIAS {
1802 let schemas = self.catalog.resolve_search_path(session);
1803 let schema = schemas.first();
1804 return match schema {
1805 Some((database_spec, schema_spec)) => {
1806 let schema_name = &self
1807 .catalog
1808 .get_schema(database_spec, schema_spec, session.conn_id())
1809 .name()
1810 .schema;
1811 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1812 Ok(Self::send_immediate_rows(row))
1813 }
1814 None => {
1815 if session.vars().current_object_missing_warnings() {
1816 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1817 search_path: session
1818 .vars()
1819 .search_path()
1820 .into_iter()
1821 .map(|schema| schema.to_string())
1822 .collect(),
1823 });
1824 }
1825 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1826 }
1827 };
1828 }
1829
1830 let variable = session
1831 .vars()
1832 .get(self.catalog().system_config(), &plan.name)
1833 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1834
1835 variable.visible(session.user(), self.catalog().system_config())?;
1838
1839 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1840 if variable.name() == vars::DATABASE.name()
1841 && matches!(
1842 self.catalog().resolve_database(&variable.value()),
1843 Err(CatalogError::UnknownDatabase(_))
1844 )
1845 && session.vars().current_object_missing_warnings()
1846 {
1847 let name = variable.value();
1848 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1849 } else if variable.name() == vars::CLUSTER.name()
1850 && matches!(
1851 self.catalog().resolve_cluster(&variable.value()),
1852 Err(CatalogError::UnknownCluster(_))
1853 )
1854 && session.vars().current_object_missing_warnings()
1855 {
1856 let name = variable.value();
1857 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1858 }
1859 Ok(Self::send_immediate_rows(row))
1860 }
1861
1862 #[instrument]
1863 pub(super) async fn sequence_inspect_shard(
1864 &self,
1865 session: &Session,
1866 plan: plan::InspectShardPlan,
1867 ) -> Result<ExecuteResponse, AdapterError> {
1868 if !session.user().is_internal() {
1871 return Err(AdapterError::Unauthorized(
1872 rbac::UnauthorizedError::MzSystem {
1873 action: "inspect".into(),
1874 },
1875 ));
1876 }
1877 let state = self
1878 .controller
1879 .storage
1880 .inspect_persist_state(plan.id)
1881 .await?;
1882 let jsonb = Jsonb::from_serde_json(state)?;
1883 Ok(Self::send_immediate_rows(jsonb.into_row()))
1884 }
1885
1886 #[instrument]
1887 pub(super) fn sequence_set_variable(
1888 &self,
1889 session: &mut Session,
1890 plan: plan::SetVariablePlan,
1891 ) -> Result<ExecuteResponse, AdapterError> {
1892 let (name, local) = (plan.name, plan.local);
1893 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1894 self.validate_set_isolation_level(session)?;
1895 }
1896 if &name == vars::CLUSTER.name() {
1897 self.validate_set_cluster(session)?;
1898 }
1899
1900 let vars = session.vars_mut();
1901 let values = match plan.value {
1902 plan::VariableValue::Default => None,
1903 plan::VariableValue::Values(values) => Some(values),
1904 };
1905
1906 match values {
1907 Some(values) => {
1908 vars.set(
1909 self.catalog().system_config(),
1910 &name,
1911 VarInput::SqlSet(&values),
1912 local,
1913 )?;
1914
1915 let vars = session.vars();
1916
1917 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1920 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1921 } else if name == vars::CLUSTER.name()
1922 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1923 {
1924 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1925 }
1926
1927 if name.as_str() == vars::DATABASE.name()
1929 && matches!(
1930 self.catalog().resolve_database(vars.database()),
1931 Err(CatalogError::UnknownDatabase(_))
1932 )
1933 && session.vars().current_object_missing_warnings()
1934 {
1935 let name = vars.database().to_string();
1936 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1937 } else if name.as_str() == vars::CLUSTER.name()
1938 && matches!(
1939 self.catalog().resolve_cluster(vars.cluster()),
1940 Err(CatalogError::UnknownCluster(_))
1941 )
1942 && session.vars().current_object_missing_warnings()
1943 {
1944 let name = vars.cluster().to_string();
1945 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1946 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1947 let v = values.into_first().to_lowercase();
1948 if v == IsolationLevel::ReadUncommitted.as_str()
1949 || v == IsolationLevel::ReadCommitted.as_str()
1950 || v == IsolationLevel::RepeatableRead.as_str()
1951 {
1952 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1953 isolation_level: v,
1954 });
1955 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1956 session.add_notice(AdapterNotice::StrongSessionSerializable);
1957 }
1958 }
1959 }
1960 None => vars.reset(self.catalog().system_config(), &name, local)?,
1961 }
1962
1963 Ok(ExecuteResponse::SetVariable { name, reset: false })
1964 }
1965
1966 pub(super) fn sequence_reset_variable(
1967 &self,
1968 session: &mut Session,
1969 plan: plan::ResetVariablePlan,
1970 ) -> Result<ExecuteResponse, AdapterError> {
1971 let name = plan.name;
1972 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1973 self.validate_set_isolation_level(session)?;
1974 }
1975 if &name == vars::CLUSTER.name() {
1976 self.validate_set_cluster(session)?;
1977 }
1978 session
1979 .vars_mut()
1980 .reset(self.catalog().system_config(), &name, false)?;
1981 Ok(ExecuteResponse::SetVariable { name, reset: true })
1982 }
1983
1984 pub(super) fn sequence_set_transaction(
1985 &self,
1986 session: &mut Session,
1987 plan: plan::SetTransactionPlan,
1988 ) -> Result<ExecuteResponse, AdapterError> {
1989 for mode in plan.modes {
1991 match mode {
1992 TransactionMode::AccessMode(_) => {
1993 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1994 }
1995 TransactionMode::IsolationLevel(isolation_level) => {
1996 self.validate_set_isolation_level(session)?;
1997
1998 session.vars_mut().set(
1999 self.catalog().system_config(),
2000 TRANSACTION_ISOLATION_VAR_NAME,
2001 VarInput::Flat(&isolation_level.to_ast_string_stable()),
2002 plan.local,
2003 )?
2004 }
2005 }
2006 }
2007 Ok(ExecuteResponse::SetVariable {
2008 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2009 reset: false,
2010 })
2011 }
2012
2013 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2014 if session.transaction().contains_ops() {
2015 Err(AdapterError::InvalidSetIsolationLevel)
2016 } else {
2017 Ok(())
2018 }
2019 }
2020
2021 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2022 if session.transaction().contains_ops() {
2023 Err(AdapterError::InvalidSetCluster)
2024 } else {
2025 Ok(())
2026 }
2027 }
2028
2029 #[instrument]
2030 pub(super) async fn sequence_end_transaction(
2031 &mut self,
2032 mut ctx: ExecuteContext,
2033 mut action: EndTransactionAction,
2034 ) {
2035 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2037 (&action, ctx.session().transaction())
2038 {
2039 action = EndTransactionAction::Rollback;
2040 }
2041 let response = match action {
2042 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2043 params: BTreeMap::new(),
2044 }),
2045 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2046 params: BTreeMap::new(),
2047 }),
2048 };
2049
2050 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2051
2052 let (response, action) = match result {
2053 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2054 (response, action)
2055 }
2056 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2057 let validated_locks = match write_lock_guards {
2061 None => None,
2062 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2063 Ok(locks) => Some(locks),
2064 Err(missing) => {
2065 tracing::error!(?missing, "programming error, missing write locks");
2066 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2067 }
2068 },
2069 };
2070
2071 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2072 for WriteOp { id, rows } in writes {
2073 let total_rows = collected_writes.entry(id).or_default();
2074 total_rows.push(rows);
2075 }
2076
2077 self.submit_write(PendingWriteTxn::User {
2078 span: Span::current(),
2079 writes: collected_writes,
2080 write_locks: validated_locks,
2081 pending_txn: PendingTxn {
2082 ctx,
2083 response,
2084 action,
2085 },
2086 });
2087 return;
2088 }
2089 Ok((
2090 Some(TransactionOps::Peeks {
2091 determination,
2092 requires_linearization: RequireLinearization::Required,
2093 ..
2094 }),
2095 _,
2096 )) if ctx.session().vars().transaction_isolation()
2097 == &IsolationLevel::StrictSerializable =>
2098 {
2099 let conn_id = ctx.session().conn_id().clone();
2100 let pending_read_txn = PendingReadTxn {
2101 txn: PendingRead::Read {
2102 txn: PendingTxn {
2103 ctx,
2104 response,
2105 action,
2106 },
2107 },
2108 timestamp_context: determination.timestamp_context,
2109 created: Instant::now(),
2110 num_requeues: 0,
2111 otel_ctx: OpenTelemetryContext::obtain(),
2112 };
2113 self.strict_serializable_reads_tx
2114 .send((conn_id, pending_read_txn))
2115 .expect("sending to strict_serializable_reads_tx cannot fail");
2116 return;
2117 }
2118 Ok((
2119 Some(TransactionOps::Peeks {
2120 determination,
2121 requires_linearization: RequireLinearization::Required,
2122 ..
2123 }),
2124 _,
2125 )) if ctx.session().vars().transaction_isolation()
2126 == &IsolationLevel::StrongSessionSerializable =>
2127 {
2128 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2129 ctx.session_mut()
2130 .ensure_timestamp_oracle(timeline.clone())
2131 .apply_write(*ts);
2132 }
2133 (response, action)
2134 }
2135 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2136 self.internal_cmd_tx
2137 .send(Message::ExecuteSingleStatementTransaction {
2138 ctx,
2139 otel_ctx: OpenTelemetryContext::obtain(),
2140 stmt,
2141 params,
2142 })
2143 .expect("must send");
2144 return;
2145 }
2146 Ok((_, _)) => (response, action),
2147 Err(err) => (Err(err), EndTransactionAction::Rollback),
2148 };
2149 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2150 let response = response.map(|mut r| {
2152 r.extend_params(changed);
2153 ExecuteResponse::from(r)
2154 });
2155
2156 ctx.retire(response);
2157 }
2158
2159 #[instrument]
2160 async fn sequence_end_transaction_inner(
2161 &mut self,
2162 ctx: &mut ExecuteContext,
2163 action: EndTransactionAction,
2164 ) -> Result<(Option<TransactionOps>, Option<WriteLocks>), AdapterError> {
2165 let txn = self.clear_transaction(ctx.session_mut()).await;
2166
2167 if let EndTransactionAction::Commit = action {
2168 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2169 match &mut ops {
2170 TransactionOps::Writes(writes) => {
2171 for WriteOp { id, .. } in &mut writes.iter() {
2172 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2174 AdapterError::Catalog(mz_catalog::memory::error::Error {
2175 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2176 })
2177 })?;
2178 }
2179
2180 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2182 }
2183 TransactionOps::DDL {
2184 ops,
2185 state: _,
2186 side_effects,
2187 revision,
2188 snapshot: _,
2189 } => {
2190 if *revision != self.catalog().transient_revision() {
2192 return Err(AdapterError::DDLTransactionRace);
2193 }
2194 let ops = std::mem::take(ops);
2196 let side_effects = std::mem::take(side_effects);
2197 self.catalog_transact_with_side_effects(
2198 Some(ctx),
2199 ops,
2200 move |a, mut ctx| {
2201 Box::pin(async move {
2202 for side_effect in side_effects {
2203 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2204 }
2205 })
2206 },
2207 )
2208 .await?;
2209 }
2210 _ => (),
2211 }
2212 return Ok((Some(ops), write_lock_guards));
2213 }
2214 }
2215
2216 Ok((None, None))
2217 }
2218
2219 pub(super) async fn sequence_side_effecting_func(
2220 &mut self,
2221 ctx: ExecuteContext,
2222 plan: SideEffectingFunc,
2223 ) {
2224 match plan {
2225 SideEffectingFunc::PgCancelBackend { connection_id } => {
2226 if ctx.session().conn_id().unhandled() == connection_id {
2227 ctx.retire(Err(AdapterError::Canceled));
2231 return;
2232 }
2233
2234 let res = if let Some((id_handle, _conn_meta)) =
2235 self.active_conns.get_key_value(&connection_id)
2236 {
2237 self.handle_privileged_cancel(id_handle.clone()).await;
2239 Datum::True
2240 } else {
2241 Datum::False
2242 };
2243 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2244 }
2245 }
2246 }
2247
2248 pub(crate) async fn execute_side_effecting_func(
2257 &mut self,
2258 plan: SideEffectingFunc,
2259 conn_id: ConnectionId,
2260 current_role: RoleId,
2261 ) -> Result<ExecuteResponse, AdapterError> {
2262 match plan {
2263 SideEffectingFunc::PgCancelBackend { connection_id } => {
2264 if conn_id.unhandled() == connection_id {
2265 return Err(AdapterError::Canceled);
2269 }
2270
2271 if let Some((_id_handle, conn_meta)) =
2274 self.active_conns.get_key_value(&connection_id)
2275 {
2276 let target_role = *conn_meta.authenticated_role_id();
2277 let role_membership = self
2278 .catalog()
2279 .state()
2280 .collect_role_membership(¤t_role);
2281 if !role_membership.contains(&target_role) {
2282 let target_role_name = self
2283 .catalog()
2284 .try_get_role(&target_role)
2285 .map(|role| role.name().to_string())
2286 .unwrap_or_else(|| target_role.to_string());
2287 return Err(AdapterError::Unauthorized(
2288 rbac::UnauthorizedError::RoleMembership {
2289 role_names: vec![target_role_name],
2290 },
2291 ));
2292 }
2293
2294 let id_handle = self
2296 .active_conns
2297 .get_key_value(&connection_id)
2298 .map(|(id, _)| id.clone())
2299 .expect("checked above");
2300 self.handle_privileged_cancel(id_handle).await;
2301 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2302 } else {
2303 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2305 }
2306 }
2307 }
2308 }
2309
2310 pub(crate) async fn determine_real_time_recent_timestamp(
2314 &self,
2315 source_ids: impl Iterator<Item = GlobalId>,
2316 real_time_recency_timeout: Duration,
2317 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2318 let item_ids = source_ids
2319 .map(|gid| {
2320 self.catalog
2321 .try_resolve_item_id(&gid)
2322 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2323 })
2324 .collect::<Result<Vec<_>, _>>()?;
2325
2326 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2332 if to_visit.is_empty() {
2335 return Ok(None);
2336 }
2337
2338 let mut timestamp_objects = BTreeSet::new();
2339
2340 while let Some(id) = to_visit.pop_front() {
2341 timestamp_objects.insert(id);
2342 to_visit.extend(
2343 self.catalog()
2344 .get_entry(&id)
2345 .uses()
2346 .into_iter()
2347 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2348 );
2349 }
2350 let timestamp_objects = timestamp_objects
2351 .into_iter()
2352 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2353 .collect();
2354
2355 let r = self
2356 .controller
2357 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2358 .await?;
2359
2360 Ok(Some(r))
2361 }
2362
2363 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2366 &self,
2367 session: &Session,
2368 source_ids: impl Iterator<Item = GlobalId>,
2369 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2370 let vars = session.vars();
2371
2372 if vars.real_time_recency()
2373 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2374 && !session.contains_read_timestamp()
2375 {
2376 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2377 .await
2378 } else {
2379 Ok(None)
2380 }
2381 }
2382
2383 #[instrument]
2384 pub(super) async fn sequence_explain_plan(
2385 &mut self,
2386 ctx: ExecuteContext,
2387 plan: plan::ExplainPlanPlan,
2388 target_cluster: TargetCluster,
2389 ) {
2390 match &plan.explainee {
2391 plan::Explainee::Statement(stmt) => match stmt {
2392 plan::ExplaineeStatement::CreateView { .. } => {
2393 self.explain_create_view(ctx, plan).await;
2394 }
2395 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2396 self.explain_create_materialized_view(ctx, plan).await;
2397 }
2398 plan::ExplaineeStatement::CreateIndex { .. } => {
2399 self.explain_create_index(ctx, plan).await;
2400 }
2401 plan::ExplaineeStatement::Select { .. } => {
2402 self.explain_peek(ctx, plan, target_cluster).await;
2403 }
2404 plan::ExplaineeStatement::Subscribe { .. } => {
2405 self.explain_subscribe(ctx, plan, target_cluster).await;
2406 }
2407 },
2408 plan::Explainee::View(_) => {
2409 let result = self.explain_view(&ctx, plan);
2410 ctx.retire(result);
2411 }
2412 plan::Explainee::MaterializedView(_) => {
2413 let result = self.explain_materialized_view(&ctx, plan);
2414 ctx.retire(result);
2415 }
2416 plan::Explainee::Index(_) => {
2417 let result = self.explain_index(&ctx, plan);
2418 ctx.retire(result);
2419 }
2420 plan::Explainee::ReplanView(_) => {
2421 self.explain_replan_view(ctx, plan).await;
2422 }
2423 plan::Explainee::ReplanMaterializedView(_) => {
2424 self.explain_replan_materialized_view(ctx, plan).await;
2425 }
2426 plan::Explainee::ReplanIndex(_) => {
2427 self.explain_replan_index(ctx, plan).await;
2428 }
2429 };
2430 }
2431
2432 pub(super) async fn sequence_explain_pushdown(
2433 &mut self,
2434 ctx: ExecuteContext,
2435 plan: plan::ExplainPushdownPlan,
2436 target_cluster: TargetCluster,
2437 ) {
2438 match plan.explainee {
2439 Explainee::Statement(ExplaineeStatement::Select {
2440 broken: false,
2441 plan,
2442 desc: _,
2443 }) => {
2444 let stage = return_if_err!(
2445 self.peek_validate(
2446 ctx.session(),
2447 plan,
2448 target_cluster,
2449 None,
2450 ExplainContext::Pushdown,
2451 Some(ctx.session().vars().max_query_result_size()),
2452 ),
2453 ctx
2454 );
2455 self.sequence_staged(ctx, Span::current(), stage).await;
2456 }
2457 Explainee::MaterializedView(item_id) => {
2458 self.explain_pushdown_materialized_view(ctx, item_id).await;
2459 }
2460 _ => {
2461 ctx.retire(Err(AdapterError::Unsupported(
2462 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2463 )));
2464 }
2465 };
2466 }
2467
2468 async fn execute_explain_pushdown_with_read_holds(
2470 &self,
2471 ctx: ExecuteContext,
2472 as_of: Antichain<Timestamp>,
2473 mz_now: ResultSpec<'static>,
2474 read_holds: Option<ReadHolds>,
2475 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2476 ) {
2477 let fut = self
2478 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2479 .await;
2480 task::spawn(|| "render explain pushdown", async move {
2481 let _read_holds = read_holds;
2483 let res = fut.await;
2484 ctx.retire(res);
2485 });
2486 }
2487
2488 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2490 &self,
2491 session: &Session,
2492 as_of: Antichain<Timestamp>,
2493 mz_now: ResultSpec<'static>,
2494 imports: I,
2495 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2496 super::explain_pushdown_future_inner(
2498 session,
2499 &self.catalog,
2500 &self.controller.storage_collections,
2501 as_of,
2502 mz_now,
2503 imports,
2504 )
2505 .await
2506 }
2507
2508 #[instrument]
2509 pub(super) async fn sequence_insert(
2510 &mut self,
2511 mut ctx: ExecuteContext,
2512 plan: plan::InsertPlan,
2513 ) {
2514 if !ctx.session_mut().transaction().allows_writes() {
2522 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2523 return;
2524 }
2525
2526 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2540 let expr = return_if_err!(
2543 plan.values
2544 .clone()
2545 .lower(self.catalog().system_config(), None),
2546 ctx
2547 );
2548 OptimizedMirRelationExpr(expr)
2549 } else {
2550 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2552
2553 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2555
2556 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2558 };
2559
2560 match optimized_mir.into_inner() {
2561 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2562 let catalog = self.owned_catalog();
2563 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2564 let result =
2565 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2566 ctx.retire(result);
2567 });
2568 }
2569 _ => {
2571 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2572 Some(table) => {
2573 let desc = table.relation_desc_latest().expect("table has a desc");
2575 desc.arity()
2576 }
2577 None => {
2578 ctx.retire(Err(AdapterError::Catalog(
2579 mz_catalog::memory::error::Error {
2580 kind: ErrorKind::Sql(CatalogError::UnknownItem(
2581 plan.id.to_string(),
2582 )),
2583 },
2584 )));
2585 return;
2586 }
2587 };
2588
2589 let finishing = RowSetFinishing {
2590 order_by: vec![],
2591 limit: None,
2592 offset: 0,
2593 project: (0..desc_arity).collect(),
2594 };
2595
2596 let read_then_write_plan = plan::ReadThenWritePlan {
2597 id: plan.id,
2598 selection: plan.values,
2599 finishing,
2600 assignments: BTreeMap::new(),
2601 kind: MutationKind::Insert,
2602 returning: plan.returning,
2603 };
2604
2605 self.sequence_read_then_write(ctx, read_then_write_plan)
2606 .await;
2607 }
2608 }
2609 }
2610
2611 #[instrument]
2616 pub(super) async fn sequence_read_then_write(
2617 &mut self,
2618 mut ctx: ExecuteContext,
2619 plan: plan::ReadThenWritePlan,
2620 ) {
2621 let mut source_ids: BTreeSet<_> = plan
2622 .selection
2623 .depends_on()
2624 .into_iter()
2625 .map(|gid| self.catalog().resolve_item_id(&gid))
2626 .collect();
2627 source_ids.insert(plan.id);
2628
2629 if ctx.session().transaction().write_locks().is_none() {
2631 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2633
2634 for id in &source_ids {
2636 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2637 write_locks.insert_lock(*id, lock);
2638 }
2639 }
2640
2641 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2643 Ok(locks) => locks,
2644 Err(missing) => {
2645 let role_metadata = ctx.session().role_metadata().clone();
2647 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2648 let plan = DeferredPlan {
2649 ctx,
2650 plan: Plan::ReadThenWrite(plan),
2651 validity: PlanValidity::new(
2652 self.catalog.transient_revision(),
2653 source_ids.clone(),
2654 None,
2655 None,
2656 role_metadata,
2657 ),
2658 requires_locks: source_ids,
2659 };
2660 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2661 }
2662 };
2663
2664 ctx.session_mut()
2665 .try_grant_write_locks(write_locks)
2666 .expect("session has already been granted write locks");
2667 }
2668
2669 let plan::ReadThenWritePlan {
2670 id,
2671 kind,
2672 selection,
2673 mut assignments,
2674 finishing,
2675 mut returning,
2676 } = plan;
2677
2678 let desc = match self.catalog().try_get_entry(&id) {
2680 Some(table) => {
2681 table
2683 .relation_desc_latest()
2684 .expect("table has a desc")
2685 .into_owned()
2686 }
2687 None => {
2688 ctx.retire(Err(AdapterError::Catalog(
2689 mz_catalog::memory::error::Error {
2690 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2691 },
2692 )));
2693 return;
2694 }
2695 };
2696
2697 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2699 || assignments.values().any(|e| e.contains_temporal())
2700 || returning.iter().any(|e| e.contains_temporal());
2701 if contains_temporal {
2702 ctx.retire(Err(AdapterError::Unsupported(
2703 "calls to mz_now in write statements",
2704 )));
2705 return;
2706 }
2707
2708 fn validate_read_dependencies(
2716 catalog: &Catalog,
2717 id: &CatalogItemId,
2718 ) -> Result<(), AdapterError> {
2719 use CatalogItemType::*;
2720 use mz_catalog::memory::objects;
2721 let mut ids_to_check = Vec::new();
2722 let valid = match catalog.try_get_entry(id) {
2723 Some(entry) => {
2724 if let CatalogItem::View(objects::View {
2725 locally_optimized_expr: optimized_expr,
2726 ..
2727 })
2728 | CatalogItem::MaterializedView(objects::MaterializedView {
2729 locally_optimized_expr: optimized_expr,
2730 ..
2731 }) = entry.item()
2732 {
2733 if optimized_expr.contains_temporal() {
2734 return Err(AdapterError::Unsupported(
2735 "calls to mz_now in write statements",
2736 ));
2737 }
2738 }
2739 match entry.item().typ() {
2740 typ @ (Func | View | MaterializedView | ContinualTask) => {
2741 ids_to_check.extend(entry.uses());
2742 let valid_id = id.is_user() || matches!(typ, Func);
2743 valid_id
2744 }
2745 Source | Secret | Connection => false,
2746 Sink | Index => unreachable!(),
2748 Table => {
2749 if !id.is_user() {
2750 false
2752 } else {
2753 entry.source_export_details().is_none()
2755 }
2756 }
2757 Type => true,
2758 }
2759 }
2760 None => false,
2761 };
2762 if !valid {
2763 let (object_name, object_type) = match catalog.try_get_entry(id) {
2764 Some(entry) => {
2765 let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
2766 let object_type = match entry.item().typ() {
2767 Source => "source",
2769 Secret => "secret",
2770 Connection => "connection",
2771 Table => {
2772 if !id.is_user() {
2773 "system table"
2774 } else {
2775 "source-export table"
2776 }
2777 }
2778 View => "system view",
2779 MaterializedView => "system materialized view",
2780 ContinualTask => "system task",
2781 _ => "invalid dependency",
2782 };
2783 (object_name, object_type.to_string())
2784 }
2785 None => (id.to_string(), "unknown".to_string()),
2786 };
2787 return Err(AdapterError::InvalidTableMutationSelection {
2788 object_name,
2789 object_type,
2790 });
2791 }
2792 for id in ids_to_check {
2793 validate_read_dependencies(catalog, &id)?;
2794 }
2795 Ok(())
2796 }
2797
2798 for gid in selection.depends_on() {
2799 let item_id = self.catalog().resolve_item_id(&gid);
2800 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2801 ctx.retire(Err(err));
2802 return;
2803 }
2804 }
2805
2806 let (peek_tx, peek_rx) = oneshot::channel();
2807 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2808 let (tx, _, session, extra) = ctx.into_parts();
2809 let peek_ctx = ExecuteContext::from_parts(
2821 peek_client_tx,
2822 self.internal_cmd_tx.clone(),
2823 session,
2824 Default::default(),
2825 );
2826
2827 self.sequence_peek(
2828 peek_ctx,
2829 plan::SelectPlan {
2830 select: None,
2831 source: selection,
2832 when: QueryWhen::FreshestTableWrite,
2833 finishing,
2834 copy_to: None,
2835 },
2836 TargetCluster::Active,
2837 None,
2838 )
2839 .await;
2840
2841 let internal_cmd_tx = self.internal_cmd_tx.clone();
2842 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2843 let catalog = self.owned_catalog();
2844 let max_result_size = self.catalog().system_config().max_result_size();
2845
2846 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2847 let (peek_response, session) = match peek_rx.await {
2848 Ok(Response {
2849 result: Ok(resp),
2850 session,
2851 otel_ctx,
2852 }) => {
2853 otel_ctx.attach_as_parent();
2854 (resp, session)
2855 }
2856 Ok(Response {
2857 result: Err(e),
2858 session,
2859 otel_ctx,
2860 }) => {
2861 let ctx =
2862 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2863 otel_ctx.attach_as_parent();
2864 ctx.retire(Err(e));
2865 return;
2866 }
2867 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2869 };
2870 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2871 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2872
2873 if timeout_dur == Duration::ZERO {
2875 timeout_dur = Duration::MAX;
2876 }
2877
2878 let style = ExprPrepOneShot {
2879 logical_time: EvalTime::NotAvailable, session: ctx.session(),
2881 catalog_state: catalog.state(),
2882 };
2883 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2884 return_if_err!(style.prep_scalar_expr(expr), ctx);
2885 }
2886
2887 let make_diffs = move |mut rows: Box<dyn RowIterator>|
2888 -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2889 let arena = RowArena::new();
2890 let mut diffs = Vec::new();
2891 let mut datum_vec = mz_repr::DatumVec::new();
2892
2893 while let Some(row) = rows.next() {
2894 if !assignments.is_empty() {
2895 assert!(
2896 matches!(kind, MutationKind::Update),
2897 "only updates support assignments"
2898 );
2899 let mut datums = datum_vec.borrow_with(row);
2900 let mut updates = vec![];
2901 for (idx, expr) in &assignments {
2902 let updated = match expr.eval(&datums, &arena) {
2903 Ok(updated) => updated,
2904 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2905 };
2906 updates.push((*idx, updated));
2907 }
2908 for (idx, new_value) in updates {
2909 datums[idx] = new_value;
2910 }
2911 let updated = Row::pack_slice(&datums);
2912 diffs.push((updated, Diff::ONE));
2913 }
2914 match kind {
2915 MutationKind::Update | MutationKind::Delete => {
2919 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2920 }
2921 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2922 }
2923 }
2924
2925 let mut byte_size: u64 = 0;
2928 for (row, diff) in &diffs {
2929 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2930 if diff.is_positive() {
2931 for (idx, datum) in row.iter().enumerate() {
2932 desc.constraints_met(idx, &datum)?;
2933 }
2934 }
2935 }
2936 Ok((diffs, byte_size))
2937 };
2938
2939 let diffs = match peek_response {
2940 ExecuteResponse::SendingRowsStreaming {
2941 rows: mut rows_stream,
2942 ..
2943 } => {
2944 let mut byte_size: u64 = 0;
2945 let mut diffs = Vec::new();
2946 let result = loop {
2947 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2948 Ok(Some(res)) => match res {
2949 PeekResponseUnary::Rows(new_rows) => {
2950 match make_diffs(new_rows) {
2951 Ok((mut new_diffs, new_byte_size)) => {
2952 byte_size = byte_size.saturating_add(new_byte_size);
2953 if byte_size > max_result_size {
2954 break Err(AdapterError::ResultSize(format!(
2955 "result exceeds max size of {max_result_size}"
2956 )));
2957 }
2958 diffs.append(&mut new_diffs)
2959 }
2960 Err(e) => break Err(e),
2961 };
2962 }
2963 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2964 PeekResponseUnary::Error(e) => {
2965 break Err(AdapterError::Unstructured(anyhow!(e)));
2966 }
2967 },
2968 Ok(None) => break Ok(diffs),
2969 Err(_) => {
2970 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2975 conn_id: ctx.session().conn_id().clone(),
2976 });
2977 if let Err(e) = result {
2978 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2979 }
2980 break Err(AdapterError::StatementTimeout);
2981 }
2982 }
2983 };
2984
2985 result
2986 }
2987 ExecuteResponse::SendingRowsImmediate { rows } => {
2988 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2989 }
2990 resp => Err(AdapterError::Unstructured(anyhow!(
2991 "unexpected peek response: {resp:?}"
2992 ))),
2993 };
2994
2995 let mut returning_rows = Vec::new();
2996 let mut diff_err: Option<AdapterError> = None;
2997 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2998 let arena = RowArena::new();
2999 for (row, diff) in diffs {
3000 if !diff.is_positive() {
3001 continue;
3002 }
3003 let mut returning_row = Row::with_capacity(returning.len());
3004 let mut packer = returning_row.packer();
3005 for expr in &returning {
3006 let datums: Vec<_> = row.iter().collect();
3007 match expr.eval(&datums, &arena) {
3008 Ok(datum) => {
3009 packer.push(datum);
3010 }
3011 Err(err) => {
3012 diff_err = Some(err.into());
3013 break;
3014 }
3015 }
3016 }
3017 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3018 let diff = match NonZeroUsize::try_from(diff) {
3019 Ok(diff) => diff,
3020 Err(err) => {
3021 diff_err = Some(err.into());
3022 break;
3023 }
3024 };
3025 returning_rows.push((returning_row, diff));
3026 if diff_err.is_some() {
3027 break;
3028 }
3029 }
3030 }
3031 let diffs = if let Some(err) = diff_err {
3032 Err(err)
3033 } else {
3034 diffs
3035 };
3036
3037 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3040 if let Some(timestamp_context) = timestamp_context {
3049 let (tx, rx) = tokio::sync::oneshot::channel();
3050 let conn_id = ctx.session().conn_id().clone();
3051 let pending_read_txn = PendingReadTxn {
3052 txn: PendingRead::ReadThenWrite { ctx, tx },
3053 timestamp_context,
3054 created: Instant::now(),
3055 num_requeues: 0,
3056 otel_ctx: OpenTelemetryContext::obtain(),
3057 };
3058 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3059 if let Err(e) = result {
3061 warn!(
3062 "strict_serializable_reads_tx dropped before we could send: {:?}",
3063 e
3064 );
3065 return;
3066 }
3067 let result = rx.await;
3068 ctx = match result {
3070 Ok(Some(ctx)) => ctx,
3071 Ok(None) => {
3072 return;
3075 }
3076 Err(e) => {
3077 warn!(
3078 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3079 e
3080 );
3081 return;
3082 }
3083 };
3084 }
3085
3086 match diffs {
3087 Ok(diffs) => {
3088 let result = Self::send_diffs(
3089 ctx.session_mut(),
3090 plan::SendDiffsPlan {
3091 id,
3092 updates: diffs,
3093 kind,
3094 returning: returning_rows,
3095 max_result_size,
3096 },
3097 );
3098 ctx.retire(result);
3099 }
3100 Err(e) => {
3101 ctx.retire(Err(e));
3102 }
3103 }
3104 });
3105 }
3106
3107 #[instrument]
3108 pub(super) async fn sequence_alter_item_rename(
3109 &mut self,
3110 ctx: &mut ExecuteContext,
3111 plan: plan::AlterItemRenamePlan,
3112 ) -> Result<ExecuteResponse, AdapterError> {
3113 let op = catalog::Op::RenameItem {
3114 id: plan.id,
3115 current_full_name: plan.current_full_name,
3116 to_name: plan.to_name,
3117 };
3118 match self
3119 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3120 .await
3121 {
3122 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3123 Err(err) => Err(err),
3124 }
3125 }
3126
3127 #[instrument]
3128 pub(super) async fn sequence_alter_retain_history(
3129 &mut self,
3130 ctx: &mut ExecuteContext,
3131 plan: plan::AlterRetainHistoryPlan,
3132 ) -> Result<ExecuteResponse, AdapterError> {
3133 soft_assert_or_log!(
3134 !matches!(
3135 self.catalog().get_entry(&plan.id).item(),
3136 CatalogItem::ContinualTask(_)
3137 ),
3138 "RETAIN HISTORY is not supported on continual tasks"
3139 );
3140 let ops = vec![catalog::Op::AlterRetainHistory {
3141 id: plan.id,
3142 value: plan.value,
3143 window: plan.window,
3144 }];
3145 self.catalog_transact_with_context(None, Some(ctx), ops)
3146 .await?;
3147 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3148 }
3149
3150 #[instrument]
3151 pub(super) async fn sequence_alter_source_timestamp_interval(
3152 &mut self,
3153 ctx: &mut ExecuteContext,
3154 plan: plan::AlterSourceTimestampIntervalPlan,
3155 ) -> Result<ExecuteResponse, AdapterError> {
3156 let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3157 id: plan.id,
3158 value: plan.value,
3159 interval: plan.interval,
3160 }];
3161 self.catalog_transact_with_context(None, Some(ctx), ops)
3162 .await?;
3163 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3164 }
3165
3166 #[instrument]
3167 pub(super) async fn sequence_alter_schema_rename(
3168 &mut self,
3169 ctx: &mut ExecuteContext,
3170 plan: plan::AlterSchemaRenamePlan,
3171 ) -> Result<ExecuteResponse, AdapterError> {
3172 let (database_spec, schema_spec) = plan.cur_schema_spec;
3173 let op = catalog::Op::RenameSchema {
3174 database_spec,
3175 schema_spec,
3176 new_name: plan.new_schema_name,
3177 check_reserved_names: true,
3178 };
3179 match self
3180 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3181 .await
3182 {
3183 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3184 Err(err) => Err(err),
3185 }
3186 }
3187
3188 #[instrument]
3189 pub(super) async fn sequence_alter_schema_swap(
3190 &mut self,
3191 ctx: &mut ExecuteContext,
3192 plan: plan::AlterSchemaSwapPlan,
3193 ) -> Result<ExecuteResponse, AdapterError> {
3194 let plan::AlterSchemaSwapPlan {
3195 schema_a_spec: (schema_a_db, schema_a),
3196 schema_a_name,
3197 schema_b_spec: (schema_b_db, schema_b),
3198 schema_b_name,
3199 name_temp,
3200 } = plan;
3201
3202 let op_a = catalog::Op::RenameSchema {
3203 database_spec: schema_a_db,
3204 schema_spec: schema_a,
3205 new_name: name_temp,
3206 check_reserved_names: false,
3207 };
3208 let op_b = catalog::Op::RenameSchema {
3209 database_spec: schema_b_db,
3210 schema_spec: schema_b,
3211 new_name: schema_a_name,
3212 check_reserved_names: false,
3213 };
3214 let op_c = catalog::Op::RenameSchema {
3215 database_spec: schema_a_db,
3216 schema_spec: schema_a,
3217 new_name: schema_b_name,
3218 check_reserved_names: false,
3219 };
3220
3221 match self
3222 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3223 Box::pin(async {})
3224 })
3225 .await
3226 {
3227 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3228 Err(err) => Err(err),
3229 }
3230 }
3231
3232 #[instrument]
3233 pub(super) async fn sequence_alter_role(
3234 &mut self,
3235 session: &Session,
3236 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3237 ) -> Result<ExecuteResponse, AdapterError> {
3238 let catalog = self.catalog().for_session(session);
3239 let role = catalog.get_role(&id);
3240
3241 let mut notices = vec![];
3243
3244 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3246 let mut vars = role.vars().clone();
3247
3248 let mut nopassword = false;
3251
3252 match option {
3254 PlannedAlterRoleOption::Attributes(attrs) => {
3255 self.validate_role_attributes(&attrs.clone().into())?;
3256
3257 if let Some(inherit) = attrs.inherit {
3258 attributes.inherit = inherit;
3259 }
3260
3261 if let Some(password) = attrs.password {
3262 attributes.password = Some(password);
3263 attributes.scram_iterations =
3264 Some(self.catalog().system_config().scram_iterations())
3265 }
3266
3267 if let Some(superuser) = attrs.superuser {
3268 attributes.superuser = Some(superuser);
3269 }
3270
3271 if let Some(login) = attrs.login {
3272 attributes.login = Some(login);
3273 }
3274
3275 if attrs.nopassword.unwrap_or(false) {
3276 nopassword = true;
3277 }
3278
3279 if let Some(notice) = self.should_emit_rbac_notice(session) {
3280 notices.push(notice);
3281 }
3282 }
3283 PlannedAlterRoleOption::Variable(variable) => {
3284 let session_var = session.vars().inspect(variable.name())?;
3286 session_var.visible(session.user(), catalog.system_vars())?;
3288
3289 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3292 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3293 } else if let PlannedRoleVariable::Set {
3294 name,
3295 value: VariableValue::Values(vals),
3296 } = &variable
3297 {
3298 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3299 notices.push(AdapterNotice::IntrospectionClusterUsage);
3300 }
3301 }
3302
3303 let var_name = match variable {
3304 PlannedRoleVariable::Set { name, value } => {
3305 match &value {
3307 VariableValue::Default => {
3308 vars.remove(&name);
3309 }
3310 VariableValue::Values(vals) => {
3311 let var = match &vals[..] {
3312 [val] => OwnedVarInput::Flat(val.clone()),
3313 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3314 };
3315 session_var.check(var.borrow())?;
3317
3318 vars.insert(name.clone(), var);
3319 }
3320 };
3321 name
3322 }
3323 PlannedRoleVariable::Reset { name } => {
3324 vars.remove(&name);
3326 name
3327 }
3328 };
3329
3330 notices.push(AdapterNotice::VarDefaultUpdated {
3332 role: Some(name.clone()),
3333 var_name: Some(var_name),
3334 });
3335 }
3336 }
3337
3338 let op = catalog::Op::AlterRole {
3339 id,
3340 name,
3341 attributes,
3342 nopassword,
3343 vars: RoleVars { map: vars },
3344 };
3345 let response = self
3346 .catalog_transact(Some(session), vec![op])
3347 .await
3348 .map(|_| ExecuteResponse::AlteredRole)?;
3349
3350 session.add_notices(notices);
3352
3353 Ok(response)
3354 }
3355
3356 #[instrument]
3357 pub(super) async fn sequence_alter_sink_prepare(
3358 &mut self,
3359 ctx: ExecuteContext,
3360 plan: plan::AlterSinkPlan,
3361 ) {
3362 let id_bundle = crate::CollectionIdBundle {
3364 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3365 compute_ids: BTreeMap::new(),
3366 };
3367 let read_hold = self.acquire_read_holds(&id_bundle);
3368
3369 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3370 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3371 return;
3372 };
3373
3374 let otel_ctx = OpenTelemetryContext::obtain();
3375 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3376
3377 let plan_validity = PlanValidity::new(
3378 self.catalog().transient_revision(),
3379 BTreeSet::from_iter([plan.item_id, from_item_id]),
3380 Some(plan.in_cluster),
3381 None,
3382 ctx.session().role_metadata().clone(),
3383 );
3384
3385 info!(
3386 "preparing alter sink for {}: frontiers={:?} export={:?}",
3387 plan.global_id,
3388 self.controller
3389 .storage_collections
3390 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3391 self.controller.storage.export(plan.global_id)
3392 );
3393
3394 self.install_storage_watch_set(
3400 ctx.session().conn_id().clone(),
3401 BTreeSet::from_iter([plan.global_id]),
3402 read_ts,
3403 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3404 ctx: Some(ctx),
3405 otel_ctx,
3406 plan,
3407 plan_validity,
3408 read_hold,
3409 }),
3410 ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3411 }
3412
3413 #[instrument]
3414 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3415 ctx.otel_ctx.attach_as_parent();
3416
3417 let plan::AlterSinkPlan {
3418 item_id,
3419 global_id,
3420 sink: sink_plan,
3421 with_snapshot,
3422 in_cluster,
3423 } = ctx.plan.clone();
3424
3425 match ctx.plan_validity.check(self.catalog()) {
3434 Ok(()) => {}
3435 Err(err) => {
3436 ctx.retire(Err(err));
3437 return;
3438 }
3439 }
3440
3441 let entry = self.catalog().get_entry(&item_id);
3442 let CatalogItem::Sink(old_sink) = entry.item() else {
3443 panic!("invalid item kind for `AlterSinkPlan`");
3444 };
3445
3446 if sink_plan.version != old_sink.version + 1 {
3447 ctx.retire(Err(AdapterError::ChangedPlan(
3448 "sink was altered concurrently".into(),
3449 )));
3450 return;
3451 }
3452
3453 info!(
3454 "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3455 self.controller
3456 .storage_collections
3457 .collections_frontiers(vec![global_id, sink_plan.from]),
3458 self.controller.storage.export(global_id),
3459 );
3460
3461 let write_frontier = &self
3464 .controller
3465 .storage
3466 .export(global_id)
3467 .expect("sink known to exist")
3468 .write_frontier;
3469 let as_of = ctx.read_hold.least_valid_read();
3470 assert!(
3471 write_frontier.iter().all(|t| as_of.less_than(t)),
3472 "{:?} should be strictly less than {:?}",
3473 &*as_of,
3474 &**write_frontier
3475 );
3476
3477 let create_sql = &old_sink.create_sql;
3483 let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3484 let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3485 unreachable!("invalid statement kind for sink");
3486 };
3487
3488 stmt.with_options
3490 .retain(|o| o.name != CreateSinkOptionName::Version);
3491 stmt.with_options.push(CreateSinkOption {
3492 name: CreateSinkOptionName::Version,
3493 value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3494 sink_plan.version.to_string(),
3495 ))),
3496 });
3497
3498 let conn_catalog = self.catalog().for_system_session();
3499 let (mut stmt, resolved_ids) =
3500 mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3501
3502 let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3504 let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3505 stmt.from = ResolvedItemName::Item {
3506 id: from_entry.id(),
3507 qualifiers: from_entry.name.qualifiers.clone(),
3508 full_name,
3509 print_id: true,
3510 version: from_entry.version,
3511 };
3512
3513 let new_sink = Sink {
3514 create_sql: stmt.to_ast_string_stable(),
3515 global_id,
3516 from: sink_plan.from,
3517 connection: sink_plan.connection.clone(),
3518 envelope: sink_plan.envelope,
3519 version: sink_plan.version,
3520 with_snapshot,
3521 resolved_ids: resolved_ids.clone(),
3522 cluster_id: in_cluster,
3523 commit_interval: sink_plan.commit_interval,
3524 };
3525
3526 let ops = vec![catalog::Op::UpdateItem {
3527 id: item_id,
3528 name: entry.name().clone(),
3529 to_item: CatalogItem::Sink(new_sink),
3530 }];
3531
3532 match self
3533 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3534 .await
3535 {
3536 Ok(()) => {}
3537 Err(err) => {
3538 ctx.retire(Err(err));
3539 return;
3540 }
3541 }
3542
3543 let storage_sink_desc = StorageSinkDesc {
3544 from: sink_plan.from,
3545 from_desc: from_entry
3546 .relation_desc()
3547 .expect("sinks can only be built on items with descs")
3548 .into_owned(),
3549 connection: sink_plan
3550 .connection
3551 .clone()
3552 .into_inline_connection(self.catalog().state()),
3553 envelope: sink_plan.envelope,
3554 as_of,
3555 with_snapshot,
3556 version: sink_plan.version,
3557 from_storage_metadata: (),
3558 to_storage_metadata: (),
3559 commit_interval: sink_plan.commit_interval,
3560 };
3561
3562 self.controller
3563 .storage
3564 .alter_export(
3565 global_id,
3566 ExportDescription {
3567 sink: storage_sink_desc,
3568 instance_id: in_cluster,
3569 },
3570 )
3571 .await
3572 .unwrap_or_terminate("cannot fail to alter source desc");
3573
3574 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3575 }
3576
3577 #[instrument]
3578 pub(super) async fn sequence_alter_connection(
3579 &mut self,
3580 ctx: ExecuteContext,
3581 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3582 ) {
3583 match action {
3584 AlterConnectionAction::RotateKeys => {
3585 self.sequence_rotate_keys(ctx, id).await;
3586 }
3587 AlterConnectionAction::AlterOptions {
3588 set_options,
3589 drop_options,
3590 validate,
3591 } => {
3592 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3593 .await
3594 }
3595 }
3596 }
3597
3598 #[instrument]
3599 async fn sequence_alter_connection_options(
3600 &mut self,
3601 mut ctx: ExecuteContext,
3602 id: CatalogItemId,
3603 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3604 drop_options: BTreeSet<ConnectionOptionName>,
3605 validate: bool,
3606 ) {
3607 let cur_entry = self.catalog().get_entry(&id);
3608 let cur_conn = cur_entry.connection().expect("known to be connection");
3609 let connection_gid = cur_conn.global_id();
3610
3611 let inner = || -> Result<Connection, AdapterError> {
3612 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3614 .expect("invalid create sql persisted to catalog")
3615 .into_element()
3616 .ast
3617 {
3618 Statement::CreateConnection(stmt) => stmt,
3619 _ => unreachable!("proved type is source"),
3620 };
3621
3622 let catalog = self.catalog().for_system_session();
3623
3624 let (mut create_conn_stmt, resolved_ids) =
3626 mz_sql::names::resolve(&catalog, create_conn_stmt)
3627 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3628
3629 create_conn_stmt
3631 .values
3632 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3633
3634 create_conn_stmt.values.extend(
3636 set_options
3637 .into_iter()
3638 .map(|(name, value)| ConnectionOption { name, value }),
3639 );
3640
3641 let mut catalog = self.catalog().for_system_session();
3644 catalog.mark_id_unresolvable_for_replanning(id);
3645
3646 let plan = match mz_sql::plan::plan(
3648 None,
3649 &catalog,
3650 Statement::CreateConnection(create_conn_stmt),
3651 &Params::empty(),
3652 &resolved_ids,
3653 )
3654 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3655 {
3656 Plan::CreateConnection(plan) => plan,
3657 _ => unreachable!("create source plan is only valid response"),
3658 };
3659
3660 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3662 .expect("invalid create sql persisted to catalog")
3663 .into_element()
3664 .ast
3665 {
3666 Statement::CreateConnection(stmt) => stmt,
3667 _ => unreachable!("proved type is source"),
3668 };
3669
3670 let catalog = self.catalog().for_system_session();
3671
3672 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3674 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3675
3676 Ok(Connection {
3677 create_sql: plan.connection.create_sql,
3678 global_id: cur_conn.global_id,
3679 details: plan.connection.details,
3680 resolved_ids: new_deps,
3681 })
3682 };
3683
3684 let conn = match inner() {
3685 Ok(conn) => conn,
3686 Err(e) => {
3687 return ctx.retire(Err(e));
3688 }
3689 };
3690
3691 if validate {
3692 let connection = conn
3693 .details
3694 .to_connection()
3695 .into_inline_connection(self.catalog().state());
3696
3697 let internal_cmd_tx = self.internal_cmd_tx.clone();
3698 let transient_revision = self.catalog().transient_revision();
3699 let conn_id = ctx.session().conn_id().clone();
3700 let otel_ctx = OpenTelemetryContext::obtain();
3701 let role_metadata = ctx.session().role_metadata().clone();
3702 let current_storage_parameters = self.controller.storage.config().clone();
3703
3704 task::spawn(
3705 || format!("validate_alter_connection:{conn_id}"),
3706 async move {
3707 let resolved_ids = conn.resolved_ids.clone();
3708 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3709 let result = match std::panic::AssertUnwindSafe(
3710 connection.validate(id, ¤t_storage_parameters),
3711 )
3712 .ore_catch_unwind()
3713 .await
3714 {
3715 Ok(Ok(())) => Ok(conn),
3716 Ok(Err(err)) => Err(err.into()),
3717 Err(_panic) => {
3718 tracing::error!("alter connection validation panicked");
3719 Err(AdapterError::Internal(
3720 "connection validation panicked".into(),
3721 ))
3722 }
3723 };
3724
3725 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3727 AlterConnectionValidationReady {
3728 ctx,
3729 result,
3730 connection_id: id,
3731 connection_gid,
3732 plan_validity: PlanValidity::new(
3733 transient_revision,
3734 dependency_ids.clone(),
3735 None,
3736 None,
3737 role_metadata,
3738 ),
3739 otel_ctx,
3740 resolved_ids,
3741 },
3742 ));
3743 if let Err(e) = result {
3744 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3745 }
3746 },
3747 );
3748 } else {
3749 let result = self
3750 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3751 .await;
3752 ctx.retire(result);
3753 }
3754 }
3755
3756 #[instrument]
3757 pub(crate) async fn sequence_alter_connection_stage_finish(
3758 &mut self,
3759 session: &Session,
3760 id: CatalogItemId,
3761 connection: Connection,
3762 ) -> Result<ExecuteResponse, AdapterError> {
3763 match self.catalog.get_entry(&id).item() {
3764 CatalogItem::Connection(curr_conn) => {
3765 curr_conn
3766 .details
3767 .to_connection()
3768 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3769 .map_err(StorageError::from)?;
3770 }
3771 _ => unreachable!("known to be a connection"),
3772 };
3773
3774 let ops = vec![catalog::Op::UpdateItem {
3775 id,
3776 name: self.catalog.get_entry(&id).name().clone(),
3777 to_item: CatalogItem::Connection(connection.clone()),
3778 }];
3779
3780 self.catalog_transact(Some(session), ops).await?;
3781
3782 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3789 }
3790
/// Sequences an `ALTER SOURCE` plan for the source `item_id`.
///
/// Two actions are handled:
///
/// * `AddSubsourceExports`: rewrites the `TEXT COLUMNS` / `EXCLUDE COLUMNS`
///   options of the source's stored `CREATE SOURCE` statement, re-plans it,
///   checks the resulting ingestion description with the storage controller,
///   and transacts the updated source together with the ops that create the
///   new subsources.
/// * `RefreshReferences`: transacts a `catalog::Op::UpdateSourceReferences`
///   with the supplied references.
///
/// Returns `ExecuteResponse::AlteredObject(ObjectType::Source)` on success.
///
/// # Panics
///
/// Panics if `item_id` is not a source, or if its stored SQL does not parse
/// as a `CREATE SOURCE` statement.
#[instrument]
pub(super) async fn sequence_alter_source(
    &mut self,
    session: &Session,
    plan::AlterSourcePlan {
        item_id,
        ingestion_id,
        action,
    }: plan::AlterSourcePlan,
) -> Result<ExecuteResponse, AdapterError> {
    let cur_entry = self.catalog().get_entry(&item_id);
    let cur_source = cur_entry.source().expect("known to be source");

    // Parses a stored `CREATE SOURCE` statement and resolves its names,
    // yielding the resolved statement and its dependencies. `err_cx` labels
    // any internal error produced by name resolution.
    let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
        let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
            .expect("invalid create sql persisted to catalog")
            .into_element()
            .ast
        {
            Statement::CreateSource(stmt) => stmt,
            _ => unreachable!("proved type is source"),
        };

        let catalog = coord.catalog().for_system_session();

        mz_sql::names::resolve(&catalog, create_source_stmt)
            .map_err(|e| AdapterError::internal(err_cx, e))
    };

    match action {
        plan::AlterSourceAction::AddSubsourceExports {
            subsources,
            options,
        } => {
            // Context string attached to internal errors from this action.
            const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

            // Column options supplied with this `ALTER SOURCE` statement.
            let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                text_columns: mut new_text_columns,
                exclude_columns: mut new_exclude_columns,
                ..
            } = options.try_into()?;

            // The source's current statement, resolved.
            let (mut create_source_stmt, resolved_ids) =
                create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

            // External references of the subsources that currently depend on
            // this source.
            let catalog = self.catalog();
            let curr_references: BTreeSet<_> = catalog
                .get_entry(&item_id)
                .used_by()
                .into_iter()
                .filter_map(|subsource| {
                    catalog
                        .get_entry(subsource)
                        .subsource_details()
                        .map(|(_id, reference, _details)| reference)
                })
                .collect();

            // Returned when the statement's connection is not one of the
            // kinds handled below.
            let purification_err =
                || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

            // Each arm follows the same recipe: extract the column options
            // currently on the statement, remove them from the statement,
            // keep only the entries whose table is still referenced, merge
            // them into the new options, and push the sorted result back.
            match &mut create_source_stmt.connection {
                CreateSourceConnection::Postgres {
                    options: curr_options,
                    ..
                } => {
                    let mz_sql::plan::PgConfigOptionExtracted {
                        mut text_columns, ..
                    } = curr_options.clone().try_into()?;

                    // Drop the existing `TEXT COLUMNS` option; it is re-added
                    // below with the merged value.
                    curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));

                    // Keep only text columns whose table (the first three
                    // name components) is among the current references.
                    text_columns.retain(|column_qualified_reference| {
                        mz_ore::soft_assert_eq_or_log!(
                            column_qualified_reference.0.len(),
                            4,
                            "all TEXT COLUMNS values must be column-qualified references"
                        );
                        let mut table = column_qualified_reference.clone();
                        table.0.truncate(3);
                        curr_references.contains(&table)
                    });

                    // Merge the surviving columns into the new ones.
                    new_text_columns.extend(text_columns);

                    if !new_text_columns.is_empty() {
                        // Sort so the option is written in a stable order.
                        new_text_columns.sort();
                        let new_text_columns = new_text_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(PgConfigOption {
                            name: PgConfigOptionName::TextColumns,
                            value: Some(WithOptionValue::Sequence(new_text_columns)),
                        });
                    }
                }
                CreateSourceConnection::MySql {
                    options: curr_options,
                    ..
                } => {
                    let mz_sql::plan::MySqlConfigOptionExtracted {
                        mut text_columns,
                        mut exclude_columns,
                        ..
                    } = curr_options.clone().try_into()?;

                    // Drop both column options; they are re-added below with
                    // the merged values.
                    curr_options.retain(|o| {
                        !matches!(
                            o.name,
                            MySqlConfigOptionName::TextColumns
                                | MySqlConfigOptionName::ExcludeColumns
                        )
                    });

                    // A column is kept when its table (the first two name
                    // components) is among the current references.
                    let column_referenced =
                        |column_qualified_reference: &UnresolvedItemName| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                3,
                                "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(2);
                            curr_references.contains(&table)
                        };
                    text_columns.retain(column_referenced);
                    exclude_columns.retain(column_referenced);

                    // Merge the surviving columns into the new ones.
                    new_text_columns.extend(text_columns);
                    new_exclude_columns.extend(exclude_columns);

                    if !new_text_columns.is_empty() {
                        // Sort so the option is written in a stable order.
                        new_text_columns.sort();
                        let new_text_columns = new_text_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(MySqlConfigOption {
                            name: MySqlConfigOptionName::TextColumns,
                            value: Some(WithOptionValue::Sequence(new_text_columns)),
                        });
                    }
                    if !new_exclude_columns.is_empty() {
                        // Sort so the option is written in a stable order.
                        new_exclude_columns.sort();
                        let new_exclude_columns = new_exclude_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(MySqlConfigOption {
                            name: MySqlConfigOptionName::ExcludeColumns,
                            value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                        });
                    }
                }
                CreateSourceConnection::SqlServer {
                    options: curr_options,
                    ..
                } => {
                    let mz_sql::plan::SqlServerConfigOptionExtracted {
                        mut text_columns,
                        mut exclude_columns,
                        ..
                    } = curr_options.clone().try_into()?;

                    // Drop both column options; they are re-added below with
                    // the merged values.
                    curr_options.retain(|o| {
                        !matches!(
                            o.name,
                            SqlServerConfigOptionName::TextColumns
                                | SqlServerConfigOptionName::ExcludeColumns
                        )
                    });

                    // Unlike the MySQL arm, the two-component table name is
                    // matched as a suffix of the current references rather
                    // than by exact equality.
                    // NOTE(review): presumably the references carry an extra
                    // leading component here — confirm against the SQL Server
                    // purification code.
                    let column_referenced =
                        |column_qualified_reference: &UnresolvedItemName| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                3,
                                "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(2);
                            curr_references.iter().any(|r| r.0.ends_with(&table.0))
                        };
                    text_columns.retain(column_referenced);
                    exclude_columns.retain(column_referenced);

                    // Merge the surviving columns into the new ones.
                    new_text_columns.extend(text_columns);
                    new_exclude_columns.extend(exclude_columns);

                    if !new_text_columns.is_empty() {
                        // Sort so the option is written in a stable order.
                        new_text_columns.sort();
                        let new_text_columns = new_text_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(SqlServerConfigOption {
                            name: SqlServerConfigOptionName::TextColumns,
                            value: Some(WithOptionValue::Sequence(new_text_columns)),
                        });
                    }
                    if !new_exclude_columns.is_empty() {
                        // Sort so the option is written in a stable order.
                        new_exclude_columns.sort();
                        let new_exclude_columns = new_exclude_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(SqlServerConfigOption {
                            name: SqlServerConfigOptionName::ExcludeColumns,
                            value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                        });
                    }
                }
                // Any other connection kind is rejected.
                _ => return Err(purification_err()),
            };

            // Re-plan the rewritten statement against a catalog in which the
            // source itself is marked unresolvable for replanning.
            let mut catalog = self.catalog().for_system_session();
            catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

            let planned = mz_sql::plan::plan(
                None,
                &catalog,
                Statement::CreateSource(create_source_stmt),
                &Params::empty(),
                &resolved_ids,
            )
            .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
            let plan = match planned {
                Plan::CreateSource(plan) => plan,
                _ => unreachable!("create source plan is only valid response"),
            };

            // Build the new catalog item, carrying over the global ID,
            // compaction window and retained-metrics flag of the current one.
            let source = Source::new(
                plan,
                cur_source.global_id,
                resolved_ids,
                cur_source.custom_logical_compaction_window,
                cur_source.is_retained_metrics_object,
            );

            // Ingestion description of the new source, with connections
            // inlined.
            let desc = match &source.data_source {
                DataSourceDesc::Ingestion { desc, .. }
                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                    desc.clone().into_inline_connection(self.catalog().state())
                }
                _ => unreachable!("already verified of type ingestion"),
            };

            // Ask the storage controller to check the new description before
            // anything is written to the catalog.
            self.controller
                .storage
                .check_alter_ingestion_source_desc(ingestion_id, &desc)
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

            // Replace the source item, keeping its current name.
            let mut ops = vec![catalog::Op::UpdateItem {
                id: item_id,
                name: self.catalog.get_entry(&item_id).name().clone(),
                to_item: CatalogItem::Source(source),
            }];

            // Ops that create the requested subsources.
            let CreateSourceInner {
                ops: new_ops,
                sources: _,
                if_not_exists_ids,
            } = self.create_source_inner(session, subsources).await?;

            ops.extend(new_ops.into_iter());

            assert!(
                if_not_exists_ids.is_empty(),
                "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
            );

            // The source update and the subsource creation go through a
            // single `catalog_transact` call.
            self.catalog_transact(Some(session), ops).await?;
        }
        plan::AlterSourceAction::RefreshReferences { references } => {
            // Only the stored source references change for this action.
            self.catalog_transact(
                Some(session),
                vec![catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.into(),
                }],
            )
            .await?;
        }
    }

    Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
}
4123
4124 #[instrument]
4125 pub(super) async fn sequence_alter_system_set(
4126 &mut self,
4127 session: &Session,
4128 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4129 ) -> Result<ExecuteResponse, AdapterError> {
4130 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4131 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4133 self.validate_alter_system_network_policy(session, &value)?;
4134 }
4135
4136 let op = match value {
4137 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4138 name: name.clone(),
4139 value: OwnedVarInput::SqlSet(values),
4140 },
4141 plan::VariableValue::Default => {
4142 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4143 }
4144 };
4145 self.catalog_transact(Some(session), vec![op]).await?;
4146
4147 session.add_notice(AdapterNotice::VarDefaultUpdated {
4148 role: None,
4149 var_name: Some(name),
4150 });
4151 Ok(ExecuteResponse::AlteredSystemConfiguration)
4152 }
4153
4154 #[instrument]
4155 pub(super) async fn sequence_alter_system_reset(
4156 &mut self,
4157 session: &Session,
4158 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4159 ) -> Result<ExecuteResponse, AdapterError> {
4160 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4161 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4162 self.catalog_transact(Some(session), vec![op]).await?;
4163 session.add_notice(AdapterNotice::VarDefaultUpdated {
4164 role: None,
4165 var_name: Some(name),
4166 });
4167 Ok(ExecuteResponse::AlteredSystemConfiguration)
4168 }
4169
4170 #[instrument]
4171 pub(super) async fn sequence_alter_system_reset_all(
4172 &mut self,
4173 session: &Session,
4174 _: plan::AlterSystemResetAllPlan,
4175 ) -> Result<ExecuteResponse, AdapterError> {
4176 self.is_user_allowed_to_alter_system(session, None)?;
4177 let op = catalog::Op::ResetAllSystemConfiguration;
4178 self.catalog_transact(Some(session), vec![op]).await?;
4179 session.add_notice(AdapterNotice::VarDefaultUpdated {
4180 role: None,
4181 var_name: None,
4182 });
4183 Ok(ExecuteResponse::AlteredSystemConfiguration)
4184 }
4185
4186 fn is_user_allowed_to_alter_system(
4188 &self,
4189 session: &Session,
4190 var_name: Option<&str>,
4191 ) -> Result<(), AdapterError> {
4192 match (session.user().kind(), var_name) {
4193 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4195 (UserKind::Superuser, Some(name))
4197 if session.user().is_internal()
4198 || self.catalog().system_config().user_modifiable(name) =>
4199 {
4200 let var = self.catalog().system_config().get(name)?;
4203 var.visible(session.user(), self.catalog().system_config())?;
4204 Ok(())
4205 }
4206 (UserKind::Regular, Some(name))
4209 if self.catalog().system_config().user_modifiable(name) =>
4210 {
4211 Err(AdapterError::Unauthorized(
4212 rbac::UnauthorizedError::Superuser {
4213 action: format!("toggle the '{name}' system configuration parameter"),
4214 },
4215 ))
4216 }
4217 _ => Err(AdapterError::Unauthorized(
4218 rbac::UnauthorizedError::MzSystem {
4219 action: "alter system".into(),
4220 },
4221 )),
4222 }
4223 }
4224
4225 fn validate_alter_system_network_policy(
4226 &self,
4227 session: &Session,
4228 policy_value: &plan::VariableValue,
4229 ) -> Result<(), AdapterError> {
4230 let policy_name = match &policy_value {
4231 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4233 plan::VariableValue::Values(values) if values.len() == 1 => {
4234 values.iter().next().cloned()
4235 }
4236 plan::VariableValue::Values(values) => {
4237 tracing::warn!(?values, "can't set multiple network policies at once");
4238 None
4239 }
4240 };
4241 let maybe_network_policy = policy_name
4242 .as_ref()
4243 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4244 let Some(network_policy) = maybe_network_policy else {
4245 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4246 VarError::InvalidParameterValue {
4247 name: NETWORK_POLICY.name(),
4248 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4249 reason: "no network policy with such name exists".to_string(),
4250 },
4251 )));
4252 };
4253 self.validate_alter_network_policy(session, &network_policy.rules)
4254 }
4255
4256 fn validate_alter_network_policy(
4261 &self,
4262 session: &Session,
4263 policy_rules: &Vec<NetworkPolicyRule>,
4264 ) -> Result<(), AdapterError> {
4265 if session.user().is_internal() {
4268 return Ok(());
4269 }
4270 if let Some(ip) = session.meta().client_ip() {
4271 validate_ip_with_policy_rules(ip, policy_rules)
4272 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4273 } else {
4274 return Err(AdapterError::NetworkPolicyDenied(
4277 NetworkPolicyError::MissingIp,
4278 ));
4279 }
4280 Ok(())
4281 }
4282
4283 #[instrument]
4285 pub(super) fn sequence_execute(
4286 &self,
4287 session: &mut Session,
4288 plan: plan::ExecutePlan,
4289 ) -> Result<String, AdapterError> {
4290 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4292 let ps = session
4293 .get_prepared_statement_unverified(&plan.name)
4294 .expect("known to exist");
4295 let stmt = ps.stmt().cloned();
4296 let desc = ps.desc().clone();
4297 let state_revision = ps.state_revision;
4298 let logging = Arc::clone(ps.logging());
4299 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4300 }
4301
4302 #[instrument]
4303 pub(super) async fn sequence_grant_privileges(
4304 &mut self,
4305 session: &Session,
4306 plan::GrantPrivilegesPlan {
4307 update_privileges,
4308 grantees,
4309 }: plan::GrantPrivilegesPlan,
4310 ) -> Result<ExecuteResponse, AdapterError> {
4311 self.sequence_update_privileges(
4312 session,
4313 update_privileges,
4314 grantees,
4315 UpdatePrivilegeVariant::Grant,
4316 )
4317 .await
4318 }
4319
4320 #[instrument]
4321 pub(super) async fn sequence_revoke_privileges(
4322 &mut self,
4323 session: &Session,
4324 plan::RevokePrivilegesPlan {
4325 update_privileges,
4326 revokees,
4327 }: plan::RevokePrivilegesPlan,
4328 ) -> Result<ExecuteResponse, AdapterError> {
4329 self.sequence_update_privileges(
4330 session,
4331 update_privileges,
4332 revokees,
4333 UpdatePrivilegeVariant::Revoke,
4334 )
4335 .await
4336 }
4337
4338 #[instrument]
4339 async fn sequence_update_privileges(
4340 &mut self,
4341 session: &Session,
4342 update_privileges: Vec<UpdatePrivilege>,
4343 grantees: Vec<RoleId>,
4344 variant: UpdatePrivilegeVariant,
4345 ) -> Result<ExecuteResponse, AdapterError> {
4346 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4347 let mut warnings = Vec::new();
4348 let catalog = self.catalog().for_session(session);
4349
4350 for UpdatePrivilege {
4351 acl_mode,
4352 target_id,
4353 grantor,
4354 } in update_privileges
4355 {
4356 let actual_object_type = catalog.get_system_object_type(&target_id);
4357 if actual_object_type.is_relation() {
4360 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4361 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4362 if !non_applicable_privileges.is_empty() {
4363 let object_description =
4364 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4365 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4366 non_applicable_privileges,
4367 object_description,
4368 })
4369 }
4370 }
4371
4372 if let SystemObjectId::Object(object_id) = &target_id {
4373 self.catalog()
4374 .ensure_not_reserved_object(object_id, session.conn_id())?;
4375 }
4376
4377 let privileges = self
4378 .catalog()
4379 .get_privileges(&target_id, session.conn_id())
4380 .ok_or(AdapterError::Unsupported(
4383 "GRANTs/REVOKEs on an object type with no privileges",
4384 ))?;
4385
4386 for grantee in &grantees {
4387 self.catalog().ensure_not_system_role(grantee)?;
4388 self.catalog().ensure_not_predefined_role(grantee)?;
4389 let existing_privilege = privileges
4390 .get_acl_item(grantee, &grantor)
4391 .map(Cow::Borrowed)
4392 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4393
4394 match variant {
4395 UpdatePrivilegeVariant::Grant
4396 if !existing_privilege.acl_mode.contains(acl_mode) =>
4397 {
4398 ops.push(catalog::Op::UpdatePrivilege {
4399 target_id: target_id.clone(),
4400 privilege: MzAclItem {
4401 grantee: *grantee,
4402 grantor,
4403 acl_mode,
4404 },
4405 variant,
4406 });
4407 }
4408 UpdatePrivilegeVariant::Revoke
4409 if !existing_privilege
4410 .acl_mode
4411 .intersection(acl_mode)
4412 .is_empty() =>
4413 {
4414 ops.push(catalog::Op::UpdatePrivilege {
4415 target_id: target_id.clone(),
4416 privilege: MzAclItem {
4417 grantee: *grantee,
4418 grantor,
4419 acl_mode,
4420 },
4421 variant,
4422 });
4423 }
4424 _ => {}
4426 }
4427 }
4428 }
4429
4430 if ops.is_empty() {
4431 session.add_notices(warnings);
4432 return Ok(variant.into());
4433 }
4434
4435 let res = self
4436 .catalog_transact(Some(session), ops)
4437 .await
4438 .map(|_| match variant {
4439 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4440 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4441 });
4442 if res.is_ok() {
4443 session.add_notices(warnings);
4444 }
4445 res
4446 }
4447
4448 #[instrument]
4449 pub(super) async fn sequence_alter_default_privileges(
4450 &mut self,
4451 session: &Session,
4452 plan::AlterDefaultPrivilegesPlan {
4453 privilege_objects,
4454 privilege_acl_items,
4455 is_grant,
4456 }: plan::AlterDefaultPrivilegesPlan,
4457 ) -> Result<ExecuteResponse, AdapterError> {
4458 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4459 let variant = if is_grant {
4460 UpdatePrivilegeVariant::Grant
4461 } else {
4462 UpdatePrivilegeVariant::Revoke
4463 };
4464 for privilege_object in &privilege_objects {
4465 self.catalog()
4466 .ensure_not_system_role(&privilege_object.role_id)?;
4467 self.catalog()
4468 .ensure_not_predefined_role(&privilege_object.role_id)?;
4469 if let Some(database_id) = privilege_object.database_id {
4470 self.catalog()
4471 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4472 }
4473 if let Some(schema_id) = privilege_object.schema_id {
4474 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4475 let schema_spec: SchemaSpecifier = schema_id.into();
4476
4477 self.catalog().ensure_not_reserved_object(
4478 &(database_spec, schema_spec).into(),
4479 session.conn_id(),
4480 )?;
4481 }
4482 for privilege_acl_item in &privilege_acl_items {
4483 self.catalog()
4484 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4485 self.catalog()
4486 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4487 ops.push(catalog::Op::UpdateDefaultPrivilege {
4488 privilege_object: privilege_object.clone(),
4489 privilege_acl_item: privilege_acl_item.clone(),
4490 variant,
4491 })
4492 }
4493 }
4494
4495 self.catalog_transact(Some(session), ops).await?;
4496 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4497 }
4498
4499 #[instrument]
4500 pub(super) async fn sequence_grant_role(
4501 &mut self,
4502 session: &Session,
4503 plan::GrantRolePlan {
4504 role_ids,
4505 member_ids,
4506 grantor_id,
4507 }: plan::GrantRolePlan,
4508 ) -> Result<ExecuteResponse, AdapterError> {
4509 let catalog = self.catalog();
4510 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4511 for role_id in role_ids {
4512 for member_id in &member_ids {
4513 let member_membership: BTreeSet<_> =
4514 catalog.get_role(member_id).membership().keys().collect();
4515 if member_membership.contains(&role_id) {
4516 let role_name = catalog.get_role(&role_id).name().to_string();
4517 let member_name = catalog.get_role(member_id).name().to_string();
4518 catalog.ensure_not_reserved_role(member_id)?;
4520 catalog.ensure_grantable_role(&role_id)?;
4521 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4522 role_name,
4523 member_name,
4524 });
4525 } else {
4526 ops.push(catalog::Op::GrantRole {
4527 role_id,
4528 member_id: *member_id,
4529 grantor_id,
4530 });
4531 }
4532 }
4533 }
4534
4535 if ops.is_empty() {
4536 return Ok(ExecuteResponse::GrantedRole);
4537 }
4538
4539 self.catalog_transact(Some(session), ops)
4540 .await
4541 .map(|_| ExecuteResponse::GrantedRole)
4542 }
4543
4544 #[instrument]
4545 pub(super) async fn sequence_revoke_role(
4546 &mut self,
4547 session: &Session,
4548 plan::RevokeRolePlan {
4549 role_ids,
4550 member_ids,
4551 grantor_id,
4552 }: plan::RevokeRolePlan,
4553 ) -> Result<ExecuteResponse, AdapterError> {
4554 let catalog = self.catalog();
4555 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4556 for role_id in role_ids {
4557 for member_id in &member_ids {
4558 let member_membership: BTreeSet<_> =
4559 catalog.get_role(member_id).membership().keys().collect();
4560 if !member_membership.contains(&role_id) {
4561 let role_name = catalog.get_role(&role_id).name().to_string();
4562 let member_name = catalog.get_role(member_id).name().to_string();
4563 catalog.ensure_not_reserved_role(member_id)?;
4565 catalog.ensure_grantable_role(&role_id)?;
4566 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4567 role_name,
4568 member_name,
4569 });
4570 } else {
4571 ops.push(catalog::Op::RevokeRole {
4572 role_id,
4573 member_id: *member_id,
4574 grantor_id,
4575 });
4576 }
4577 }
4578 }
4579
4580 if ops.is_empty() {
4581 return Ok(ExecuteResponse::RevokedRole);
4582 }
4583
4584 self.catalog_transact(Some(session), ops)
4585 .await
4586 .map(|_| ExecuteResponse::RevokedRole)
4587 }
4588
4589 #[instrument]
4590 pub(super) async fn sequence_alter_owner(
4591 &mut self,
4592 session: &Session,
4593 plan::AlterOwnerPlan {
4594 id,
4595 object_type,
4596 new_owner,
4597 }: plan::AlterOwnerPlan,
4598 ) -> Result<ExecuteResponse, AdapterError> {
4599 let mut ops = vec![catalog::Op::UpdateOwner {
4600 id: id.clone(),
4601 new_owner,
4602 }];
4603
4604 match &id {
4605 ObjectId::Item(global_id) => {
4606 let entry = self.catalog().get_entry(global_id);
4607
4608 if entry.is_index() {
4610 let name = self
4611 .catalog()
4612 .resolve_full_name(entry.name(), Some(session.conn_id()))
4613 .to_string();
4614 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4615 return Ok(ExecuteResponse::AlteredObject(object_type));
4616 }
4617
4618 let dependent_index_ops = entry
4620 .used_by()
4621 .into_iter()
4622 .filter(|id| self.catalog().get_entry(id).is_index())
4623 .map(|id| catalog::Op::UpdateOwner {
4624 id: ObjectId::Item(*id),
4625 new_owner,
4626 });
4627 ops.extend(dependent_index_ops);
4628
4629 let dependent_subsources =
4631 entry
4632 .progress_id()
4633 .into_iter()
4634 .map(|item_id| catalog::Op::UpdateOwner {
4635 id: ObjectId::Item(item_id),
4636 new_owner,
4637 });
4638 ops.extend(dependent_subsources);
4639 }
4640 ObjectId::Cluster(cluster_id) => {
4641 let cluster = self.catalog().get_cluster(*cluster_id);
4642 let managed_cluster_replica_ops =
4644 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4645 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4646 new_owner,
4647 });
4648 ops.extend(managed_cluster_replica_ops);
4649 }
4650 _ => {}
4651 }
4652
4653 self.catalog_transact(Some(session), ops)
4654 .await
4655 .map(|_| ExecuteResponse::AlteredObject(object_type))
4656 }
4657
4658 #[instrument]
4659 pub(super) async fn sequence_reassign_owned(
4660 &mut self,
4661 session: &Session,
4662 plan::ReassignOwnedPlan {
4663 old_roles,
4664 new_role,
4665 reassign_ids,
4666 }: plan::ReassignOwnedPlan,
4667 ) -> Result<ExecuteResponse, AdapterError> {
4668 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4669 self.catalog().ensure_not_reserved_role(role_id)?;
4670 }
4671
4672 let ops = reassign_ids
4673 .into_iter()
4674 .map(|id| catalog::Op::UpdateOwner {
4675 id,
4676 new_owner: new_role,
4677 })
4678 .collect();
4679
4680 self.catalog_transact(Some(session), ops)
4681 .await
4682 .map(|_| ExecuteResponse::ReassignOwned)
4683 }
4684
4685 #[instrument]
4686 pub(crate) async fn handle_deferred_statement(&mut self) {
4687 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4691 return;
4692 };
4693 match ps {
4694 crate::coord::PlanStatement::Statement { stmt, params } => {
4695 self.handle_execute_inner(stmt, params, ctx).await;
4696 }
4697 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4698 self.sequence_plan(ctx, plan, resolved_ids).await;
4699 }
4700 }
4701 }
4702
4703 #[instrument]
4704 #[allow(clippy::unused_async)]
4706 pub(super) async fn sequence_alter_table(
4707 &mut self,
4708 ctx: &mut ExecuteContext,
4709 plan: plan::AlterTablePlan,
4710 ) -> Result<ExecuteResponse, AdapterError> {
4711 let plan::AlterTablePlan {
4712 relation_id,
4713 column_name,
4714 column_type,
4715 raw_sql_type,
4716 } = plan;
4717
4718 let (_, new_global_id) = self.allocate_user_id().await?;
4720 let ops = vec![catalog::Op::AlterAddColumn {
4721 id: relation_id,
4722 new_global_id,
4723 name: column_name,
4724 typ: column_type,
4725 sql: raw_sql_type,
4726 }];
4727
4728 self.catalog_transact_with_context(None, Some(ctx), ops)
4729 .await?;
4730
4731 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4732 }
4733
4734 #[instrument]
4736 pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4737 &mut self,
4738 ctx: ExecuteContext,
4739 plan: AlterMaterializedViewApplyReplacementPlan,
4740 ) {
4741 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4751
4752 let plan_validity = PlanValidity::new(
4753 self.catalog().transient_revision(),
4754 BTreeSet::from_iter([id, replacement_id]),
4755 None,
4756 None,
4757 ctx.session().role_metadata().clone(),
4758 );
4759
4760 let target = self.catalog.get_entry(&id);
4761 let target_gid = target.latest_global_id();
4762
4763 let replacement = self.catalog.get_entry(&replacement_id);
4764 let replacement_gid = replacement.latest_global_id();
4765
4766 let target_upper = self
4767 .controller
4768 .storage_collections
4769 .collection_frontiers(target_gid)
4770 .expect("target MV exists")
4771 .write_frontier;
4772 let replacement_upper = self
4773 .controller
4774 .compute
4775 .collection_frontiers(replacement_gid, replacement.cluster_id())
4776 .expect("replacement MV exists")
4777 .write_frontier;
4778
4779 info!(
4780 %id, %replacement_id, ?target_upper, ?replacement_upper,
4781 "preparing materialized view replacement application",
4782 );
4783
4784 let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4785 ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4794 name: target.name().item.clone(),
4795 }));
4796 return;
4797 };
4798
4799 let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4803
4804 self.install_storage_watch_set(
4808 ctx.session().conn_id().clone(),
4809 BTreeSet::from_iter([target_gid]),
4810 replacement_upper_ts,
4811 WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4812 ctx: Some(ctx),
4813 otel_ctx: OpenTelemetryContext::obtain(),
4814 plan,
4815 plan_validity,
4816 }),
4817 )
4818 .expect("target collection exists");
4819 }
4820
4821 #[instrument]
4823 pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4824 &mut self,
4825 mut ctx: AlterMaterializedViewReadyContext,
4826 ) {
4827 ctx.otel_ctx.attach_as_parent();
4828
4829 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4830
4831 if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4836 ctx.retire(Err(err));
4837 return;
4838 }
4839
4840 info!(
4841 %id, %replacement_id,
4842 "finishing materialized view replacement application",
4843 );
4844
4845 let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4846 match self
4847 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4848 .await
4849 {
4850 Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4851 ObjectType::MaterializedView,
4852 ))),
4853 Err(err) => ctx.retire(Err(err)),
4854 }
4855 }
4856
4857 pub(super) async fn statistics_oracle(
4858 &self,
4859 session: &Session,
4860 source_ids: &BTreeSet<GlobalId>,
4861 query_as_of: &Antichain<Timestamp>,
4862 is_oneshot: bool,
4863 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4864 super::statistics_oracle(
4865 session,
4866 source_ids,
4867 query_as_of,
4868 is_oneshot,
4869 self.catalog().system_config(),
4870 self.controller.storage_collections.as_ref(),
4871 )
4872 .await
4873 }
4874}
4875
4876impl Coordinator {
4877 async fn process_dataflow_metainfo(
4879 &mut self,
4880 df_meta: DataflowMetainfo,
4881 export_id: GlobalId,
4882 ctx: Option<&mut ExecuteContext>,
4883 notice_ids: Vec<GlobalId>,
4884 ) -> Option<BuiltinTableAppendNotify> {
4885 if let Some(ctx) = ctx {
4887 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4888 }
4889
4890 let df_meta = self
4892 .catalog()
4893 .render_notices(df_meta, notice_ids, Some(export_id));
4894
4895 if self.catalog().state().system_config().enable_mz_notices()
4898 && !df_meta.optimizer_notices.is_empty()
4899 {
4900 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4901 self.catalog().state().pack_optimizer_notices(
4902 &mut builtin_table_updates,
4903 df_meta.optimizer_notices.iter(),
4904 Diff::ONE,
4905 );
4906
4907 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4909
4910 Some(
4911 self.builtin_table_update()
4912 .execute(builtin_table_updates)
4913 .await
4914 .0,
4915 )
4916 } else {
4917 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4919
4920 None
4921 }
4922 }
4923}