1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::objects::{
25 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
26};
27use mz_cloud_resources::VpcEndpointConfig;
28use mz_expr::{
29 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::task::{self, JoinHandle, spawn};
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{assert_none, instrument};
36use mz_repr::adt::jsonb::Jsonb;
37use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
38use mz_repr::explain::ExprHumanizer;
39use mz_repr::explain::json::json_string;
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
43 RowIterator, Timestamp,
44};
45use mz_sql::ast::{
46 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
47 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
48 SqlServerConfigOptionName,
49};
50use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
51use mz_sql::catalog::{
52 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
53 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
54 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
55};
56use mz_sql::names::{
57 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
58 SchemaSpecifier, SystemObjectId,
59};
60use mz_sql::plan::{
61 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
62 StatementContext,
63};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use smallvec::SmallVec;
93use timely::progress::Antichain;
94use tokio::sync::{oneshot, watch};
95use tracing::{Instrument, Span, info, warn};
96
97use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
98use crate::command::{ExecuteResponse, Response};
99use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
100use crate::coord::sequencer::emit_optimizer_notices;
101use crate::coord::{
102 AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
103 CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
104 Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
105 PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
106 validate_ip_with_policy_rules,
107};
108use crate::error::AdapterError;
109use crate::notice::{AdapterNotice, DroppedInUseIndex};
110use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
111use crate::optimize::{self, Optimize};
112use crate::session::{
113 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
114 WriteLocks, WriteOp,
115};
116use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
117use crate::{PeekResponseUnary, ReadHolds};
118
119mod cluster;
120mod copy_from;
121mod create_continual_task;
122mod create_index;
123mod create_materialized_view;
124mod create_view;
125mod explain_timestamp;
126mod peek;
127mod secret;
128mod subscribe;
129
130macro_rules! return_if_err {
133 ($expr:expr, $ctx:expr) => {
134 match $expr {
135 Ok(v) => v,
136 Err(e) => return $ctx.retire(Err(e.into())),
137 }
138 };
139}
140
141pub(super) use return_if_err;
142
143struct DropOps {
144 ops: Vec<catalog::Op>,
145 dropped_active_db: bool,
146 dropped_active_cluster: bool,
147 dropped_in_use_indexes: Vec<DroppedInUseIndex>,
148}
149
150struct CreateSourceInner {
152 ops: Vec<catalog::Op>,
153 sources: Vec<(CatalogItemId, Source)>,
154 if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
155}
156
157impl Coordinator {
158 pub(crate) async fn sequence_staged<S>(
163 &mut self,
164 mut ctx: S::Ctx,
165 parent_span: Span,
166 mut stage: S,
167 ) where
168 S: Staged + 'static,
169 S::Ctx: Send + 'static,
170 {
171 return_if_err!(stage.validity().check(self.catalog()), ctx);
172 loop {
173 let mut cancel_enabled = stage.cancel_enabled();
174 if let Some(session) = ctx.session() {
175 if cancel_enabled {
176 if let Some((_prev_tx, prev_rx)) = self
179 .staged_cancellation
180 .insert(session.conn_id().clone(), watch::channel(false))
181 {
182 let was_canceled = *prev_rx.borrow();
183 if was_canceled {
184 ctx.retire(Err(AdapterError::Canceled));
185 return;
186 }
187 }
188 } else {
189 self.staged_cancellation.remove(session.conn_id());
192 }
193 } else {
194 cancel_enabled = false
195 };
196 let next = stage
197 .stage(self, &mut ctx)
198 .instrument(parent_span.clone())
199 .await;
200 let res = return_if_err!(next, ctx);
201 stage = match res {
202 StageResult::Handle(handle) => {
203 let internal_cmd_tx = self.internal_cmd_tx.clone();
204 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
205 let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
206 });
207 return;
208 }
209 StageResult::HandleRetire(handle) => {
210 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
211 ctx.retire(Ok(resp));
212 });
213 return;
214 }
215 StageResult::Response(resp) => {
216 ctx.retire(Ok(resp));
217 return;
218 }
219 StageResult::Immediate(stage) => *stage,
220 }
221 }
222 }
223
224 fn handle_spawn<C, T, F>(
225 &self,
226 ctx: C,
227 handle: JoinHandle<Result<T, AdapterError>>,
228 cancel_enabled: bool,
229 f: F,
230 ) where
231 C: StagedContext + Send + 'static,
232 T: Send + 'static,
233 F: FnOnce(C, T) + Send + 'static,
234 {
235 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
236 .session()
237 .and_then(|session| self.staged_cancellation.get(session.conn_id()))
238 {
239 let mut rx = rx.clone();
240 Box::pin(async move {
241 let _ = rx.wait_for(|v| *v).await;
243 ()
244 })
245 } else {
246 Box::pin(future::pending())
247 };
248 spawn(|| "sequence_staged", async move {
249 tokio::select! {
250 res = handle => {
251 let next = return_if_err!(res, ctx);
252 f(ctx, next);
253 }
254 _ = rx, if cancel_enabled => {
255 ctx.retire(Err(AdapterError::Canceled));
256 }
257 }
258 });
259 }
260
261 async fn create_source_inner(
262 &self,
263 session: &Session,
264 plans: Vec<plan::CreateSourcePlanBundle>,
265 ) -> Result<CreateSourceInner, AdapterError> {
266 let mut ops = vec![];
267 let mut sources = vec![];
268
269 let if_not_exists_ids = plans
270 .iter()
271 .filter_map(
272 |plan::CreateSourcePlanBundle {
273 item_id,
274 global_id: _,
275 plan,
276 resolved_ids: _,
277 available_source_references: _,
278 }| {
279 if plan.if_not_exists {
280 Some((*item_id, plan.name.clone()))
281 } else {
282 None
283 }
284 },
285 )
286 .collect::<BTreeMap<_, _>>();
287
288 for plan::CreateSourcePlanBundle {
289 item_id,
290 global_id,
291 mut plan,
292 resolved_ids,
293 available_source_references,
294 } in plans
295 {
296 let name = plan.name.clone();
297
298 match plan.source.data_source {
299 plan::DataSourceDesc::Ingestion(ref desc)
300 | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
301 let cluster_id = plan
302 .in_cluster
303 .expect("ingestion plans must specify cluster");
304 match desc.connection {
305 GenericSourceConnection::Postgres(_)
306 | GenericSourceConnection::MySql(_)
307 | GenericSourceConnection::SqlServer(_)
308 | GenericSourceConnection::Kafka(_)
309 | GenericSourceConnection::LoadGenerator(_) => {
310 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
311 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
312 .get(self.catalog().system_config().dyncfgs());
313
314 if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
315 {
316 return Err(AdapterError::Unsupported(
317 "sources in clusters with >1 replicas",
318 ));
319 }
320 }
321 }
322 }
323 }
324 plan::DataSourceDesc::Webhook { .. } => {
325 let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
326 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
327 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
328 .get(self.catalog().system_config().dyncfgs());
329
330 if !enable_multi_replica_sources {
331 if cluster.replica_ids().len() > 1 {
332 return Err(AdapterError::Unsupported(
333 "webhook sources in clusters with >1 replicas",
334 ));
335 }
336 }
337 }
338 }
339 plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
340 }
341
342 if let mz_sql::plan::DataSourceDesc::Webhook {
344 validate_using: Some(validate),
345 ..
346 } = &mut plan.source.data_source
347 {
348 if let Err(reason) = validate.reduce_expression().await {
349 self.metrics
350 .webhook_validation_reduce_failures
351 .with_label_values(&[reason])
352 .inc();
353 return Err(AdapterError::Internal(format!(
354 "failed to reduce check expression, {reason}"
355 )));
356 }
357 }
358
359 let mut reference_ops = vec![];
362 if let Some(references) = &available_source_references {
363 reference_ops.push(catalog::Op::UpdateSourceReferences {
364 source_id: item_id,
365 references: references.clone().into(),
366 });
367 }
368
369 let source = Source::new(plan, global_id, resolved_ids, None, false);
370 ops.push(catalog::Op::CreateItem {
371 id: item_id,
372 name,
373 item: CatalogItem::Source(source.clone()),
374 owner_id: *session.current_role_id(),
375 });
376 sources.push((item_id, source));
377 ops.extend(reference_ops);
379 }
380
381 Ok(CreateSourceInner {
382 ops,
383 sources,
384 if_not_exists_ids,
385 })
386 }
387
388 pub(crate) fn plan_subsource(
396 &self,
397 session: &Session,
398 params: &mz_sql::plan::Params,
399 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
400 item_id: CatalogItemId,
401 global_id: GlobalId,
402 ) -> Result<CreateSourcePlanBundle, AdapterError> {
403 let catalog = self.catalog().for_session(session);
404 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
405
406 let plan = self.plan_statement(
407 session,
408 Statement::CreateSubsource(subsource_stmt),
409 params,
410 &resolved_ids,
411 )?;
412 let plan = match plan {
413 Plan::CreateSource(plan) => plan,
414 _ => unreachable!(),
415 };
416 Ok(CreateSourcePlanBundle {
417 item_id,
418 global_id,
419 plan,
420 resolved_ids,
421 available_source_references: None,
422 })
423 }
424
425 pub(crate) async fn plan_purified_alter_source_add_subsource(
427 &mut self,
428 session: &Session,
429 params: Params,
430 source_name: ResolvedItemName,
431 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
432 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
433 ) -> Result<(Plan, ResolvedIds), AdapterError> {
434 let mut subsource_plans = Vec::with_capacity(subsources.len());
435
436 let conn_catalog = self.catalog().for_system_session();
438 let pcx = plan::PlanContext::zero();
439 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
440
441 let entry = self.catalog().get_entry(source_name.item_id());
442 let source = entry.source().ok_or_else(|| {
443 AdapterError::internal(
444 "plan alter source",
445 format!("expected Source found {entry:?}"),
446 )
447 })?;
448
449 let item_id = entry.id();
450 let ingestion_id = source.global_id();
451 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
452
453 let id_ts = self.get_catalog_write_ts().await;
454 let ids = self
455 .catalog()
456 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
457 .await?;
458 for (subsource_stmt, (item_id, global_id)) in
459 subsource_stmts.into_iter().zip_eq(ids.into_iter())
460 {
461 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
462 subsource_plans.push(s);
463 }
464
465 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
466 subsources: subsource_plans,
467 options,
468 };
469
470 Ok((
471 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
472 item_id,
473 ingestion_id,
474 action,
475 }),
476 ResolvedIds::empty(),
477 ))
478 }
479
480 pub(crate) fn plan_purified_alter_source_refresh_references(
482 &self,
483 _session: &Session,
484 _params: Params,
485 source_name: ResolvedItemName,
486 available_source_references: plan::SourceReferences,
487 ) -> Result<(Plan, ResolvedIds), AdapterError> {
488 let entry = self.catalog().get_entry(source_name.item_id());
489 let source = entry.source().ok_or_else(|| {
490 AdapterError::internal(
491 "plan alter source",
492 format!("expected Source found {entry:?}"),
493 )
494 })?;
495 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
496 references: available_source_references,
497 };
498
499 Ok((
500 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
501 item_id: entry.id(),
502 ingestion_id: source.global_id(),
503 action,
504 }),
505 ResolvedIds::empty(),
506 ))
507 }
508
509 pub(crate) async fn plan_purified_create_source(
513 &mut self,
514 ctx: &ExecuteContext,
515 params: Params,
516 progress_stmt: Option<CreateSubsourceStatement<Aug>>,
517 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
518 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
519 available_source_references: plan::SourceReferences,
520 ) -> Result<(Plan, ResolvedIds), AdapterError> {
521 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
522
523 if let Some(progress_stmt) = progress_stmt {
525 assert_none!(progress_stmt.of_source);
530 let id_ts = self.get_catalog_write_ts().await;
531 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
532 let progress_plan =
533 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
534 let progress_full_name = self
535 .catalog()
536 .resolve_full_name(&progress_plan.plan.name, None);
537 let progress_subsource = ResolvedItemName::Item {
538 id: progress_plan.item_id,
539 qualifiers: progress_plan.plan.name.qualifiers.clone(),
540 full_name: progress_full_name,
541 print_id: true,
542 version: RelationVersionSelector::Latest,
543 };
544
545 create_source_plans.push(progress_plan);
546
547 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
548 }
549
550 let catalog = self.catalog().for_session(ctx.session());
551 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
552
553 let propagated_with_options: Vec<_> = source_stmt
554 .with_options
555 .iter()
556 .filter_map(|opt| match opt.name {
557 CreateSourceOptionName::TimestampInterval => None,
558 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
559 name: CreateSubsourceOptionName::RetainHistory,
560 value: opt.value.clone(),
561 }),
562 })
563 .collect();
564
565 let source_plan = match self.plan_statement(
567 ctx.session(),
568 Statement::CreateSource(source_stmt),
569 ¶ms,
570 &resolved_ids,
571 )? {
572 Plan::CreateSource(plan) => plan,
573 p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
574 };
575
576 let id_ts = self.get_catalog_write_ts().await;
577 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
578
579 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
580 let of_source = ResolvedItemName::Item {
581 id: item_id,
582 qualifiers: source_plan.name.qualifiers.clone(),
583 full_name: source_full_name,
584 print_id: true,
585 version: RelationVersionSelector::Latest,
586 };
587
588 let conn_catalog = self.catalog().for_system_session();
590 let pcx = plan::PlanContext::zero();
591 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
592
593 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
594
595 for subsource_stmt in subsource_stmts.iter_mut() {
596 subsource_stmt
597 .with_options
598 .extend(propagated_with_options.iter().cloned())
599 }
600
601 create_source_plans.push(CreateSourcePlanBundle {
602 item_id,
603 global_id,
604 plan: source_plan,
605 resolved_ids: resolved_ids.clone(),
606 available_source_references: Some(available_source_references),
607 });
608
609 let id_ts = self.get_catalog_write_ts().await;
611 let ids = self
612 .catalog()
613 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
614 .await?;
615 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
616 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
617 create_source_plans.push(plan);
618 }
619
620 Ok((
621 Plan::CreateSources(create_source_plans),
622 ResolvedIds::empty(),
623 ))
624 }
625
626 #[instrument]
627 pub(super) async fn sequence_create_source(
628 &mut self,
629 ctx: &mut ExecuteContext,
630 plans: Vec<plan::CreateSourcePlanBundle>,
631 ) -> Result<ExecuteResponse, AdapterError> {
632 let CreateSourceInner {
633 ops,
634 sources,
635 if_not_exists_ids,
636 } = self.create_source_inner(ctx.session(), plans).await?;
637
638 let transact_result = self
639 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
640 .await;
641
642 for (item_id, source) in &sources {
644 if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
645 if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
646 ctx.session()
647 .add_notice(AdapterNotice::WebhookSourceCreated { url });
648 }
649 }
650 }
651
652 match transact_result {
653 Ok(()) => Ok(ExecuteResponse::CreatedSource),
654 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
655 kind:
656 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
657 })) if if_not_exists_ids.contains_key(&id) => {
658 ctx.session()
659 .add_notice(AdapterNotice::ObjectAlreadyExists {
660 name: if_not_exists_ids[&id].item.clone(),
661 ty: "source",
662 });
663 Ok(ExecuteResponse::CreatedSource)
664 }
665 Err(err) => Err(err),
666 }
667 }
668
669 #[instrument]
670 pub(super) async fn sequence_create_connection(
671 &mut self,
672 mut ctx: ExecuteContext,
673 plan: plan::CreateConnectionPlan,
674 resolved_ids: ResolvedIds,
675 ) {
676 let id_ts = self.get_catalog_write_ts().await;
677 let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
678 Ok(item_id) => item_id,
679 Err(err) => return ctx.retire(Err(err.into())),
680 };
681
682 match &plan.connection.details {
683 ConnectionDetails::Ssh { key_1, key_2, .. } => {
684 let key_1 = match key_1.as_key_pair() {
685 Some(key_1) => key_1.clone(),
686 None => {
687 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
688 "the PUBLIC KEY 1 option cannot be explicitly specified"
689 ))));
690 }
691 };
692
693 let key_2 = match key_2.as_key_pair() {
694 Some(key_2) => key_2.clone(),
695 None => {
696 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
697 "the PUBLIC KEY 2 option cannot be explicitly specified"
698 ))));
699 }
700 };
701
702 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
703 let secret = key_set.to_bytes();
704 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
705 return ctx.retire(Err(err.into()));
706 }
707 }
708 _ => (),
709 };
710
711 if plan.validate {
712 let internal_cmd_tx = self.internal_cmd_tx.clone();
713 let transient_revision = self.catalog().transient_revision();
714 let conn_id = ctx.session().conn_id().clone();
715 let otel_ctx = OpenTelemetryContext::obtain();
716 let role_metadata = ctx.session().role_metadata().clone();
717
718 let connection = plan
719 .connection
720 .details
721 .to_connection()
722 .into_inline_connection(self.catalog().state());
723
724 let current_storage_parameters = self.controller.storage.config().clone();
725 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
726 let result = match connection
727 .validate(connection_id, ¤t_storage_parameters)
728 .await
729 {
730 Ok(()) => Ok(plan),
731 Err(err) => Err(err.into()),
732 };
733
734 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
736 CreateConnectionValidationReady {
737 ctx,
738 result,
739 connection_id,
740 connection_gid,
741 plan_validity: PlanValidity::new(
742 transient_revision,
743 resolved_ids.items().copied().collect(),
744 None,
745 None,
746 role_metadata,
747 ),
748 otel_ctx,
749 resolved_ids: resolved_ids.clone(),
750 },
751 ));
752 if let Err(e) = result {
753 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
754 }
755 });
756 } else {
757 let result = self
758 .sequence_create_connection_stage_finish(
759 &mut ctx,
760 connection_id,
761 connection_gid,
762 plan,
763 resolved_ids,
764 )
765 .await;
766 ctx.retire(result);
767 }
768 }
769
770 #[instrument]
771 pub(crate) async fn sequence_create_connection_stage_finish(
772 &mut self,
773 ctx: &mut ExecuteContext,
774 connection_id: CatalogItemId,
775 connection_gid: GlobalId,
776 plan: plan::CreateConnectionPlan,
777 resolved_ids: ResolvedIds,
778 ) -> Result<ExecuteResponse, AdapterError> {
779 let ops = vec![catalog::Op::CreateItem {
780 id: connection_id,
781 name: plan.name.clone(),
782 item: CatalogItem::Connection(Connection {
783 create_sql: plan.connection.create_sql,
784 global_id: connection_gid,
785 details: plan.connection.details.clone(),
786 resolved_ids,
787 }),
788 owner_id: *ctx.session().current_role_id(),
789 }];
790
791 let transact_result = self
792 .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
793 Box::pin(async move {
794 match plan.connection.details {
795 ConnectionDetails::AwsPrivatelink(ref privatelink) => {
796 let spec = VpcEndpointConfig {
797 aws_service_name: privatelink.service_name.to_owned(),
798 availability_zone_ids: privatelink.availability_zones.to_owned(),
799 };
800 let cloud_resource_controller =
801 match coord.cloud_resource_controller.as_ref().cloned() {
802 Some(controller) => controller,
803 None => {
804 tracing::warn!("AWS PrivateLink connections unsupported");
805 return;
806 }
807 };
808 if let Err(err) = cloud_resource_controller
809 .ensure_vpc_endpoint(connection_id, spec)
810 .await
811 {
812 tracing::warn!(?err, "failed to ensure vpc endpoint!");
813 }
814 }
815 _ => {}
816 }
817 })
818 })
819 .await;
820
821 match transact_result {
822 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
823 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
824 kind:
825 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
826 })) if plan.if_not_exists => {
827 ctx.session()
828 .add_notice(AdapterNotice::ObjectAlreadyExists {
829 name: plan.name.item,
830 ty: "connection",
831 });
832 Ok(ExecuteResponse::CreatedConnection)
833 }
834 Err(err) => Err(err),
835 }
836 }
837
838 #[instrument]
839 pub(super) async fn sequence_create_database(
840 &mut self,
841 session: &Session,
842 plan: plan::CreateDatabasePlan,
843 ) -> Result<ExecuteResponse, AdapterError> {
844 let ops = vec![catalog::Op::CreateDatabase {
845 name: plan.name.clone(),
846 owner_id: *session.current_role_id(),
847 }];
848 match self.catalog_transact(Some(session), ops).await {
849 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
850 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
851 kind:
852 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
853 })) if plan.if_not_exists => {
854 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
855 Ok(ExecuteResponse::CreatedDatabase)
856 }
857 Err(err) => Err(err),
858 }
859 }
860
861 #[instrument]
862 pub(super) async fn sequence_create_schema(
863 &mut self,
864 session: &Session,
865 plan: plan::CreateSchemaPlan,
866 ) -> Result<ExecuteResponse, AdapterError> {
867 let op = catalog::Op::CreateSchema {
868 database_id: plan.database_spec,
869 schema_name: plan.schema_name.clone(),
870 owner_id: *session.current_role_id(),
871 };
872 match self.catalog_transact(Some(session), vec![op]).await {
873 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
874 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
875 kind:
876 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
877 })) if plan.if_not_exists => {
878 session.add_notice(AdapterNotice::SchemaAlreadyExists {
879 name: plan.schema_name,
880 });
881 Ok(ExecuteResponse::CreatedSchema)
882 }
883 Err(err) => Err(err),
884 }
885 }
886
887 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
889 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
890 if attributes.superuser.is_some()
891 || attributes.password.is_some()
892 || attributes.login.is_some()
893 {
894 return Err(AdapterError::UnavailableFeature {
895 feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
896 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
897 });
898 }
899 }
900 Ok(())
901 }
902
903 #[instrument]
904 pub(super) async fn sequence_create_role(
905 &mut self,
906 conn_id: Option<&ConnectionId>,
907 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
908 ) -> Result<ExecuteResponse, AdapterError> {
909 self.validate_role_attributes(&attributes.clone())?;
910 let op = catalog::Op::CreateRole { name, attributes };
911 self.catalog_transact_with_context(conn_id, None, vec![op])
912 .await
913 .map(|_| ExecuteResponse::CreatedRole)
914 }
915
916 #[instrument]
917 pub(super) async fn sequence_create_network_policy(
918 &mut self,
919 session: &Session,
920 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
921 ) -> Result<ExecuteResponse, AdapterError> {
922 let op = catalog::Op::CreateNetworkPolicy {
923 rules,
924 name,
925 owner_id: *session.current_role_id(),
926 };
927 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
928 .await
929 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
930 }
931
932 #[instrument]
933 pub(super) async fn sequence_alter_network_policy(
934 &mut self,
935 session: &Session,
936 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
937 ) -> Result<ExecuteResponse, AdapterError> {
938 let current_network_policy_name =
940 self.catalog().system_config().default_network_policy_name();
941 if current_network_policy_name == name {
943 self.validate_alter_network_policy(session, &rules)?;
944 }
945
946 let op = catalog::Op::AlterNetworkPolicy {
947 id,
948 rules,
949 name,
950 owner_id: *session.current_role_id(),
951 };
952 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
953 .await
954 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
955 }
956
957 #[instrument]
958 pub(super) async fn sequence_create_table(
959 &mut self,
960 ctx: &mut ExecuteContext,
961 plan: plan::CreateTablePlan,
962 resolved_ids: ResolvedIds,
963 ) -> Result<ExecuteResponse, AdapterError> {
964 let plan::CreateTablePlan {
965 name,
966 table,
967 if_not_exists,
968 } = plan;
969
970 let conn_id = if table.temporary {
971 Some(ctx.session().conn_id())
972 } else {
973 None
974 };
975 let id_ts = self.get_catalog_write_ts().await;
976 let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
977 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
978
979 let data_source = match table.data_source {
980 plan::TableDataSource::TableWrites { defaults } => {
981 TableDataSource::TableWrites { defaults }
982 }
983 plan::TableDataSource::DataSource {
984 desc: data_source_plan,
985 timeline,
986 } => match data_source_plan {
987 plan::DataSourceDesc::IngestionExport {
988 ingestion_id,
989 external_reference,
990 details,
991 data_config,
992 } => TableDataSource::DataSource {
993 desc: DataSourceDesc::IngestionExport {
994 ingestion_id,
995 external_reference,
996 details,
997 data_config,
998 },
999 timeline,
1000 },
1001 plan::DataSourceDesc::Webhook {
1002 validate_using,
1003 body_format,
1004 headers,
1005 cluster_id,
1006 } => TableDataSource::DataSource {
1007 desc: DataSourceDesc::Webhook {
1008 validate_using,
1009 body_format,
1010 headers,
1011 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
1012 },
1013 timeline,
1014 },
1015 o => {
1016 unreachable!("CREATE TABLE data source got {:?}", o)
1017 }
1018 },
1019 };
1020
1021 let is_webhook = if let TableDataSource::DataSource {
1022 desc: DataSourceDesc::Webhook { .. },
1023 timeline: _,
1024 } = &data_source
1025 {
1026 true
1027 } else {
1028 false
1029 };
1030
1031 let table = Table {
1032 create_sql: Some(table.create_sql),
1033 desc: table.desc,
1034 collections,
1035 conn_id: conn_id.cloned(),
1036 resolved_ids,
1037 custom_logical_compaction_window: table.compaction_window,
1038 is_retained_metrics_object: false,
1039 data_source,
1040 };
1041 let ops = vec![catalog::Op::CreateItem {
1042 id: table_id,
1043 name: name.clone(),
1044 item: CatalogItem::Table(table.clone()),
1045 owner_id: *ctx.session().current_role_id(),
1046 }];
1047
1048 let catalog_result = self
1049 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1050 .await;
1051
1052 if is_webhook {
1053 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1056 ctx.session()
1057 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1058 }
1059 }
1060
1061 match catalog_result {
1062 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1063 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1064 kind:
1065 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1066 })) if if_not_exists => {
1067 ctx.session_mut()
1068 .add_notice(AdapterNotice::ObjectAlreadyExists {
1069 name: name.item,
1070 ty: "table",
1071 });
1072 Ok(ExecuteResponse::CreatedTable)
1073 }
1074 Err(err) => Err(err),
1075 }
1076 }
1077
1078 #[instrument]
1079 pub(super) async fn sequence_create_sink(
1080 &mut self,
1081 ctx: ExecuteContext,
1082 plan: plan::CreateSinkPlan,
1083 resolved_ids: ResolvedIds,
1084 ) {
1085 let plan::CreateSinkPlan {
1086 name,
1087 sink,
1088 with_snapshot,
1089 if_not_exists,
1090 in_cluster,
1091 } = plan;
1092
1093 let id_ts = self.get_catalog_write_ts().await;
1095 let (item_id, global_id) =
1096 return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);
1097
1098 let catalog_sink = Sink {
1099 create_sql: sink.create_sql,
1100 global_id,
1101 from: sink.from,
1102 connection: sink.connection,
1103 envelope: sink.envelope,
1104 version: sink.version,
1105 with_snapshot,
1106 resolved_ids,
1107 cluster_id: in_cluster,
1108 };
1109
1110 let ops = vec![catalog::Op::CreateItem {
1111 id: item_id,
1112 name: name.clone(),
1113 item: CatalogItem::Sink(catalog_sink.clone()),
1114 owner_id: *ctx.session().current_role_id(),
1115 }];
1116
1117 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1118
1119 match result {
1120 Ok(()) => {}
1121 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1122 kind:
1123 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1124 })) if if_not_exists => {
1125 ctx.session()
1126 .add_notice(AdapterNotice::ObjectAlreadyExists {
1127 name: name.item,
1128 ty: "sink",
1129 });
1130 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1131 return;
1132 }
1133 Err(e) => {
1134 ctx.retire(Err(e));
1135 return;
1136 }
1137 };
1138
1139 self.create_storage_export(global_id, &catalog_sink)
1140 .await
1141 .unwrap_or_terminate("cannot fail to create exports");
1142
1143 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1144 .await;
1145
1146 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1147 }
1148
1149 pub(super) fn validate_system_column_references(
1172 &self,
1173 uses_ambiguous_columns: bool,
1174 depends_on: &BTreeSet<GlobalId>,
1175 ) -> Result<(), AdapterError> {
1176 if uses_ambiguous_columns
1177 && depends_on
1178 .iter()
1179 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1180 {
1181 Err(AdapterError::AmbiguousSystemColumnReference)
1182 } else {
1183 Ok(())
1184 }
1185 }
1186
1187 #[instrument]
1188 pub(super) async fn sequence_create_type(
1189 &mut self,
1190 session: &Session,
1191 plan: plan::CreateTypePlan,
1192 resolved_ids: ResolvedIds,
1193 ) -> Result<ExecuteResponse, AdapterError> {
1194 let id_ts = self.get_catalog_write_ts().await;
1195 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
1196 plan.typ
1198 .inner
1199 .desc(&self.catalog().for_session(session))
1200 .map_err(AdapterError::from)?;
1201 let typ = Type {
1202 create_sql: Some(plan.typ.create_sql),
1203 global_id,
1204 details: CatalogTypeDetails {
1205 array_id: None,
1206 typ: plan.typ.inner,
1207 pg_metadata: None,
1208 },
1209 resolved_ids,
1210 };
1211 let op = catalog::Op::CreateItem {
1212 id: item_id,
1213 name: plan.name,
1214 item: CatalogItem::Type(typ),
1215 owner_id: *session.current_role_id(),
1216 };
1217 match self.catalog_transact(Some(session), vec![op]).await {
1218 Ok(()) => Ok(ExecuteResponse::CreatedType),
1219 Err(err) => Err(err),
1220 }
1221 }
1222
1223 #[instrument]
1224 pub(super) async fn sequence_comment_on(
1225 &mut self,
1226 session: &Session,
1227 plan: plan::CommentPlan,
1228 ) -> Result<ExecuteResponse, AdapterError> {
1229 let op = catalog::Op::Comment {
1230 object_id: plan.object_id,
1231 sub_component: plan.sub_component,
1232 comment: plan.comment,
1233 };
1234 self.catalog_transact(Some(session), vec![op]).await?;
1235 Ok(ExecuteResponse::Comment)
1236 }
1237
1238 #[instrument]
1239 pub(super) async fn sequence_drop_objects(
1240 &mut self,
1241 ctx: &mut ExecuteContext,
1242 plan::DropObjectsPlan {
1243 drop_ids,
1244 object_type,
1245 referenced_ids,
1246 }: plan::DropObjectsPlan,
1247 ) -> Result<ExecuteResponse, AdapterError> {
1248 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1249 let mut objects = Vec::new();
1250 for obj_id in &drop_ids {
1251 if !referenced_ids_hashset.contains(obj_id) {
1252 let object_info = ErrorMessageObjectDescription::from_id(
1253 obj_id,
1254 &self.catalog().for_session(ctx.session()),
1255 )
1256 .to_string();
1257 objects.push(object_info);
1258 }
1259 }
1260
1261 if !objects.is_empty() {
1262 ctx.session()
1263 .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1264 }
1265
1266 let DropOps {
1267 ops,
1268 dropped_active_db,
1269 dropped_active_cluster,
1270 dropped_in_use_indexes,
1271 } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1272
1273 self.catalog_transact_with_context(None, Some(ctx), ops)
1274 .await?;
1275
1276 fail::fail_point!("after_sequencer_drop_replica");
1277
1278 if dropped_active_db {
1279 ctx.session()
1280 .add_notice(AdapterNotice::DroppedActiveDatabase {
1281 name: ctx.session().vars().database().to_string(),
1282 });
1283 }
1284 if dropped_active_cluster {
1285 ctx.session()
1286 .add_notice(AdapterNotice::DroppedActiveCluster {
1287 name: ctx.session().vars().cluster().to_string(),
1288 });
1289 }
1290 for dropped_in_use_index in dropped_in_use_indexes {
1291 ctx.session()
1292 .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1293 self.metrics
1294 .optimization_notices
1295 .with_label_values(&["DroppedInUseIndex"])
1296 .inc_by(1);
1297 }
1298 Ok(ExecuteResponse::DroppedObject(object_type))
1299 }
1300
1301 fn validate_dropped_role_ownership(
1302 &self,
1303 session: &Session,
1304 dropped_roles: &BTreeMap<RoleId, &str>,
1305 ) -> Result<(), AdapterError> {
1306 fn privilege_check(
1307 privileges: &PrivilegeMap,
1308 dropped_roles: &BTreeMap<RoleId, &str>,
1309 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1310 object_id: &SystemObjectId,
1311 catalog: &ConnCatalog,
1312 ) {
1313 for privilege in privileges.all_values() {
1314 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1315 let grantor_name = catalog.get_role(&privilege.grantor).name();
1316 let object_description =
1317 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1318 dependent_objects
1319 .entry(role_name.to_string())
1320 .or_default()
1321 .push(format!(
1322 "privileges on {object_description} granted by {grantor_name}",
1323 ));
1324 }
1325 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1326 let grantee_name = catalog.get_role(&privilege.grantee).name();
1327 let object_description =
1328 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1329 dependent_objects
1330 .entry(role_name.to_string())
1331 .or_default()
1332 .push(format!(
1333 "privileges granted on {object_description} to {grantee_name}"
1334 ));
1335 }
1336 }
1337 }
1338
1339 let catalog = self.catalog().for_session(session);
1340 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1341 for entry in self.catalog.entries() {
1342 let id = SystemObjectId::Object(entry.id().into());
1343 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1344 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1345 dependent_objects
1346 .entry(role_name.to_string())
1347 .or_default()
1348 .push(format!("owner of {object_description}"));
1349 }
1350 privilege_check(
1351 entry.privileges(),
1352 dropped_roles,
1353 &mut dependent_objects,
1354 &id,
1355 &catalog,
1356 );
1357 }
1358 for database in self.catalog.databases() {
1359 let database_id = SystemObjectId::Object(database.id().into());
1360 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1361 let object_description =
1362 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1363 dependent_objects
1364 .entry(role_name.to_string())
1365 .or_default()
1366 .push(format!("owner of {object_description}"));
1367 }
1368 privilege_check(
1369 &database.privileges,
1370 dropped_roles,
1371 &mut dependent_objects,
1372 &database_id,
1373 &catalog,
1374 );
1375 for schema in database.schemas_by_id.values() {
1376 let schema_id = SystemObjectId::Object(
1377 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1378 );
1379 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1380 let object_description =
1381 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1382 dependent_objects
1383 .entry(role_name.to_string())
1384 .or_default()
1385 .push(format!("owner of {object_description}"));
1386 }
1387 privilege_check(
1388 &schema.privileges,
1389 dropped_roles,
1390 &mut dependent_objects,
1391 &schema_id,
1392 &catalog,
1393 );
1394 }
1395 }
1396 for cluster in self.catalog.clusters() {
1397 let cluster_id = SystemObjectId::Object(cluster.id().into());
1398 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1399 let object_description =
1400 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1401 dependent_objects
1402 .entry(role_name.to_string())
1403 .or_default()
1404 .push(format!("owner of {object_description}"));
1405 }
1406 privilege_check(
1407 &cluster.privileges,
1408 dropped_roles,
1409 &mut dependent_objects,
1410 &cluster_id,
1411 &catalog,
1412 );
1413 for replica in cluster.replicas() {
1414 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1415 let replica_id =
1416 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1417 let object_description =
1418 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1419 dependent_objects
1420 .entry(role_name.to_string())
1421 .or_default()
1422 .push(format!("owner of {object_description}"));
1423 }
1424 }
1425 }
1426 privilege_check(
1427 self.catalog().system_privileges(),
1428 dropped_roles,
1429 &mut dependent_objects,
1430 &SystemObjectId::System,
1431 &catalog,
1432 );
1433 for (default_privilege_object, default_privilege_acl_items) in
1434 self.catalog.default_privileges()
1435 {
1436 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1437 dependent_objects
1438 .entry(role_name.to_string())
1439 .or_default()
1440 .push(format!(
1441 "default privileges on {}S created by {}",
1442 default_privilege_object.object_type, role_name
1443 ));
1444 }
1445 for default_privilege_acl_item in default_privilege_acl_items {
1446 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1447 dependent_objects
1448 .entry(role_name.to_string())
1449 .or_default()
1450 .push(format!(
1451 "default privileges on {}S granted to {}",
1452 default_privilege_object.object_type, role_name
1453 ));
1454 }
1455 }
1456 }
1457
1458 if !dependent_objects.is_empty() {
1459 Err(AdapterError::DependentObject(dependent_objects))
1460 } else {
1461 Ok(())
1462 }
1463 }
1464
1465 #[instrument]
1466 pub(super) async fn sequence_drop_owned(
1467 &mut self,
1468 session: &Session,
1469 plan: plan::DropOwnedPlan,
1470 ) -> Result<ExecuteResponse, AdapterError> {
1471 for role_id in &plan.role_ids {
1472 self.catalog().ensure_not_reserved_role(role_id)?;
1473 }
1474
1475 let mut privilege_revokes = plan.privilege_revokes;
1476
1477 let session_catalog = self.catalog().for_session(session);
1479 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1480 && !session.is_superuser()
1481 {
1482 let role_membership =
1484 session_catalog.collect_role_membership(session.current_role_id());
1485 let invalid_revokes: BTreeSet<_> = privilege_revokes
1486 .extract_if(.., |(_, privilege)| {
1487 !role_membership.contains(&privilege.grantor)
1488 })
1489 .map(|(object_id, _)| object_id)
1490 .collect();
1491 for invalid_revoke in invalid_revokes {
1492 let object_description =
1493 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1494 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1495 }
1496 }
1497
1498 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1499 catalog::Op::UpdatePrivilege {
1500 target_id: object_id,
1501 privilege,
1502 variant: UpdatePrivilegeVariant::Revoke,
1503 }
1504 });
1505 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1506 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1507 privilege_object,
1508 privilege_acl_item,
1509 variant: UpdatePrivilegeVariant::Revoke,
1510 },
1511 );
1512 let DropOps {
1513 ops: drop_ops,
1514 dropped_active_db,
1515 dropped_active_cluster,
1516 dropped_in_use_indexes,
1517 } = self.sequence_drop_common(session, plan.drop_ids)?;
1518
1519 let ops = privilege_revoke_ops
1520 .chain(default_privilege_revoke_ops)
1521 .chain(drop_ops.into_iter())
1522 .collect();
1523
1524 self.catalog_transact(Some(session), ops).await?;
1525
1526 if dropped_active_db {
1527 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1528 name: session.vars().database().to_string(),
1529 });
1530 }
1531 if dropped_active_cluster {
1532 session.add_notice(AdapterNotice::DroppedActiveCluster {
1533 name: session.vars().cluster().to_string(),
1534 });
1535 }
1536 for dropped_in_use_index in dropped_in_use_indexes {
1537 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1538 }
1539 Ok(ExecuteResponse::DroppedOwned)
1540 }
1541
1542 fn sequence_drop_common(
1543 &self,
1544 session: &Session,
1545 ids: Vec<ObjectId>,
1546 ) -> Result<DropOps, AdapterError> {
1547 let mut dropped_active_db = false;
1548 let mut dropped_active_cluster = false;
1549 let mut dropped_in_use_indexes = Vec::new();
1550 let mut dropped_roles = BTreeMap::new();
1551 let mut dropped_databases = BTreeSet::new();
1552 let mut dropped_schemas = BTreeSet::new();
1553 let mut role_revokes = BTreeSet::new();
1557 let mut default_privilege_revokes = BTreeSet::new();
1560
1561 let mut clusters_to_drop = BTreeSet::new();
1563
1564 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1565 for id in &ids {
1566 match id {
1567 ObjectId::Database(id) => {
1568 let name = self.catalog().get_database(id).name();
1569 if name == session.vars().database() {
1570 dropped_active_db = true;
1571 }
1572 dropped_databases.insert(id);
1573 }
1574 ObjectId::Schema((_, spec)) => {
1575 if let SchemaSpecifier::Id(id) = spec {
1576 dropped_schemas.insert(id);
1577 }
1578 }
1579 ObjectId::Cluster(id) => {
1580 clusters_to_drop.insert(*id);
1581 if let Some(active_id) = self
1582 .catalog()
1583 .active_cluster(session)
1584 .ok()
1585 .map(|cluster| cluster.id())
1586 {
1587 if id == &active_id {
1588 dropped_active_cluster = true;
1589 }
1590 }
1591 }
1592 ObjectId::Role(id) => {
1593 let role = self.catalog().get_role(id);
1594 let name = role.name();
1595 dropped_roles.insert(*id, name);
1596 for (group_id, grantor_id) in &role.membership.map {
1598 role_revokes.insert((*group_id, *id, *grantor_id));
1599 }
1600 }
1601 ObjectId::Item(id) => {
1602 if let Some(index) = self.catalog().get_entry(id).index() {
1603 let humanizer = self.catalog().for_session(session);
1604 let dependants = self
1605 .controller
1606 .compute
1607 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1608 .ok()
1609 .into_iter()
1610 .flatten()
1611 .filter(|dependant_id| {
1612 if dependant_id.is_transient() {
1619 return false;
1620 }
1621 let Some(dependent_id) = humanizer
1623 .try_get_item_by_global_id(dependant_id)
1624 .map(|item| item.id())
1625 else {
1626 return false;
1627 };
1628 !ids_set.contains(&ObjectId::Item(dependent_id))
1631 })
1632 .flat_map(|dependant_id| {
1633 humanizer.humanize_id(dependant_id)
1637 })
1638 .collect_vec();
1639 if !dependants.is_empty() {
1640 dropped_in_use_indexes.push(DroppedInUseIndex {
1641 index_name: humanizer
1642 .humanize_id(index.global_id())
1643 .unwrap_or_else(|| id.to_string()),
1644 dependant_objects: dependants,
1645 });
1646 }
1647 }
1648 }
1649 _ => {}
1650 }
1651 }
1652
1653 for id in &ids {
1654 match id {
1655 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1659 if !clusters_to_drop.contains(cluster_id) {
1660 let cluster = self.catalog.get_cluster(*cluster_id);
1661 if cluster.is_managed() {
1662 let replica =
1663 cluster.replica(*replica_id).expect("Catalog out of sync");
1664 if !replica.config.location.internal() {
1665 coord_bail!("cannot drop replica of managed cluster");
1666 }
1667 }
1668 }
1669 }
1670 _ => {}
1671 }
1672 }
1673
1674 for role_id in dropped_roles.keys() {
1675 self.catalog().ensure_not_reserved_role(role_id)?;
1676 }
1677 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1678 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1680 for role in self.catalog().user_roles() {
1681 for dropped_role_id in
1682 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1683 {
1684 role_revokes.insert((
1685 **dropped_role_id,
1686 role.id(),
1687 *role
1688 .membership
1689 .map
1690 .get(*dropped_role_id)
1691 .expect("included in keys above"),
1692 ));
1693 }
1694 }
1695
1696 for (default_privilege_object, default_privilege_acls) in
1697 self.catalog().default_privileges()
1698 {
1699 if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
1700 || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
1701 {
1702 for default_privilege_acl in default_privilege_acls {
1703 default_privilege_revokes.insert((
1704 default_privilege_object.clone(),
1705 default_privilege_acl.clone(),
1706 ));
1707 }
1708 }
1709 }
1710
1711 let ops = role_revokes
1712 .into_iter()
1713 .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1714 role_id,
1715 member_id,
1716 grantor_id,
1717 })
1718 .chain(default_privilege_revokes.into_iter().map(
1719 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1720 privilege_object,
1721 privilege_acl_item,
1722 variant: UpdatePrivilegeVariant::Revoke,
1723 },
1724 ))
1725 .chain(iter::once(catalog::Op::DropObjects(
1726 ids.into_iter()
1727 .map(DropObjectInfo::manual_drop_from_object_id)
1728 .collect(),
1729 )))
1730 .collect();
1731
1732 Ok(DropOps {
1733 ops,
1734 dropped_active_db,
1735 dropped_active_cluster,
1736 dropped_in_use_indexes,
1737 })
1738 }
1739
1740 pub(super) fn sequence_explain_schema(
1741 &self,
1742 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1743 ) -> Result<ExecuteResponse, AdapterError> {
1744 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1745 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1746 })?;
1747
1748 let json_string = json_string(&json_value);
1749 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1750 Ok(Self::send_immediate_rows(row))
1751 }
1752
1753 pub(super) fn sequence_show_all_variables(
1754 &self,
1755 session: &Session,
1756 ) -> Result<ExecuteResponse, AdapterError> {
1757 let mut rows = viewable_variables(self.catalog().state(), session)
1758 .map(|v| (v.name(), v.value(), v.description()))
1759 .collect::<Vec<_>>();
1760 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1761
1762 let rows: Vec<_> = rows
1764 .into_iter()
1765 .map(|(name, val, desc)| {
1766 Row::pack_slice(&[
1767 Datum::String(name),
1768 Datum::String(&val),
1769 Datum::String(desc),
1770 ])
1771 })
1772 .collect();
1773 Ok(Self::send_immediate_rows(rows))
1774 }
1775
1776 pub(super) fn sequence_show_variable(
1777 &self,
1778 session: &Session,
1779 plan: plan::ShowVariablePlan,
1780 ) -> Result<ExecuteResponse, AdapterError> {
1781 if &plan.name == SCHEMA_ALIAS {
1782 let schemas = self.catalog.resolve_search_path(session);
1783 let schema = schemas.first();
1784 return match schema {
1785 Some((database_spec, schema_spec)) => {
1786 let schema_name = &self
1787 .catalog
1788 .get_schema(database_spec, schema_spec, session.conn_id())
1789 .name()
1790 .schema;
1791 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1792 Ok(Self::send_immediate_rows(row))
1793 }
1794 None => {
1795 if session.vars().current_object_missing_warnings() {
1796 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1797 search_path: session
1798 .vars()
1799 .search_path()
1800 .into_iter()
1801 .map(|schema| schema.to_string())
1802 .collect(),
1803 });
1804 }
1805 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1806 }
1807 };
1808 }
1809
1810 let variable = session
1811 .vars()
1812 .get(self.catalog().system_config(), &plan.name)
1813 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1814
1815 variable.visible(session.user(), self.catalog().system_config())?;
1818
1819 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1820 if variable.name() == vars::DATABASE.name()
1821 && matches!(
1822 self.catalog().resolve_database(&variable.value()),
1823 Err(CatalogError::UnknownDatabase(_))
1824 )
1825 && session.vars().current_object_missing_warnings()
1826 {
1827 let name = variable.value();
1828 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1829 } else if variable.name() == vars::CLUSTER.name()
1830 && matches!(
1831 self.catalog().resolve_cluster(&variable.value()),
1832 Err(CatalogError::UnknownCluster(_))
1833 )
1834 && session.vars().current_object_missing_warnings()
1835 {
1836 let name = variable.value();
1837 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1838 }
1839 Ok(Self::send_immediate_rows(row))
1840 }
1841
1842 #[instrument]
1843 pub(super) async fn sequence_inspect_shard(
1844 &self,
1845 session: &Session,
1846 plan: plan::InspectShardPlan,
1847 ) -> Result<ExecuteResponse, AdapterError> {
1848 if !session.user().is_internal() {
1851 return Err(AdapterError::Unauthorized(
1852 rbac::UnauthorizedError::MzSystem {
1853 action: "inspect".into(),
1854 },
1855 ));
1856 }
1857 let state = self
1858 .controller
1859 .storage
1860 .inspect_persist_state(plan.id)
1861 .await?;
1862 let jsonb = Jsonb::from_serde_json(state)?;
1863 Ok(Self::send_immediate_rows(jsonb.into_row()))
1864 }
1865
1866 #[instrument]
1867 pub(super) fn sequence_set_variable(
1868 &self,
1869 session: &mut Session,
1870 plan: plan::SetVariablePlan,
1871 ) -> Result<ExecuteResponse, AdapterError> {
1872 let (name, local) = (plan.name, plan.local);
1873 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1874 self.validate_set_isolation_level(session)?;
1875 }
1876 if &name == vars::CLUSTER.name() {
1877 self.validate_set_cluster(session)?;
1878 }
1879
1880 let vars = session.vars_mut();
1881 let values = match plan.value {
1882 plan::VariableValue::Default => None,
1883 plan::VariableValue::Values(values) => Some(values),
1884 };
1885
1886 match values {
1887 Some(values) => {
1888 vars.set(
1889 self.catalog().system_config(),
1890 &name,
1891 VarInput::SqlSet(&values),
1892 local,
1893 )?;
1894
1895 let vars = session.vars();
1896
1897 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1900 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1901 } else if name == vars::CLUSTER.name()
1902 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1903 {
1904 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1905 }
1906
1907 if name.as_str() == vars::DATABASE.name()
1909 && matches!(
1910 self.catalog().resolve_database(vars.database()),
1911 Err(CatalogError::UnknownDatabase(_))
1912 )
1913 && session.vars().current_object_missing_warnings()
1914 {
1915 let name = vars.database().to_string();
1916 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1917 } else if name.as_str() == vars::CLUSTER.name()
1918 && matches!(
1919 self.catalog().resolve_cluster(vars.cluster()),
1920 Err(CatalogError::UnknownCluster(_))
1921 )
1922 && session.vars().current_object_missing_warnings()
1923 {
1924 let name = vars.cluster().to_string();
1925 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1926 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1927 let v = values.into_first().to_lowercase();
1928 if v == IsolationLevel::ReadUncommitted.as_str()
1929 || v == IsolationLevel::ReadCommitted.as_str()
1930 || v == IsolationLevel::RepeatableRead.as_str()
1931 {
1932 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1933 isolation_level: v,
1934 });
1935 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1936 session.add_notice(AdapterNotice::StrongSessionSerializable);
1937 }
1938 }
1939 }
1940 None => vars.reset(self.catalog().system_config(), &name, local)?,
1941 }
1942
1943 Ok(ExecuteResponse::SetVariable { name, reset: false })
1944 }
1945
1946 pub(super) fn sequence_reset_variable(
1947 &self,
1948 session: &mut Session,
1949 plan: plan::ResetVariablePlan,
1950 ) -> Result<ExecuteResponse, AdapterError> {
1951 let name = plan.name;
1952 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1953 self.validate_set_isolation_level(session)?;
1954 }
1955 if &name == vars::CLUSTER.name() {
1956 self.validate_set_cluster(session)?;
1957 }
1958 session
1959 .vars_mut()
1960 .reset(self.catalog().system_config(), &name, false)?;
1961 Ok(ExecuteResponse::SetVariable { name, reset: true })
1962 }
1963
1964 pub(super) fn sequence_set_transaction(
1965 &self,
1966 session: &mut Session,
1967 plan: plan::SetTransactionPlan,
1968 ) -> Result<ExecuteResponse, AdapterError> {
1969 for mode in plan.modes {
1971 match mode {
1972 TransactionMode::AccessMode(_) => {
1973 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1974 }
1975 TransactionMode::IsolationLevel(isolation_level) => {
1976 self.validate_set_isolation_level(session)?;
1977
1978 session.vars_mut().set(
1979 self.catalog().system_config(),
1980 TRANSACTION_ISOLATION_VAR_NAME,
1981 VarInput::Flat(&isolation_level.to_ast_string_stable()),
1982 plan.local,
1983 )?
1984 }
1985 }
1986 }
1987 Ok(ExecuteResponse::SetVariable {
1988 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1989 reset: false,
1990 })
1991 }
1992
1993 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1994 if session.transaction().contains_ops() {
1995 Err(AdapterError::InvalidSetIsolationLevel)
1996 } else {
1997 Ok(())
1998 }
1999 }
2000
2001 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2002 if session.transaction().contains_ops() {
2003 Err(AdapterError::InvalidSetCluster)
2004 } else {
2005 Ok(())
2006 }
2007 }
2008
2009 #[instrument]
2010 pub(super) async fn sequence_end_transaction(
2011 &mut self,
2012 mut ctx: ExecuteContext,
2013 mut action: EndTransactionAction,
2014 ) {
2015 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2017 (&action, ctx.session().transaction())
2018 {
2019 action = EndTransactionAction::Rollback;
2020 }
2021 let response = match action {
2022 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2023 params: BTreeMap::new(),
2024 }),
2025 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2026 params: BTreeMap::new(),
2027 }),
2028 };
2029
2030 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2031
2032 let (response, action) = match result {
2033 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2034 (response, action)
2035 }
2036 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2037 let validated_locks = match write_lock_guards {
2041 None => None,
2042 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2043 Ok(locks) => Some(locks),
2044 Err(missing) => {
2045 tracing::error!(?missing, "programming error, missing write locks");
2046 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2047 }
2048 },
2049 };
2050
2051 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2052 for WriteOp { id, rows } in writes {
2053 let total_rows = collected_writes.entry(id).or_default();
2054 total_rows.push(rows);
2055 }
2056
2057 self.submit_write(PendingWriteTxn::User {
2058 span: Span::current(),
2059 writes: collected_writes,
2060 write_locks: validated_locks,
2061 pending_txn: PendingTxn {
2062 ctx,
2063 response,
2064 action,
2065 },
2066 });
2067 return;
2068 }
2069 Ok((
2070 Some(TransactionOps::Peeks {
2071 determination,
2072 requires_linearization: RequireLinearization::Required,
2073 ..
2074 }),
2075 _,
2076 )) if ctx.session().vars().transaction_isolation()
2077 == &IsolationLevel::StrictSerializable =>
2078 {
2079 let conn_id = ctx.session().conn_id().clone();
2080 let pending_read_txn = PendingReadTxn {
2081 txn: PendingRead::Read {
2082 txn: PendingTxn {
2083 ctx,
2084 response,
2085 action,
2086 },
2087 },
2088 timestamp_context: determination.timestamp_context,
2089 created: Instant::now(),
2090 num_requeues: 0,
2091 otel_ctx: OpenTelemetryContext::obtain(),
2092 };
2093 self.strict_serializable_reads_tx
2094 .send((conn_id, pending_read_txn))
2095 .expect("sending to strict_serializable_reads_tx cannot fail");
2096 return;
2097 }
2098 Ok((
2099 Some(TransactionOps::Peeks {
2100 determination,
2101 requires_linearization: RequireLinearization::Required,
2102 ..
2103 }),
2104 _,
2105 )) if ctx.session().vars().transaction_isolation()
2106 == &IsolationLevel::StrongSessionSerializable =>
2107 {
2108 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2109 ctx.session_mut()
2110 .ensure_timestamp_oracle(timeline.clone())
2111 .apply_write(*ts);
2112 }
2113 (response, action)
2114 }
2115 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2116 self.internal_cmd_tx
2117 .send(Message::ExecuteSingleStatementTransaction {
2118 ctx,
2119 otel_ctx: OpenTelemetryContext::obtain(),
2120 stmt,
2121 params,
2122 })
2123 .expect("must send");
2124 return;
2125 }
2126 Ok((_, _)) => (response, action),
2127 Err(err) => (Err(err), EndTransactionAction::Rollback),
2128 };
2129 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2130 let response = response.map(|mut r| {
2132 r.extend_params(changed);
2133 ExecuteResponse::from(r)
2134 });
2135
2136 ctx.retire(response);
2137 }
2138
2139 #[instrument]
2140 async fn sequence_end_transaction_inner(
2141 &mut self,
2142 ctx: &mut ExecuteContext,
2143 action: EndTransactionAction,
2144 ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2145 let txn = self.clear_transaction(ctx.session_mut()).await;
2146
2147 if let EndTransactionAction::Commit = action {
2148 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2149 match &mut ops {
2150 TransactionOps::Writes(writes) => {
2151 for WriteOp { id, .. } in &mut writes.iter() {
2152 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2154 AdapterError::Catalog(mz_catalog::memory::error::Error {
2155 kind: mz_catalog::memory::error::ErrorKind::Sql(
2156 CatalogError::UnknownItem(id.to_string()),
2157 ),
2158 })
2159 })?;
2160 }
2161
2162 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2164 }
2165 TransactionOps::DDL {
2166 ops,
2167 state: _,
2168 side_effects,
2169 revision,
2170 } => {
2171 if *revision != self.catalog().transient_revision() {
2173 return Err(AdapterError::DDLTransactionRace);
2174 }
2175 let ops = std::mem::take(ops);
2177 let side_effects = std::mem::take(side_effects);
2178 self.catalog_transact_with_side_effects(
2179 Some(ctx),
2180 ops,
2181 move |a, mut ctx| {
2182 Box::pin(async move {
2183 for side_effect in side_effects {
2184 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2185 }
2186 })
2187 },
2188 )
2189 .await?;
2190 }
2191 _ => (),
2192 }
2193 return Ok((Some(ops), write_lock_guards));
2194 }
2195 }
2196
2197 Ok((None, None))
2198 }
2199
2200 pub(super) async fn sequence_side_effecting_func(
2201 &mut self,
2202 ctx: ExecuteContext,
2203 plan: SideEffectingFunc,
2204 ) {
2205 match plan {
2206 SideEffectingFunc::PgCancelBackend { connection_id } => {
2207 if ctx.session().conn_id().unhandled() == connection_id {
2208 ctx.retire(Err(AdapterError::Canceled));
2212 return;
2213 }
2214
2215 let res = if let Some((id_handle, _conn_meta)) =
2216 self.active_conns.get_key_value(&connection_id)
2217 {
2218 self.handle_privileged_cancel(id_handle.clone()).await;
2220 Datum::True
2221 } else {
2222 Datum::False
2223 };
2224 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2225 }
2226 }
2227 }
2228
2229 pub(crate) async fn determine_real_time_recent_timestamp(
2233 &self,
2234 source_ids: impl Iterator<Item = GlobalId>,
2235 real_time_recency_timeout: Duration,
2236 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2237 {
2238 let item_ids = source_ids
2239 .map(|gid| {
2240 self.catalog
2241 .try_resolve_item_id(&gid)
2242 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2243 })
2244 .collect::<Result<Vec<_>, _>>()?;
2245
2246 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2252 if to_visit.is_empty() {
2255 return Ok(None);
2256 }
2257
2258 let mut timestamp_objects = BTreeSet::new();
2259
2260 while let Some(id) = to_visit.pop_front() {
2261 timestamp_objects.insert(id);
2262 to_visit.extend(
2263 self.catalog()
2264 .get_entry(&id)
2265 .uses()
2266 .into_iter()
2267 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2268 );
2269 }
2270 let timestamp_objects = timestamp_objects
2271 .into_iter()
2272 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2273 .collect();
2274
2275 let r = self
2276 .controller
2277 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2278 .await?;
2279
2280 Ok(Some(r))
2281 }
2282
2283 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2286 &self,
2287 session: &Session,
2288 source_ids: impl Iterator<Item = GlobalId>,
2289 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2290 {
2291 let vars = session.vars();
2292
2293 if vars.real_time_recency()
2294 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2295 && !session.contains_read_timestamp()
2296 {
2297 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2298 .await
2299 } else {
2300 Ok(None)
2301 }
2302 }
2303
2304 #[instrument]
2305 pub(super) async fn sequence_explain_plan(
2306 &mut self,
2307 ctx: ExecuteContext,
2308 plan: plan::ExplainPlanPlan,
2309 target_cluster: TargetCluster,
2310 ) {
2311 match &plan.explainee {
2312 plan::Explainee::Statement(stmt) => match stmt {
2313 plan::ExplaineeStatement::CreateView { .. } => {
2314 self.explain_create_view(ctx, plan).await;
2315 }
2316 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2317 self.explain_create_materialized_view(ctx, plan).await;
2318 }
2319 plan::ExplaineeStatement::CreateIndex { .. } => {
2320 self.explain_create_index(ctx, plan).await;
2321 }
2322 plan::ExplaineeStatement::Select { .. } => {
2323 self.explain_peek(ctx, plan, target_cluster).await;
2324 }
2325 },
2326 plan::Explainee::View(_) => {
2327 let result = self.explain_view(&ctx, plan);
2328 ctx.retire(result);
2329 }
2330 plan::Explainee::MaterializedView(_) => {
2331 let result = self.explain_materialized_view(&ctx, plan);
2332 ctx.retire(result);
2333 }
2334 plan::Explainee::Index(_) => {
2335 let result = self.explain_index(&ctx, plan);
2336 ctx.retire(result);
2337 }
2338 plan::Explainee::ReplanView(_) => {
2339 self.explain_replan_view(ctx, plan).await;
2340 }
2341 plan::Explainee::ReplanMaterializedView(_) => {
2342 self.explain_replan_materialized_view(ctx, plan).await;
2343 }
2344 plan::Explainee::ReplanIndex(_) => {
2345 self.explain_replan_index(ctx, plan).await;
2346 }
2347 };
2348 }
2349
2350 pub(super) async fn sequence_explain_pushdown(
2351 &mut self,
2352 ctx: ExecuteContext,
2353 plan: plan::ExplainPushdownPlan,
2354 target_cluster: TargetCluster,
2355 ) {
2356 match plan.explainee {
2357 Explainee::Statement(ExplaineeStatement::Select {
2358 broken: false,
2359 plan,
2360 desc: _,
2361 }) => {
2362 let stage = return_if_err!(
2363 self.peek_validate(
2364 ctx.session(),
2365 plan,
2366 target_cluster,
2367 None,
2368 ExplainContext::Pushdown,
2369 Some(ctx.session().vars().max_query_result_size()),
2370 ),
2371 ctx
2372 );
2373 self.sequence_staged(ctx, Span::current(), stage).await;
2374 }
2375 Explainee::MaterializedView(item_id) => {
2376 self.explain_pushdown_materialized_view(ctx, item_id).await;
2377 }
2378 _ => {
2379 ctx.retire(Err(AdapterError::Unsupported(
2380 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2381 )));
2382 }
2383 };
2384 }
2385
2386 async fn execute_explain_pushdown_with_read_holds(
2388 &self,
2389 ctx: ExecuteContext,
2390 as_of: Antichain<Timestamp>,
2391 mz_now: ResultSpec<'static>,
2392 read_holds: Option<ReadHolds<Timestamp>>,
2393 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2394 ) {
2395 let fut = self
2396 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2397 .await;
2398 task::spawn(|| "render explain pushdown", async move {
2399 let _read_holds = read_holds;
2401 let res = fut.await;
2402 ctx.retire(res);
2403 });
2404 }
2405
2406 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2408 &self,
2409 session: &Session,
2410 as_of: Antichain<Timestamp>,
2411 mz_now: ResultSpec<'static>,
2412 imports: I,
2413 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2414 super::explain_pushdown_future_inner(
2416 session,
2417 &self.catalog,
2418 &self.controller.storage_collections,
2419 as_of,
2420 mz_now,
2421 imports,
2422 )
2423 .await
2424 }
2425
2426 #[instrument]
2427 pub(super) async fn sequence_insert(
2428 &mut self,
2429 mut ctx: ExecuteContext,
2430 plan: plan::InsertPlan,
2431 ) {
2432 if !ctx.session_mut().transaction().allows_writes() {
2440 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2441 return;
2442 }
2443
2444 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2458 let expr = return_if_err!(
2461 plan.values
2462 .clone()
2463 .lower(self.catalog().system_config(), None),
2464 ctx
2465 );
2466 OptimizedMirRelationExpr(expr)
2467 } else {
2468 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2470
2471 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2473
2474 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2476 };
2477
2478 match optimized_mir.into_inner() {
2479 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2480 let catalog = self.owned_catalog();
2481 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2482 let result =
2483 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2484 ctx.retire(result);
2485 });
2486 }
2487 _ => {
2489 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2490 Some(table) => {
2491 let desc = table.relation_desc_latest().expect("table has a desc");
2493 desc.arity()
2494 }
2495 None => {
2496 ctx.retire(Err(AdapterError::Catalog(
2497 mz_catalog::memory::error::Error {
2498 kind: mz_catalog::memory::error::ErrorKind::Sql(
2499 CatalogError::UnknownItem(plan.id.to_string()),
2500 ),
2501 },
2502 )));
2503 return;
2504 }
2505 };
2506
2507 let finishing = RowSetFinishing {
2508 order_by: vec![],
2509 limit: None,
2510 offset: 0,
2511 project: (0..desc_arity).collect(),
2512 };
2513
2514 let read_then_write_plan = plan::ReadThenWritePlan {
2515 id: plan.id,
2516 selection: plan.values,
2517 finishing,
2518 assignments: BTreeMap::new(),
2519 kind: MutationKind::Insert,
2520 returning: plan.returning,
2521 };
2522
2523 self.sequence_read_then_write(ctx, read_then_write_plan)
2524 .await;
2525 }
2526 }
2527 }
2528
2529 #[instrument]
2534 pub(super) async fn sequence_read_then_write(
2535 &mut self,
2536 mut ctx: ExecuteContext,
2537 plan: plan::ReadThenWritePlan,
2538 ) {
2539 let mut source_ids: BTreeSet<_> = plan
2540 .selection
2541 .depends_on()
2542 .into_iter()
2543 .map(|gid| self.catalog().resolve_item_id(&gid))
2544 .collect();
2545 source_ids.insert(plan.id);
2546
2547 if ctx.session().transaction().write_locks().is_none() {
2549 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2551
2552 for id in &source_ids {
2554 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2555 write_locks.insert_lock(*id, lock);
2556 }
2557 }
2558
2559 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2561 Ok(locks) => locks,
2562 Err(missing) => {
2563 let role_metadata = ctx.session().role_metadata().clone();
2565 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2566 let plan = DeferredPlan {
2567 ctx,
2568 plan: Plan::ReadThenWrite(plan),
2569 validity: PlanValidity::new(
2570 self.catalog.transient_revision(),
2571 source_ids.clone(),
2572 None,
2573 None,
2574 role_metadata,
2575 ),
2576 requires_locks: source_ids,
2577 };
2578 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2579 }
2580 };
2581
2582 ctx.session_mut()
2583 .try_grant_write_locks(write_locks)
2584 .expect("session has already been granted write locks");
2585 }
2586
2587 let plan::ReadThenWritePlan {
2588 id,
2589 kind,
2590 selection,
2591 mut assignments,
2592 finishing,
2593 mut returning,
2594 } = plan;
2595
2596 let desc = match self.catalog().try_get_entry(&id) {
2598 Some(table) => {
2599 table
2601 .relation_desc_latest()
2602 .expect("table has a desc")
2603 .into_owned()
2604 }
2605 None => {
2606 ctx.retire(Err(AdapterError::Catalog(
2607 mz_catalog::memory::error::Error {
2608 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2609 id.to_string(),
2610 )),
2611 },
2612 )));
2613 return;
2614 }
2615 };
2616
2617 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2619 || assignments.values().any(|e| e.contains_temporal())
2620 || returning.iter().any(|e| e.contains_temporal());
2621 if contains_temporal {
2622 ctx.retire(Err(AdapterError::Unsupported(
2623 "calls to mz_now in write statements",
2624 )));
2625 return;
2626 }
2627
2628 fn validate_read_dependencies(
2636 catalog: &Catalog,
2637 id: &CatalogItemId,
2638 ) -> Result<(), AdapterError> {
2639 use CatalogItemType::*;
2640 use mz_catalog::memory::objects;
2641 let mut ids_to_check = Vec::new();
2642 let valid = match catalog.try_get_entry(id) {
2643 Some(entry) => {
2644 if let CatalogItem::View(objects::View { optimized_expr, .. })
2645 | CatalogItem::MaterializedView(objects::MaterializedView {
2646 optimized_expr,
2647 ..
2648 }) = entry.item()
2649 {
2650 if optimized_expr.contains_temporal() {
2651 return Err(AdapterError::Unsupported(
2652 "calls to mz_now in write statements",
2653 ));
2654 }
2655 }
2656 match entry.item().typ() {
2657 typ @ (Func | View | MaterializedView | ContinualTask) => {
2658 ids_to_check.extend(entry.uses());
2659 let valid_id = id.is_user() || matches!(typ, Func);
2660 valid_id
2661 }
2662 Source | Secret | Connection => false,
2663 Sink | Index => unreachable!(),
2665 Table => {
2666 if !id.is_user() {
2667 false
2669 } else {
2670 entry.source_export_details().is_none()
2672 }
2673 }
2674 Type => true,
2675 }
2676 }
2677 None => false,
2678 };
2679 if !valid {
2680 return Err(AdapterError::InvalidTableMutationSelection);
2681 }
2682 for id in ids_to_check {
2683 validate_read_dependencies(catalog, &id)?;
2684 }
2685 Ok(())
2686 }
2687
2688 for gid in selection.depends_on() {
2689 let item_id = self.catalog().resolve_item_id(&gid);
2690 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2691 ctx.retire(Err(err));
2692 return;
2693 }
2694 }
2695
2696 let (peek_tx, peek_rx) = oneshot::channel();
2697 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2698 let (tx, _, session, extra) = ctx.into_parts();
2699 let peek_ctx = ExecuteContext::from_parts(
2711 peek_client_tx,
2712 self.internal_cmd_tx.clone(),
2713 session,
2714 Default::default(),
2715 );
2716
2717 self.sequence_peek(
2718 peek_ctx,
2719 plan::SelectPlan {
2720 select: None,
2721 source: selection,
2722 when: QueryWhen::FreshestTableWrite,
2723 finishing,
2724 copy_to: None,
2725 },
2726 TargetCluster::Active,
2727 None,
2728 )
2729 .await;
2730
2731 let internal_cmd_tx = self.internal_cmd_tx.clone();
2732 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2733 let catalog = self.owned_catalog();
2734 let max_result_size = self.catalog().system_config().max_result_size();
2735
2736 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2737 let (peek_response, session) = match peek_rx.await {
2738 Ok(Response {
2739 result: Ok(resp),
2740 session,
2741 otel_ctx,
2742 }) => {
2743 otel_ctx.attach_as_parent();
2744 (resp, session)
2745 }
2746 Ok(Response {
2747 result: Err(e),
2748 session,
2749 otel_ctx,
2750 }) => {
2751 let ctx =
2752 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2753 otel_ctx.attach_as_parent();
2754 ctx.retire(Err(e));
2755 return;
2756 }
2757 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2759 };
2760 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2761 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2762
2763 if timeout_dur == Duration::ZERO {
2765 timeout_dur = Duration::MAX;
2766 }
2767
2768 let style = ExprPrepStyle::OneShot {
2769 logical_time: EvalTime::NotAvailable, session: ctx.session(),
2771 catalog_state: catalog.state(),
2772 };
2773 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2774 return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
2775 }
2776
2777 let make_diffs =
2778 move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2779 let arena = RowArena::new();
2780 let mut diffs = Vec::new();
2781 let mut datum_vec = mz_repr::DatumVec::new();
2782
2783 while let Some(row) = rows.next() {
2784 if !assignments.is_empty() {
2785 assert!(
2786 matches!(kind, MutationKind::Update),
2787 "only updates support assignments"
2788 );
2789 let mut datums = datum_vec.borrow_with(row);
2790 let mut updates = vec![];
2791 for (idx, expr) in &assignments {
2792 let updated = match expr.eval(&datums, &arena) {
2793 Ok(updated) => updated,
2794 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2795 };
2796 updates.push((*idx, updated));
2797 }
2798 for (idx, new_value) in updates {
2799 datums[idx] = new_value;
2800 }
2801 let updated = Row::pack_slice(&datums);
2802 diffs.push((updated, Diff::ONE));
2803 }
2804 match kind {
2805 MutationKind::Update | MutationKind::Delete => {
2809 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2810 }
2811 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2812 }
2813 }
2814
2815 let mut byte_size: u64 = 0;
2818 for (row, diff) in &diffs {
2819 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2820 if diff.is_positive() {
2821 for (idx, datum) in row.iter().enumerate() {
2822 desc.constraints_met(idx, &datum)?;
2823 }
2824 }
2825 }
2826 Ok((diffs, byte_size))
2827 };
2828
2829 let diffs = match peek_response {
2830 ExecuteResponse::SendingRowsStreaming {
2831 rows: mut rows_stream,
2832 ..
2833 } => {
2834 let mut byte_size: u64 = 0;
2835 let mut diffs = Vec::new();
2836 let result = loop {
2837 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2838 Ok(Some(res)) => match res {
2839 PeekResponseUnary::Rows(new_rows) => {
2840 match make_diffs(new_rows) {
2841 Ok((mut new_diffs, new_byte_size)) => {
2842 byte_size = byte_size.saturating_add(new_byte_size);
2843 if byte_size > max_result_size {
2844 break Err(AdapterError::ResultSize(format!(
2845 "result exceeds max size of {max_result_size}"
2846 )));
2847 }
2848 diffs.append(&mut new_diffs)
2849 }
2850 Err(e) => break Err(e),
2851 };
2852 }
2853 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2854 PeekResponseUnary::Error(e) => {
2855 break Err(AdapterError::Unstructured(anyhow!(e)));
2856 }
2857 },
2858 Ok(None) => break Ok(diffs),
2859 Err(_) => {
2860 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2865 conn_id: ctx.session().conn_id().clone(),
2866 });
2867 if let Err(e) = result {
2868 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2869 }
2870 break Err(AdapterError::StatementTimeout);
2871 }
2872 }
2873 };
2874
2875 result
2876 }
2877 ExecuteResponse::SendingRowsImmediate { rows } => {
2878 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2879 }
2880 resp => Err(AdapterError::Unstructured(anyhow!(
2881 "unexpected peek response: {resp:?}"
2882 ))),
2883 };
2884
2885 let mut returning_rows = Vec::new();
2886 let mut diff_err: Option<AdapterError> = None;
2887 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2888 let arena = RowArena::new();
2889 for (row, diff) in diffs {
2890 if !diff.is_positive() {
2891 continue;
2892 }
2893 let mut returning_row = Row::with_capacity(returning.len());
2894 let mut packer = returning_row.packer();
2895 for expr in &returning {
2896 let datums: Vec<_> = row.iter().collect();
2897 match expr.eval(&datums, &arena) {
2898 Ok(datum) => {
2899 packer.push(datum);
2900 }
2901 Err(err) => {
2902 diff_err = Some(err.into());
2903 break;
2904 }
2905 }
2906 }
2907 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2908 let diff = match NonZeroUsize::try_from(diff) {
2909 Ok(diff) => diff,
2910 Err(err) => {
2911 diff_err = Some(err.into());
2912 break;
2913 }
2914 };
2915 returning_rows.push((returning_row, diff));
2916 if diff_err.is_some() {
2917 break;
2918 }
2919 }
2920 }
2921 let diffs = if let Some(err) = diff_err {
2922 Err(err)
2923 } else {
2924 diffs
2925 };
2926
2927 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2930 if let Some(timestamp_context) = timestamp_context {
2939 let (tx, rx) = tokio::sync::oneshot::channel();
2940 let conn_id = ctx.session().conn_id().clone();
2941 let pending_read_txn = PendingReadTxn {
2942 txn: PendingRead::ReadThenWrite { ctx, tx },
2943 timestamp_context,
2944 created: Instant::now(),
2945 num_requeues: 0,
2946 otel_ctx: OpenTelemetryContext::obtain(),
2947 };
2948 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2949 if let Err(e) = result {
2951 warn!(
2952 "strict_serializable_reads_tx dropped before we could send: {:?}",
2953 e
2954 );
2955 return;
2956 }
2957 let result = rx.await;
2958 ctx = match result {
2960 Ok(Some(ctx)) => ctx,
2961 Ok(None) => {
2962 return;
2965 }
2966 Err(e) => {
2967 warn!(
2968 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
2969 e
2970 );
2971 return;
2972 }
2973 };
2974 }
2975
2976 match diffs {
2977 Ok(diffs) => {
2978 let result = Self::send_diffs(
2979 ctx.session_mut(),
2980 plan::SendDiffsPlan {
2981 id,
2982 updates: diffs,
2983 kind,
2984 returning: returning_rows,
2985 max_result_size,
2986 },
2987 );
2988 ctx.retire(result);
2989 }
2990 Err(e) => {
2991 ctx.retire(Err(e));
2992 }
2993 }
2994 });
2995 }
2996
2997 #[instrument]
2998 pub(super) async fn sequence_alter_item_rename(
2999 &mut self,
3000 ctx: &mut ExecuteContext,
3001 plan: plan::AlterItemRenamePlan,
3002 ) -> Result<ExecuteResponse, AdapterError> {
3003 let op = catalog::Op::RenameItem {
3004 id: plan.id,
3005 current_full_name: plan.current_full_name,
3006 to_name: plan.to_name,
3007 };
3008 match self
3009 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3010 .await
3011 {
3012 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3013 Err(err) => Err(err),
3014 }
3015 }
3016
3017 #[instrument]
3018 pub(super) async fn sequence_alter_retain_history(
3019 &mut self,
3020 ctx: &mut ExecuteContext,
3021 plan: plan::AlterRetainHistoryPlan,
3022 ) -> Result<ExecuteResponse, AdapterError> {
3023 let ops = vec![catalog::Op::AlterRetainHistory {
3024 id: plan.id,
3025 value: plan.value,
3026 window: plan.window,
3027 }];
3028 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3029 Box::pin(async move {
3030 let catalog_item = coord.catalog().get_entry(&plan.id).item();
3031 let cluster = match catalog_item {
3032 CatalogItem::Table(_)
3033 | CatalogItem::MaterializedView(_)
3034 | CatalogItem::Source(_)
3035 | CatalogItem::ContinualTask(_) => None,
3036 CatalogItem::Index(index) => Some(index.cluster_id),
3037 CatalogItem::Log(_)
3038 | CatalogItem::View(_)
3039 | CatalogItem::Sink(_)
3040 | CatalogItem::Type(_)
3041 | CatalogItem::Func(_)
3042 | CatalogItem::Secret(_)
3043 | CatalogItem::Connection(_) => unreachable!(),
3044 };
3045 match cluster {
3046 Some(cluster) => {
3047 coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3048 }
3049 None => {
3050 coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3051 }
3052 }
3053 })
3054 })
3055 .await?;
3056 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3057 }
3058
3059 #[instrument]
3060 pub(super) async fn sequence_alter_schema_rename(
3061 &mut self,
3062 ctx: &mut ExecuteContext,
3063 plan: plan::AlterSchemaRenamePlan,
3064 ) -> Result<ExecuteResponse, AdapterError> {
3065 let (database_spec, schema_spec) = plan.cur_schema_spec;
3066 let op = catalog::Op::RenameSchema {
3067 database_spec,
3068 schema_spec,
3069 new_name: plan.new_schema_name,
3070 check_reserved_names: true,
3071 };
3072 match self
3073 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3074 .await
3075 {
3076 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3077 Err(err) => Err(err),
3078 }
3079 }
3080
3081 #[instrument]
3082 pub(super) async fn sequence_alter_schema_swap(
3083 &mut self,
3084 ctx: &mut ExecuteContext,
3085 plan: plan::AlterSchemaSwapPlan,
3086 ) -> Result<ExecuteResponse, AdapterError> {
3087 let plan::AlterSchemaSwapPlan {
3088 schema_a_spec: (schema_a_db, schema_a),
3089 schema_a_name,
3090 schema_b_spec: (schema_b_db, schema_b),
3091 schema_b_name,
3092 name_temp,
3093 } = plan;
3094
3095 let op_a = catalog::Op::RenameSchema {
3096 database_spec: schema_a_db,
3097 schema_spec: schema_a,
3098 new_name: name_temp,
3099 check_reserved_names: false,
3100 };
3101 let op_b = catalog::Op::RenameSchema {
3102 database_spec: schema_b_db,
3103 schema_spec: schema_b,
3104 new_name: schema_a_name,
3105 check_reserved_names: false,
3106 };
3107 let op_c = catalog::Op::RenameSchema {
3108 database_spec: schema_a_db,
3109 schema_spec: schema_a,
3110 new_name: schema_b_name,
3111 check_reserved_names: false,
3112 };
3113
3114 match self
3115 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3116 Box::pin(async {})
3117 })
3118 .await
3119 {
3120 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3121 Err(err) => Err(err),
3122 }
3123 }
3124
3125 #[instrument]
3126 pub(super) async fn sequence_alter_role(
3127 &mut self,
3128 session: &Session,
3129 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3130 ) -> Result<ExecuteResponse, AdapterError> {
3131 let catalog = self.catalog().for_session(session);
3132 let role = catalog.get_role(&id);
3133
3134 let mut notices = vec![];
3136
3137 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3139 let mut vars = role.vars().clone();
3140
3141 let mut nopassword = false;
3144
3145 match option {
3147 PlannedAlterRoleOption::Attributes(attrs) => {
3148 self.validate_role_attributes(&attrs.clone().into())?;
3149
3150 if let Some(inherit) = attrs.inherit {
3151 attributes.inherit = inherit;
3152 }
3153
3154 if let Some(password) = attrs.password {
3155 attributes.password = Some(password);
3156 attributes.scram_iterations =
3157 Some(self.catalog().system_config().scram_iterations())
3158 }
3159
3160 if let Some(superuser) = attrs.superuser {
3161 attributes.superuser = Some(superuser);
3162 }
3163
3164 if let Some(login) = attrs.login {
3165 attributes.login = Some(login);
3166 }
3167
3168 if attrs.nopassword.unwrap_or(false) {
3169 nopassword = true;
3170 }
3171
3172 if let Some(notice) = self.should_emit_rbac_notice(session) {
3173 notices.push(notice);
3174 }
3175 }
3176 PlannedAlterRoleOption::Variable(variable) => {
3177 let session_var = session.vars().inspect(variable.name())?;
3179 session_var.visible(session.user(), catalog.system_vars())?;
3181
3182 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3185 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3186 } else if let PlannedRoleVariable::Set {
3187 name,
3188 value: VariableValue::Values(vals),
3189 } = &variable
3190 {
3191 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3192 notices.push(AdapterNotice::IntrospectionClusterUsage);
3193 }
3194 }
3195
3196 let var_name = match variable {
3197 PlannedRoleVariable::Set { name, value } => {
3198 match &value {
3200 VariableValue::Default => {
3201 vars.remove(&name);
3202 }
3203 VariableValue::Values(vals) => {
3204 let var = match &vals[..] {
3205 [val] => OwnedVarInput::Flat(val.clone()),
3206 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3207 };
3208 session_var.check(var.borrow())?;
3210
3211 vars.insert(name.clone(), var);
3212 }
3213 };
3214 name
3215 }
3216 PlannedRoleVariable::Reset { name } => {
3217 vars.remove(&name);
3219 name
3220 }
3221 };
3222
3223 notices.push(AdapterNotice::VarDefaultUpdated {
3225 role: Some(name.clone()),
3226 var_name: Some(var_name),
3227 });
3228 }
3229 }
3230
3231 let op = catalog::Op::AlterRole {
3232 id,
3233 name,
3234 attributes,
3235 nopassword,
3236 vars: RoleVars { map: vars },
3237 };
3238 let response = self
3239 .catalog_transact(Some(session), vec![op])
3240 .await
3241 .map(|_| ExecuteResponse::AlteredRole)?;
3242
3243 session.add_notices(notices);
3245
3246 Ok(response)
3247 }
3248
3249 #[instrument]
3250 pub(super) async fn sequence_alter_sink_prepare(
3251 &mut self,
3252 ctx: ExecuteContext,
3253 plan: plan::AlterSinkPlan,
3254 ) {
3255 let id_bundle = crate::CollectionIdBundle {
3257 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3258 compute_ids: BTreeMap::new(),
3259 };
3260 let read_hold = self.acquire_read_holds(&id_bundle);
3261
3262 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3263 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3264 return;
3265 };
3266
3267 let otel_ctx = OpenTelemetryContext::obtain();
3268 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3269
3270 let plan_validity = PlanValidity::new(
3271 self.catalog().transient_revision(),
3272 BTreeSet::from_iter([plan.item_id, from_item_id]),
3273 Some(plan.in_cluster),
3274 None,
3275 ctx.session().role_metadata().clone(),
3276 );
3277
3278 info!(
3279 "preparing alter sink for {}: frontiers={:?} export={:?}",
3280 plan.global_id,
3281 self.controller
3282 .storage_collections
3283 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3284 self.controller.storage.export(plan.global_id)
3285 );
3286
3287 self.install_storage_watch_set(
3293 ctx.session().conn_id().clone(),
3294 BTreeSet::from_iter([plan.global_id]),
3295 read_ts,
3296 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3297 ctx: Some(ctx),
3298 otel_ctx,
3299 plan,
3300 plan_validity,
3301 read_hold,
3302 }),
3303 );
3304 }
3305
3306 #[instrument]
3307 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3308 ctx.otel_ctx.attach_as_parent();
3309
3310 let plan::AlterSinkPlan {
3311 item_id,
3312 global_id,
3313 sink: sink_plan,
3314 with_snapshot,
3315 in_cluster,
3316 } = ctx.plan.clone();
3317
3318 match ctx.plan_validity.check(self.catalog()) {
3327 Ok(()) => {}
3328 Err(err) => {
3329 ctx.retire(Err(err));
3330 return;
3331 }
3332 }
3333
3334 let entry = self.catalog().get_entry(&item_id);
3335 let CatalogItem::Sink(old_sink) = entry.item() else {
3336 panic!("invalid item kind for `AlterSinkPlan`");
3337 };
3338
3339 if sink_plan.version != old_sink.version + 1 {
3340 ctx.retire(Err(AdapterError::ChangedPlan(
3341 "sink was altered concurrently".into(),
3342 )));
3343 return;
3344 }
3345
3346 info!(
3347 "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3348 self.controller
3349 .storage_collections
3350 .collections_frontiers(vec![global_id, sink_plan.from]),
3351 self.controller.storage.export(global_id),
3352 );
3353
3354 let write_frontier = &self
3357 .controller
3358 .storage
3359 .export(global_id)
3360 .expect("sink known to exist")
3361 .write_frontier;
3362 let as_of = ctx.read_hold.least_valid_read();
3363 assert!(
3364 write_frontier.iter().all(|t| as_of.less_than(t)),
3365 "{:?} should be strictly less than {:?}",
3366 &*as_of,
3367 &**write_frontier
3368 );
3369
3370 let create_sql = &old_sink.create_sql;
3376 let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3377 let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3378 unreachable!("invalid statement kind for sink");
3379 };
3380
3381 stmt.with_options
3383 .retain(|o| o.name != CreateSinkOptionName::Version);
3384 stmt.with_options.push(CreateSinkOption {
3385 name: CreateSinkOptionName::Version,
3386 value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3387 sink_plan.version.to_string(),
3388 ))),
3389 });
3390
3391 let conn_catalog = self.catalog().for_system_session();
3392 let (mut stmt, resolved_ids) =
3393 mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3394
3395 let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3397 let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3398 stmt.from = ResolvedItemName::Item {
3399 id: from_entry.id(),
3400 qualifiers: from_entry.name.qualifiers.clone(),
3401 full_name,
3402 print_id: true,
3403 version: from_entry.version,
3404 };
3405
3406 let new_sink = Sink {
3407 create_sql: stmt.to_ast_string_stable(),
3408 global_id,
3409 from: sink_plan.from,
3410 connection: sink_plan.connection.clone(),
3411 envelope: sink_plan.envelope,
3412 version: sink_plan.version,
3413 with_snapshot,
3414 resolved_ids: resolved_ids.clone(),
3415 cluster_id: in_cluster,
3416 };
3417
3418 let ops = vec![catalog::Op::UpdateItem {
3419 id: item_id,
3420 name: entry.name().clone(),
3421 to_item: CatalogItem::Sink(new_sink),
3422 }];
3423
3424 match self
3425 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3426 .await
3427 {
3428 Ok(()) => {}
3429 Err(err) => {
3430 ctx.retire(Err(err));
3431 return;
3432 }
3433 }
3434
3435 let storage_sink_desc = StorageSinkDesc {
3436 from: sink_plan.from,
3437 from_desc: from_entry
3438 .relation_desc()
3439 .expect("sinks can only be built on items with descs")
3440 .into_owned(),
3441 connection: sink_plan
3442 .connection
3443 .clone()
3444 .into_inline_connection(self.catalog().state()),
3445 envelope: sink_plan.envelope,
3446 as_of,
3447 with_snapshot,
3448 version: sink_plan.version,
3449 from_storage_metadata: (),
3450 to_storage_metadata: (),
3451 };
3452
3453 self.controller
3454 .storage
3455 .alter_export(
3456 global_id,
3457 ExportDescription {
3458 sink: storage_sink_desc,
3459 instance_id: in_cluster,
3460 },
3461 )
3462 .await
3463 .unwrap_or_terminate("cannot fail to alter source desc");
3464
3465 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3466 }
3467
3468 #[instrument]
3469 pub(super) async fn sequence_alter_connection(
3470 &mut self,
3471 ctx: ExecuteContext,
3472 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3473 ) {
3474 match action {
3475 AlterConnectionAction::RotateKeys => {
3476 self.sequence_rotate_keys(ctx, id).await;
3477 }
3478 AlterConnectionAction::AlterOptions {
3479 set_options,
3480 drop_options,
3481 validate,
3482 } => {
3483 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3484 .await
3485 }
3486 }
3487 }
3488
3489 #[instrument]
3490 async fn sequence_alter_connection_options(
3491 &mut self,
3492 mut ctx: ExecuteContext,
3493 id: CatalogItemId,
3494 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3495 drop_options: BTreeSet<ConnectionOptionName>,
3496 validate: bool,
3497 ) {
3498 let cur_entry = self.catalog().get_entry(&id);
3499 let cur_conn = cur_entry.connection().expect("known to be connection");
3500 let connection_gid = cur_conn.global_id();
3501
3502 let inner = || -> Result<Connection, AdapterError> {
3503 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3505 .expect("invalid create sql persisted to catalog")
3506 .into_element()
3507 .ast
3508 {
3509 Statement::CreateConnection(stmt) => stmt,
3510 _ => unreachable!("proved type is source"),
3511 };
3512
3513 let catalog = self.catalog().for_system_session();
3514
3515 let (mut create_conn_stmt, resolved_ids) =
3517 mz_sql::names::resolve(&catalog, create_conn_stmt)
3518 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3519
3520 create_conn_stmt
3522 .values
3523 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3524
3525 create_conn_stmt.values.extend(
3527 set_options
3528 .into_iter()
3529 .map(|(name, value)| ConnectionOption { name, value }),
3530 );
3531
3532 let mut catalog = self.catalog().for_system_session();
3535 catalog.mark_id_unresolvable_for_replanning(id);
3536
3537 let plan = match mz_sql::plan::plan(
3539 None,
3540 &catalog,
3541 Statement::CreateConnection(create_conn_stmt),
3542 &Params::empty(),
3543 &resolved_ids,
3544 )
3545 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3546 {
3547 Plan::CreateConnection(plan) => plan,
3548 _ => unreachable!("create source plan is only valid response"),
3549 };
3550
3551 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3553 .expect("invalid create sql persisted to catalog")
3554 .into_element()
3555 .ast
3556 {
3557 Statement::CreateConnection(stmt) => stmt,
3558 _ => unreachable!("proved type is source"),
3559 };
3560
3561 let catalog = self.catalog().for_system_session();
3562
3563 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3565 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3566
3567 Ok(Connection {
3568 create_sql: plan.connection.create_sql,
3569 global_id: cur_conn.global_id,
3570 details: plan.connection.details,
3571 resolved_ids: new_deps,
3572 })
3573 };
3574
3575 let conn = match inner() {
3576 Ok(conn) => conn,
3577 Err(e) => {
3578 return ctx.retire(Err(e));
3579 }
3580 };
3581
3582 if validate {
3583 let connection = conn
3584 .details
3585 .to_connection()
3586 .into_inline_connection(self.catalog().state());
3587
3588 let internal_cmd_tx = self.internal_cmd_tx.clone();
3589 let transient_revision = self.catalog().transient_revision();
3590 let conn_id = ctx.session().conn_id().clone();
3591 let otel_ctx = OpenTelemetryContext::obtain();
3592 let role_metadata = ctx.session().role_metadata().clone();
3593 let current_storage_parameters = self.controller.storage.config().clone();
3594
3595 task::spawn(
3596 || format!("validate_alter_connection:{conn_id}"),
3597 async move {
3598 let resolved_ids = conn.resolved_ids.clone();
3599 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3600 let result = match connection.validate(id, ¤t_storage_parameters).await {
3601 Ok(()) => Ok(conn),
3602 Err(err) => Err(err.into()),
3603 };
3604
3605 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3607 AlterConnectionValidationReady {
3608 ctx,
3609 result,
3610 connection_id: id,
3611 connection_gid,
3612 plan_validity: PlanValidity::new(
3613 transient_revision,
3614 dependency_ids.clone(),
3615 None,
3616 None,
3617 role_metadata,
3618 ),
3619 otel_ctx,
3620 resolved_ids,
3621 },
3622 ));
3623 if let Err(e) = result {
3624 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3625 }
3626 },
3627 );
3628 } else {
3629 let result = self
3630 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3631 .await;
3632 ctx.retire(result);
3633 }
3634 }
3635
3636 #[instrument]
3637 pub(crate) async fn sequence_alter_connection_stage_finish(
3638 &mut self,
3639 session: &Session,
3640 id: CatalogItemId,
3641 connection: Connection,
3642 ) -> Result<ExecuteResponse, AdapterError> {
3643 match self.catalog.get_entry(&id).item() {
3644 CatalogItem::Connection(curr_conn) => {
3645 curr_conn
3646 .details
3647 .to_connection()
3648 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3649 .map_err(StorageError::from)?;
3650 }
3651 _ => unreachable!("known to be a connection"),
3652 };
3653
3654 let ops = vec![catalog::Op::UpdateItem {
3655 id,
3656 name: self.catalog.get_entry(&id).name().clone(),
3657 to_item: CatalogItem::Connection(connection.clone()),
3658 }];
3659
3660 self.catalog_transact(Some(session), ops).await?;
3661
3662 match connection.details {
3663 ConnectionDetails::AwsPrivatelink(ref privatelink) => {
3664 let spec = VpcEndpointConfig {
3665 aws_service_name: privatelink.service_name.to_owned(),
3666 availability_zone_ids: privatelink.availability_zones.to_owned(),
3667 };
3668 self.cloud_resource_controller
3669 .as_ref()
3670 .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
3671 .ensure_vpc_endpoint(id, spec)
3672 .await?;
3673 }
3674 _ => {}
3675 };
3676
3677 let entry = self.catalog().get_entry(&id);
3678
3679 let mut connections = VecDeque::new();
3680 connections.push_front(entry.id());
3681
3682 let mut source_connections = BTreeMap::new();
3683 let mut sink_connections = BTreeMap::new();
3684 let mut source_export_data_configs = BTreeMap::new();
3685
3686 while let Some(id) = connections.pop_front() {
3687 for id in self.catalog.get_entry(&id).used_by() {
3688 let entry = self.catalog.get_entry(id);
3689 match entry.item() {
3690 CatalogItem::Connection(_) => connections.push_back(*id),
3691 CatalogItem::Source(source) => {
3692 let desc = match &entry.source().expect("known to be source").data_source {
3693 DataSourceDesc::Ingestion { desc, .. }
3694 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3695 desc.clone().into_inline_connection(self.catalog().state())
3696 }
3697 _ => unreachable!("only ingestions reference connections"),
3698 };
3699
3700 source_connections.insert(source.global_id, desc.connection);
3701 }
3702 CatalogItem::Sink(sink) => {
3703 let export = entry.sink().expect("known to be sink");
3704 sink_connections.insert(
3705 sink.global_id,
3706 export
3707 .connection
3708 .clone()
3709 .into_inline_connection(self.catalog().state()),
3710 );
3711 }
3712 CatalogItem::Table(table) => {
3713 if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
3716 let data_config = export_data_config.clone();
3717 source_export_data_configs.insert(
3718 table.global_id_writes(),
3719 data_config.into_inline_connection(self.catalog().state()),
3720 );
3721 }
3722 }
3723 t => unreachable!("connection dependency not expected on {:?}", t),
3724 }
3725 }
3726 }
3727
3728 if !source_connections.is_empty() {
3729 self.controller
3730 .storage
3731 .alter_ingestion_connections(source_connections)
3732 .await
3733 .unwrap_or_terminate("cannot fail to alter ingestion connection");
3734 }
3735
3736 if !sink_connections.is_empty() {
3737 self.controller
3738 .storage
3739 .alter_export_connections(sink_connections)
3740 .await
3741 .unwrap_or_terminate("altering exports after txn must succeed");
3742 }
3743
3744 if !source_export_data_configs.is_empty() {
3745 self.controller
3746 .storage
3747 .alter_ingestion_export_data_configs(source_export_data_configs)
3748 .await
3749 .unwrap_or_terminate("altering source export data configs after txn must succeed");
3750 }
3751
3752 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3753 }
3754
3755 #[instrument]
3756 pub(super) async fn sequence_alter_source(
3757 &mut self,
3758 session: &Session,
3759 plan::AlterSourcePlan {
3760 item_id,
3761 ingestion_id,
3762 action,
3763 }: plan::AlterSourcePlan,
3764 ) -> Result<ExecuteResponse, AdapterError> {
3765 let cur_entry = self.catalog().get_entry(&item_id);
3766 let cur_source = cur_entry.source().expect("known to be source");
3767
3768 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3769 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3771 .expect("invalid create sql persisted to catalog")
3772 .into_element()
3773 .ast
3774 {
3775 Statement::CreateSource(stmt) => stmt,
3776 _ => unreachable!("proved type is source"),
3777 };
3778
3779 let catalog = coord.catalog().for_system_session();
3780
3781 mz_sql::names::resolve(&catalog, create_source_stmt)
3783 .map_err(|e| AdapterError::internal(err_cx, e))
3784 };
3785
3786 match action {
3787 plan::AlterSourceAction::AddSubsourceExports {
3788 subsources,
3789 options,
3790 } => {
3791 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3792
3793 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3794 text_columns: mut new_text_columns,
3795 exclude_columns: mut new_exclude_columns,
3796 ..
3797 } = options.try_into()?;
3798
3799 let (mut create_source_stmt, resolved_ids) =
3801 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3802
3803 let catalog = self.catalog();
3805 let curr_references: BTreeSet<_> = catalog
3806 .get_entry(&item_id)
3807 .used_by()
3808 .into_iter()
3809 .filter_map(|subsource| {
3810 catalog
3811 .get_entry(subsource)
3812 .subsource_details()
3813 .map(|(_id, reference, _details)| reference)
3814 })
3815 .collect();
3816
3817 let purification_err =
3820 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3821
3822 match &mut create_source_stmt.connection {
3826 CreateSourceConnection::Postgres {
3827 options: curr_options,
3828 ..
3829 } => {
3830 let mz_sql::plan::PgConfigOptionExtracted {
3831 mut text_columns, ..
3832 } = curr_options.clone().try_into()?;
3833
3834 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3837
3838 text_columns.retain(|column_qualified_reference| {
3840 mz_ore::soft_assert_eq_or_log!(
3841 column_qualified_reference.0.len(),
3842 4,
3843 "all TEXT COLUMNS values must be column-qualified references"
3844 );
3845 let mut table = column_qualified_reference.clone();
3846 table.0.truncate(3);
3847 curr_references.contains(&table)
3848 });
3849
3850 new_text_columns.extend(text_columns);
3852
3853 if !new_text_columns.is_empty() {
3855 new_text_columns.sort();
3856 let new_text_columns = new_text_columns
3857 .into_iter()
3858 .map(WithOptionValue::UnresolvedItemName)
3859 .collect();
3860
3861 curr_options.push(PgConfigOption {
3862 name: PgConfigOptionName::TextColumns,
3863 value: Some(WithOptionValue::Sequence(new_text_columns)),
3864 });
3865 }
3866 }
3867 CreateSourceConnection::MySql {
3868 options: curr_options,
3869 ..
3870 } => {
3871 let mz_sql::plan::MySqlConfigOptionExtracted {
3872 mut text_columns,
3873 mut exclude_columns,
3874 ..
3875 } = curr_options.clone().try_into()?;
3876
3877 curr_options.retain(|o| {
3880 !matches!(
3881 o.name,
3882 MySqlConfigOptionName::TextColumns
3883 | MySqlConfigOptionName::ExcludeColumns
3884 )
3885 });
3886
3887 let column_referenced =
3889 |column_qualified_reference: &UnresolvedItemName| {
3890 mz_ore::soft_assert_eq_or_log!(
3891 column_qualified_reference.0.len(),
3892 3,
3893 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3894 );
3895 let mut table = column_qualified_reference.clone();
3896 table.0.truncate(2);
3897 curr_references.contains(&table)
3898 };
3899 text_columns.retain(column_referenced);
3900 exclude_columns.retain(column_referenced);
3901
3902 new_text_columns.extend(text_columns);
3904 new_exclude_columns.extend(exclude_columns);
3905
3906 if !new_text_columns.is_empty() {
3908 new_text_columns.sort();
3909 let new_text_columns = new_text_columns
3910 .into_iter()
3911 .map(WithOptionValue::UnresolvedItemName)
3912 .collect();
3913
3914 curr_options.push(MySqlConfigOption {
3915 name: MySqlConfigOptionName::TextColumns,
3916 value: Some(WithOptionValue::Sequence(new_text_columns)),
3917 });
3918 }
3919 if !new_exclude_columns.is_empty() {
3921 new_exclude_columns.sort();
3922 let new_exclude_columns = new_exclude_columns
3923 .into_iter()
3924 .map(WithOptionValue::UnresolvedItemName)
3925 .collect();
3926
3927 curr_options.push(MySqlConfigOption {
3928 name: MySqlConfigOptionName::ExcludeColumns,
3929 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3930 });
3931 }
3932 }
3933 CreateSourceConnection::SqlServer {
3934 options: curr_options,
3935 ..
3936 } => {
3937 let mz_sql::plan::SqlServerConfigOptionExtracted {
3938 mut text_columns,
3939 mut exclude_columns,
3940 ..
3941 } = curr_options.clone().try_into()?;
3942
3943 curr_options.retain(|o| {
3946 !matches!(
3947 o.name,
3948 SqlServerConfigOptionName::TextColumns
3949 | SqlServerConfigOptionName::ExcludeColumns
3950 )
3951 });
3952
3953 let column_referenced =
3955 |column_qualified_reference: &UnresolvedItemName| {
3956 mz_ore::soft_assert_eq_or_log!(
3957 column_qualified_reference.0.len(),
3958 3,
3959 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3960 );
3961 let mut table = column_qualified_reference.clone();
3962 table.0.truncate(2);
3963 curr_references.contains(&table)
3964 };
3965 text_columns.retain(column_referenced);
3966 exclude_columns.retain(column_referenced);
3967
3968 new_text_columns.extend(text_columns);
3970 new_exclude_columns.extend(exclude_columns);
3971
3972 if !new_text_columns.is_empty() {
3974 new_text_columns.sort();
3975 let new_text_columns = new_text_columns
3976 .into_iter()
3977 .map(WithOptionValue::UnresolvedItemName)
3978 .collect();
3979
3980 curr_options.push(SqlServerConfigOption {
3981 name: SqlServerConfigOptionName::TextColumns,
3982 value: Some(WithOptionValue::Sequence(new_text_columns)),
3983 });
3984 }
3985 if !new_exclude_columns.is_empty() {
3987 new_exclude_columns.sort();
3988 let new_exclude_columns = new_exclude_columns
3989 .into_iter()
3990 .map(WithOptionValue::UnresolvedItemName)
3991 .collect();
3992
3993 curr_options.push(SqlServerConfigOption {
3994 name: SqlServerConfigOptionName::ExcludeColumns,
3995 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3996 });
3997 }
3998 }
3999 _ => return Err(purification_err()),
4000 };
4001
4002 let mut catalog = self.catalog().for_system_session();
4003 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4004
4005 let plan = match mz_sql::plan::plan(
4007 None,
4008 &catalog,
4009 Statement::CreateSource(create_source_stmt),
4010 &Params::empty(),
4011 &resolved_ids,
4012 )
4013 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4014 {
4015 Plan::CreateSource(plan) => plan,
4016 _ => unreachable!("create source plan is only valid response"),
4017 };
4018
4019 let source = Source::new(
4023 plan,
4024 cur_source.global_id,
4025 resolved_ids,
4026 cur_source.custom_logical_compaction_window,
4027 cur_source.is_retained_metrics_object,
4028 );
4029
4030 let desc = match &source.data_source {
4032 DataSourceDesc::Ingestion { desc, .. }
4033 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4034 desc.clone().into_inline_connection(self.catalog().state())
4035 }
4036 _ => unreachable!("already verified of type ingestion"),
4037 };
4038
4039 self.controller
4040 .storage
4041 .check_alter_ingestion_source_desc(ingestion_id, &desc)
4042 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4043
4044 let mut ops = vec![catalog::Op::UpdateItem {
4047 id: item_id,
4048 name: self.catalog.get_entry(&item_id).name().clone(),
4051 to_item: CatalogItem::Source(source),
4052 }];
4053
4054 let CreateSourceInner {
4055 ops: new_ops,
4056 sources: _,
4057 if_not_exists_ids,
4058 } = self.create_source_inner(session, subsources).await?;
4059
4060 ops.extend(new_ops.into_iter());
4061
4062 assert!(
4063 if_not_exists_ids.is_empty(),
4064 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4065 );
4066
4067 self.catalog_transact(Some(session), ops).await?;
4068 }
4069 plan::AlterSourceAction::RefreshReferences { references } => {
4070 self.catalog_transact(
4071 Some(session),
4072 vec![catalog::Op::UpdateSourceReferences {
4073 source_id: item_id,
4074 references: references.into(),
4075 }],
4076 )
4077 .await?;
4078 }
4079 }
4080
4081 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4082 }
4083
4084 #[instrument]
4085 pub(super) async fn sequence_alter_system_set(
4086 &mut self,
4087 session: &Session,
4088 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4089 ) -> Result<ExecuteResponse, AdapterError> {
4090 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4091 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4093 self.validate_alter_system_network_policy(session, &value)?;
4094 }
4095
4096 let op = match value {
4097 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4098 name: name.clone(),
4099 value: OwnedVarInput::SqlSet(values),
4100 },
4101 plan::VariableValue::Default => {
4102 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4103 }
4104 };
4105 self.catalog_transact(Some(session), vec![op]).await?;
4106
4107 session.add_notice(AdapterNotice::VarDefaultUpdated {
4108 role: None,
4109 var_name: Some(name),
4110 });
4111 Ok(ExecuteResponse::AlteredSystemConfiguration)
4112 }
4113
4114 #[instrument]
4115 pub(super) async fn sequence_alter_system_reset(
4116 &mut self,
4117 session: &Session,
4118 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4119 ) -> Result<ExecuteResponse, AdapterError> {
4120 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4121 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4122 self.catalog_transact(Some(session), vec![op]).await?;
4123 session.add_notice(AdapterNotice::VarDefaultUpdated {
4124 role: None,
4125 var_name: Some(name),
4126 });
4127 Ok(ExecuteResponse::AlteredSystemConfiguration)
4128 }
4129
4130 #[instrument]
4131 pub(super) async fn sequence_alter_system_reset_all(
4132 &mut self,
4133 session: &Session,
4134 _: plan::AlterSystemResetAllPlan,
4135 ) -> Result<ExecuteResponse, AdapterError> {
4136 self.is_user_allowed_to_alter_system(session, None)?;
4137 let op = catalog::Op::ResetAllSystemConfiguration;
4138 self.catalog_transact(Some(session), vec![op]).await?;
4139 session.add_notice(AdapterNotice::VarDefaultUpdated {
4140 role: None,
4141 var_name: None,
4142 });
4143 Ok(ExecuteResponse::AlteredSystemConfiguration)
4144 }
4145
4146 fn is_user_allowed_to_alter_system(
4148 &self,
4149 session: &Session,
4150 var_name: Option<&str>,
4151 ) -> Result<(), AdapterError> {
4152 match (session.user().kind(), var_name) {
4153 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4155 (UserKind::Superuser, Some(name))
4157 if session.user().is_internal()
4158 || self.catalog().system_config().user_modifiable(name) =>
4159 {
4160 let var = self.catalog().system_config().get(name)?;
4163 var.visible(session.user(), self.catalog().system_config())?;
4164 Ok(())
4165 }
4166 (UserKind::Regular, Some(name))
4169 if self.catalog().system_config().user_modifiable(name) =>
4170 {
4171 Err(AdapterError::Unauthorized(
4172 rbac::UnauthorizedError::Superuser {
4173 action: format!("toggle the '{name}' system configuration parameter"),
4174 },
4175 ))
4176 }
4177 _ => Err(AdapterError::Unauthorized(
4178 rbac::UnauthorizedError::MzSystem {
4179 action: "alter system".into(),
4180 },
4181 )),
4182 }
4183 }
4184
4185 fn validate_alter_system_network_policy(
4186 &self,
4187 session: &Session,
4188 policy_value: &plan::VariableValue,
4189 ) -> Result<(), AdapterError> {
4190 let policy_name = match &policy_value {
4191 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4193 plan::VariableValue::Values(values) if values.len() == 1 => {
4194 values.iter().next().cloned()
4195 }
4196 plan::VariableValue::Values(values) => {
4197 tracing::warn!(?values, "can't set multiple network policies at once");
4198 None
4199 }
4200 };
4201 let maybe_network_policy = policy_name
4202 .as_ref()
4203 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4204 let Some(network_policy) = maybe_network_policy else {
4205 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4206 VarError::InvalidParameterValue {
4207 name: NETWORK_POLICY.name(),
4208 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4209 reason: "no network policy with such name exists".to_string(),
4210 },
4211 )));
4212 };
4213 self.validate_alter_network_policy(session, &network_policy.rules)
4214 }
4215
4216 fn validate_alter_network_policy(
4221 &self,
4222 session: &Session,
4223 policy_rules: &Vec<NetworkPolicyRule>,
4224 ) -> Result<(), AdapterError> {
4225 if session.user().is_internal() {
4228 return Ok(());
4229 }
4230 if let Some(ip) = session.meta().client_ip() {
4231 validate_ip_with_policy_rules(ip, policy_rules)
4232 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4233 } else {
4234 return Err(AdapterError::NetworkPolicyDenied(
4237 NetworkPolicyError::MissingIp,
4238 ));
4239 }
4240 Ok(())
4241 }
4242
4243 #[instrument]
4245 pub(super) fn sequence_execute(
4246 &self,
4247 session: &mut Session,
4248 plan: plan::ExecutePlan,
4249 ) -> Result<String, AdapterError> {
4250 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4252 let ps = session
4253 .get_prepared_statement_unverified(&plan.name)
4254 .expect("known to exist");
4255 let stmt = ps.stmt().cloned();
4256 let desc = ps.desc().clone();
4257 let state_revision = ps.state_revision;
4258 let logging = Arc::clone(ps.logging());
4259 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4260 }
4261
4262 #[instrument]
4263 pub(super) async fn sequence_grant_privileges(
4264 &mut self,
4265 session: &Session,
4266 plan::GrantPrivilegesPlan {
4267 update_privileges,
4268 grantees,
4269 }: plan::GrantPrivilegesPlan,
4270 ) -> Result<ExecuteResponse, AdapterError> {
4271 self.sequence_update_privileges(
4272 session,
4273 update_privileges,
4274 grantees,
4275 UpdatePrivilegeVariant::Grant,
4276 )
4277 .await
4278 }
4279
4280 #[instrument]
4281 pub(super) async fn sequence_revoke_privileges(
4282 &mut self,
4283 session: &Session,
4284 plan::RevokePrivilegesPlan {
4285 update_privileges,
4286 revokees,
4287 }: plan::RevokePrivilegesPlan,
4288 ) -> Result<ExecuteResponse, AdapterError> {
4289 self.sequence_update_privileges(
4290 session,
4291 update_privileges,
4292 revokees,
4293 UpdatePrivilegeVariant::Revoke,
4294 )
4295 .await
4296 }
4297
4298 #[instrument]
4299 async fn sequence_update_privileges(
4300 &mut self,
4301 session: &Session,
4302 update_privileges: Vec<UpdatePrivilege>,
4303 grantees: Vec<RoleId>,
4304 variant: UpdatePrivilegeVariant,
4305 ) -> Result<ExecuteResponse, AdapterError> {
4306 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4307 let mut warnings = Vec::new();
4308 let catalog = self.catalog().for_session(session);
4309
4310 for UpdatePrivilege {
4311 acl_mode,
4312 target_id,
4313 grantor,
4314 } in update_privileges
4315 {
4316 let actual_object_type = catalog.get_system_object_type(&target_id);
4317 if actual_object_type.is_relation() {
4320 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4321 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4322 if !non_applicable_privileges.is_empty() {
4323 let object_description =
4324 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4325 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4326 non_applicable_privileges,
4327 object_description,
4328 })
4329 }
4330 }
4331
4332 if let SystemObjectId::Object(object_id) = &target_id {
4333 self.catalog()
4334 .ensure_not_reserved_object(object_id, session.conn_id())?;
4335 }
4336
4337 let privileges = self
4338 .catalog()
4339 .get_privileges(&target_id, session.conn_id())
4340 .ok_or(AdapterError::Unsupported(
4343 "GRANTs/REVOKEs on an object type with no privileges",
4344 ))?;
4345
4346 for grantee in &grantees {
4347 self.catalog().ensure_not_system_role(grantee)?;
4348 self.catalog().ensure_not_predefined_role(grantee)?;
4349 let existing_privilege = privileges
4350 .get_acl_item(grantee, &grantor)
4351 .map(Cow::Borrowed)
4352 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4353
4354 match variant {
4355 UpdatePrivilegeVariant::Grant
4356 if !existing_privilege.acl_mode.contains(acl_mode) =>
4357 {
4358 ops.push(catalog::Op::UpdatePrivilege {
4359 target_id: target_id.clone(),
4360 privilege: MzAclItem {
4361 grantee: *grantee,
4362 grantor,
4363 acl_mode,
4364 },
4365 variant,
4366 });
4367 }
4368 UpdatePrivilegeVariant::Revoke
4369 if !existing_privilege
4370 .acl_mode
4371 .intersection(acl_mode)
4372 .is_empty() =>
4373 {
4374 ops.push(catalog::Op::UpdatePrivilege {
4375 target_id: target_id.clone(),
4376 privilege: MzAclItem {
4377 grantee: *grantee,
4378 grantor,
4379 acl_mode,
4380 },
4381 variant,
4382 });
4383 }
4384 _ => {}
4386 }
4387 }
4388 }
4389
4390 if ops.is_empty() {
4391 session.add_notices(warnings);
4392 return Ok(variant.into());
4393 }
4394
4395 let res = self
4396 .catalog_transact(Some(session), ops)
4397 .await
4398 .map(|_| match variant {
4399 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4400 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4401 });
4402 if res.is_ok() {
4403 session.add_notices(warnings);
4404 }
4405 res
4406 }
4407
4408 #[instrument]
4409 pub(super) async fn sequence_alter_default_privileges(
4410 &mut self,
4411 session: &Session,
4412 plan::AlterDefaultPrivilegesPlan {
4413 privilege_objects,
4414 privilege_acl_items,
4415 is_grant,
4416 }: plan::AlterDefaultPrivilegesPlan,
4417 ) -> Result<ExecuteResponse, AdapterError> {
4418 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4419 let variant = if is_grant {
4420 UpdatePrivilegeVariant::Grant
4421 } else {
4422 UpdatePrivilegeVariant::Revoke
4423 };
4424 for privilege_object in &privilege_objects {
4425 self.catalog()
4426 .ensure_not_system_role(&privilege_object.role_id)?;
4427 self.catalog()
4428 .ensure_not_predefined_role(&privilege_object.role_id)?;
4429 if let Some(database_id) = privilege_object.database_id {
4430 self.catalog()
4431 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4432 }
4433 if let Some(schema_id) = privilege_object.schema_id {
4434 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4435 let schema_spec: SchemaSpecifier = schema_id.into();
4436
4437 self.catalog().ensure_not_reserved_object(
4438 &(database_spec, schema_spec).into(),
4439 session.conn_id(),
4440 )?;
4441 }
4442 for privilege_acl_item in &privilege_acl_items {
4443 self.catalog()
4444 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4445 self.catalog()
4446 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4447 ops.push(catalog::Op::UpdateDefaultPrivilege {
4448 privilege_object: privilege_object.clone(),
4449 privilege_acl_item: privilege_acl_item.clone(),
4450 variant,
4451 })
4452 }
4453 }
4454
4455 self.catalog_transact(Some(session), ops).await?;
4456 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4457 }
4458
4459 #[instrument]
4460 pub(super) async fn sequence_grant_role(
4461 &mut self,
4462 session: &Session,
4463 plan::GrantRolePlan {
4464 role_ids,
4465 member_ids,
4466 grantor_id,
4467 }: plan::GrantRolePlan,
4468 ) -> Result<ExecuteResponse, AdapterError> {
4469 let catalog = self.catalog();
4470 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4471 for role_id in role_ids {
4472 for member_id in &member_ids {
4473 let member_membership: BTreeSet<_> =
4474 catalog.get_role(member_id).membership().keys().collect();
4475 if member_membership.contains(&role_id) {
4476 let role_name = catalog.get_role(&role_id).name().to_string();
4477 let member_name = catalog.get_role(member_id).name().to_string();
4478 catalog.ensure_not_reserved_role(member_id)?;
4480 catalog.ensure_grantable_role(&role_id)?;
4481 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4482 role_name,
4483 member_name,
4484 });
4485 } else {
4486 ops.push(catalog::Op::GrantRole {
4487 role_id,
4488 member_id: *member_id,
4489 grantor_id,
4490 });
4491 }
4492 }
4493 }
4494
4495 if ops.is_empty() {
4496 return Ok(ExecuteResponse::GrantedRole);
4497 }
4498
4499 self.catalog_transact(Some(session), ops)
4500 .await
4501 .map(|_| ExecuteResponse::GrantedRole)
4502 }
4503
4504 #[instrument]
4505 pub(super) async fn sequence_revoke_role(
4506 &mut self,
4507 session: &Session,
4508 plan::RevokeRolePlan {
4509 role_ids,
4510 member_ids,
4511 grantor_id,
4512 }: plan::RevokeRolePlan,
4513 ) -> Result<ExecuteResponse, AdapterError> {
4514 let catalog = self.catalog();
4515 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4516 for role_id in role_ids {
4517 for member_id in &member_ids {
4518 let member_membership: BTreeSet<_> =
4519 catalog.get_role(member_id).membership().keys().collect();
4520 if !member_membership.contains(&role_id) {
4521 let role_name = catalog.get_role(&role_id).name().to_string();
4522 let member_name = catalog.get_role(member_id).name().to_string();
4523 catalog.ensure_not_reserved_role(member_id)?;
4525 catalog.ensure_grantable_role(&role_id)?;
4526 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4527 role_name,
4528 member_name,
4529 });
4530 } else {
4531 ops.push(catalog::Op::RevokeRole {
4532 role_id,
4533 member_id: *member_id,
4534 grantor_id,
4535 });
4536 }
4537 }
4538 }
4539
4540 if ops.is_empty() {
4541 return Ok(ExecuteResponse::RevokedRole);
4542 }
4543
4544 self.catalog_transact(Some(session), ops)
4545 .await
4546 .map(|_| ExecuteResponse::RevokedRole)
4547 }
4548
4549 #[instrument]
4550 pub(super) async fn sequence_alter_owner(
4551 &mut self,
4552 session: &Session,
4553 plan::AlterOwnerPlan {
4554 id,
4555 object_type,
4556 new_owner,
4557 }: plan::AlterOwnerPlan,
4558 ) -> Result<ExecuteResponse, AdapterError> {
4559 let mut ops = vec![catalog::Op::UpdateOwner {
4560 id: id.clone(),
4561 new_owner,
4562 }];
4563
4564 match &id {
4565 ObjectId::Item(global_id) => {
4566 let entry = self.catalog().get_entry(global_id);
4567
4568 if entry.is_index() {
4570 let name = self
4571 .catalog()
4572 .resolve_full_name(entry.name(), Some(session.conn_id()))
4573 .to_string();
4574 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4575 return Ok(ExecuteResponse::AlteredObject(object_type));
4576 }
4577
4578 let dependent_index_ops = entry
4580 .used_by()
4581 .into_iter()
4582 .filter(|id| self.catalog().get_entry(id).is_index())
4583 .map(|id| catalog::Op::UpdateOwner {
4584 id: ObjectId::Item(*id),
4585 new_owner,
4586 });
4587 ops.extend(dependent_index_ops);
4588
4589 let dependent_subsources =
4591 entry
4592 .progress_id()
4593 .into_iter()
4594 .map(|item_id| catalog::Op::UpdateOwner {
4595 id: ObjectId::Item(item_id),
4596 new_owner,
4597 });
4598 ops.extend(dependent_subsources);
4599 }
4600 ObjectId::Cluster(cluster_id) => {
4601 let cluster = self.catalog().get_cluster(*cluster_id);
4602 let managed_cluster_replica_ops =
4604 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4605 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4606 new_owner,
4607 });
4608 ops.extend(managed_cluster_replica_ops);
4609 }
4610 _ => {}
4611 }
4612
4613 self.catalog_transact(Some(session), ops)
4614 .await
4615 .map(|_| ExecuteResponse::AlteredObject(object_type))
4616 }
4617
4618 #[instrument]
4619 pub(super) async fn sequence_reassign_owned(
4620 &mut self,
4621 session: &Session,
4622 plan::ReassignOwnedPlan {
4623 old_roles,
4624 new_role,
4625 reassign_ids,
4626 }: plan::ReassignOwnedPlan,
4627 ) -> Result<ExecuteResponse, AdapterError> {
4628 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4629 self.catalog().ensure_not_reserved_role(role_id)?;
4630 }
4631
4632 let ops = reassign_ids
4633 .into_iter()
4634 .map(|id| catalog::Op::UpdateOwner {
4635 id,
4636 new_owner: new_role,
4637 })
4638 .collect();
4639
4640 self.catalog_transact(Some(session), ops)
4641 .await
4642 .map(|_| ExecuteResponse::ReassignOwned)
4643 }
4644
4645 #[instrument]
4646 pub(crate) async fn handle_deferred_statement(&mut self) {
4647 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4651 return;
4652 };
4653 match ps {
4654 crate::coord::PlanStatement::Statement { stmt, params } => {
4655 self.handle_execute_inner(stmt, params, ctx).await;
4656 }
4657 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4658 self.sequence_plan(ctx, plan, resolved_ids).await;
4659 }
4660 }
4661 }
4662
4663 #[instrument]
4664 #[allow(clippy::unused_async)]
4666 pub(super) async fn sequence_alter_table(
4667 &mut self,
4668 ctx: &mut ExecuteContext,
4669 plan: plan::AlterTablePlan,
4670 ) -> Result<ExecuteResponse, AdapterError> {
4671 let plan::AlterTablePlan {
4672 relation_id,
4673 column_name,
4674 column_type,
4675 raw_sql_type,
4676 } = plan;
4677
4678 let id_ts = self.get_catalog_write_ts().await;
4680 let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4681 let ops = vec![catalog::Op::AlterAddColumn {
4682 id: relation_id,
4683 new_global_id,
4684 name: column_name,
4685 typ: column_type,
4686 sql: raw_sql_type,
4687 }];
4688
4689 self.catalog_transact_with_context(None, Some(ctx), ops)
4690 .await?;
4691
4692 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4693 }
4694
4695 #[instrument]
4696 pub(super) async fn sequence_alter_materialized_view_apply_replacement(
4697 &mut self,
4698 ctx: &ExecuteContext,
4699 plan: AlterMaterializedViewApplyReplacementPlan,
4700 ) -> Result<ExecuteResponse, AdapterError> {
4701 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
4702
4703 let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4707 self.catalog_transact(Some(ctx.session()), ops).await?;
4708
4709 Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
4710 }
4711
4712 pub(super) async fn statistics_oracle(
4713 &self,
4714 session: &Session,
4715 source_ids: &BTreeSet<GlobalId>,
4716 query_as_of: &Antichain<Timestamp>,
4717 is_oneshot: bool,
4718 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4719 super::statistics_oracle(
4720 session,
4721 source_ids,
4722 query_as_of,
4723 is_oneshot,
4724 self.catalog().system_config(),
4725 self.controller.storage_collections.as_ref(),
4726 )
4727 .await
4728 }
4729}
4730
4731impl Coordinator {
4732 async fn process_dataflow_metainfo(
4734 &mut self,
4735 df_meta: DataflowMetainfo,
4736 export_id: GlobalId,
4737 ctx: Option<&mut ExecuteContext>,
4738 notice_ids: Vec<GlobalId>,
4739 ) -> Option<BuiltinTableAppendNotify> {
4740 if let Some(ctx) = ctx {
4742 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4743 }
4744
4745 let df_meta = self
4747 .catalog()
4748 .render_notices(df_meta, notice_ids, Some(export_id));
4749
4750 if self.catalog().state().system_config().enable_mz_notices()
4753 && !df_meta.optimizer_notices.is_empty()
4754 {
4755 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4756 self.catalog().state().pack_optimizer_notices(
4757 &mut builtin_table_updates,
4758 df_meta.optimizer_notices.iter(),
4759 Diff::ONE,
4760 );
4761
4762 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4764
4765 Some(
4766 self.builtin_table_update()
4767 .execute(builtin_table_updates)
4768 .await
4769 .0,
4770 )
4771 } else {
4772 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4774
4775 None
4776 }
4777 }
4778}