1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::objects::{
25 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
26};
27use mz_cloud_resources::VpcEndpointConfig;
28use mz_expr::{
29 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::task::{self, JoinHandle, spawn};
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{assert_none, instrument};
36use mz_repr::adt::jsonb::Jsonb;
37use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
38use mz_repr::explain::ExprHumanizer;
39use mz_repr::explain::json::json_string;
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
43 RowIterator, Timestamp,
44};
45use mz_sql::ast::{
46 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
47 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
48 SqlServerConfigOptionName,
49};
50use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
51use mz_sql::catalog::{
52 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
53 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
54 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
55};
56use mz_sql::names::{
57 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
58 SchemaSpecifier, SystemObjectId,
59};
60use mz_sql::plan::{
61 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
62 StatementContext,
63};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use smallvec::SmallVec;
93use timely::progress::Antichain;
94use tokio::sync::{oneshot, watch};
95use tracing::{Instrument, Span, info, warn};
96
97use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
98use crate::command::{ExecuteResponse, Response};
99use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
100use crate::coord::sequencer::emit_optimizer_notices;
101use crate::coord::{
102 AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
103 CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
104 Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
105 PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
106 validate_ip_with_policy_rules,
107};
108use crate::error::AdapterError;
109use crate::notice::{AdapterNotice, DroppedInUseIndex};
110use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
111use crate::optimize::{self, Optimize};
112use crate::session::{
113 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
114 WriteLocks, WriteOp,
115};
116use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
117use crate::{PeekResponseUnary, ReadHolds};
118
119mod cluster;
120mod copy_from;
121mod create_continual_task;
122mod create_index;
123mod create_materialized_view;
124mod create_view;
125mod explain_timestamp;
126mod peek;
127mod secret;
128mod subscribe;
129
130macro_rules! return_if_err {
133 ($expr:expr, $ctx:expr) => {
134 match $expr {
135 Ok(v) => v,
136 Err(e) => return $ctx.retire(Err(e.into())),
137 }
138 };
139}
140
141pub(super) use return_if_err;
142
143struct DropOps {
144 ops: Vec<catalog::Op>,
145 dropped_active_db: bool,
146 dropped_active_cluster: bool,
147 dropped_in_use_indexes: Vec<DroppedInUseIndex>,
148}
149
150struct CreateSourceInner {
152 ops: Vec<catalog::Op>,
153 sources: Vec<(CatalogItemId, Source)>,
154 if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
155}
156
157impl Coordinator {
158 pub(crate) async fn sequence_staged<S>(
163 &mut self,
164 mut ctx: S::Ctx,
165 parent_span: Span,
166 mut stage: S,
167 ) where
168 S: Staged + 'static,
169 S::Ctx: Send + 'static,
170 {
171 return_if_err!(stage.validity().check(self.catalog()), ctx);
172 loop {
173 let mut cancel_enabled = stage.cancel_enabled();
174 if let Some(session) = ctx.session() {
175 if cancel_enabled {
176 if let Some((_prev_tx, prev_rx)) = self
179 .staged_cancellation
180 .insert(session.conn_id().clone(), watch::channel(false))
181 {
182 let was_canceled = *prev_rx.borrow();
183 if was_canceled {
184 ctx.retire(Err(AdapterError::Canceled));
185 return;
186 }
187 }
188 } else {
189 self.staged_cancellation.remove(session.conn_id());
192 }
193 } else {
194 cancel_enabled = false
195 };
196 let next = stage
197 .stage(self, &mut ctx)
198 .instrument(parent_span.clone())
199 .await;
200 let res = return_if_err!(next, ctx);
201 stage = match res {
202 StageResult::Handle(handle) => {
203 let internal_cmd_tx = self.internal_cmd_tx.clone();
204 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
205 let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
206 });
207 return;
208 }
209 StageResult::HandleRetire(handle) => {
210 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
211 ctx.retire(Ok(resp));
212 });
213 return;
214 }
215 StageResult::Response(resp) => {
216 ctx.retire(Ok(resp));
217 return;
218 }
219 StageResult::Immediate(stage) => *stage,
220 }
221 }
222 }
223
224 fn handle_spawn<C, T, F>(
225 &self,
226 ctx: C,
227 handle: JoinHandle<Result<T, AdapterError>>,
228 cancel_enabled: bool,
229 f: F,
230 ) where
231 C: StagedContext + Send + 'static,
232 T: Send + 'static,
233 F: FnOnce(C, T) + Send + 'static,
234 {
235 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
236 .session()
237 .and_then(|session| self.staged_cancellation.get(session.conn_id()))
238 {
239 let mut rx = rx.clone();
240 Box::pin(async move {
241 let _ = rx.wait_for(|v| *v).await;
243 ()
244 })
245 } else {
246 Box::pin(future::pending())
247 };
248 spawn(|| "sequence_staged", async move {
249 tokio::select! {
250 res = handle => {
251 let next = return_if_err!(res, ctx);
252 f(ctx, next);
253 }
254 _ = rx, if cancel_enabled => {
255 ctx.retire(Err(AdapterError::Canceled));
256 }
257 }
258 });
259 }
260
261 async fn create_source_inner(
262 &self,
263 session: &Session,
264 plans: Vec<plan::CreateSourcePlanBundle>,
265 ) -> Result<CreateSourceInner, AdapterError> {
266 let mut ops = vec![];
267 let mut sources = vec![];
268
269 let if_not_exists_ids = plans
270 .iter()
271 .filter_map(
272 |plan::CreateSourcePlanBundle {
273 item_id,
274 global_id: _,
275 plan,
276 resolved_ids: _,
277 available_source_references: _,
278 }| {
279 if plan.if_not_exists {
280 Some((*item_id, plan.name.clone()))
281 } else {
282 None
283 }
284 },
285 )
286 .collect::<BTreeMap<_, _>>();
287
288 for plan::CreateSourcePlanBundle {
289 item_id,
290 global_id,
291 mut plan,
292 resolved_ids,
293 available_source_references,
294 } in plans
295 {
296 let name = plan.name.clone();
297
298 match plan.source.data_source {
299 plan::DataSourceDesc::Ingestion(ref desc)
300 | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
301 let cluster_id = plan
302 .in_cluster
303 .expect("ingestion plans must specify cluster");
304 match desc.connection {
305 GenericSourceConnection::Postgres(_)
306 | GenericSourceConnection::MySql(_)
307 | GenericSourceConnection::SqlServer(_)
308 | GenericSourceConnection::Kafka(_)
309 | GenericSourceConnection::LoadGenerator(_) => {
310 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
311 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
312 .get(self.catalog().system_config().dyncfgs());
313
314 if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
315 {
316 return Err(AdapterError::Unsupported(
317 "sources in clusters with >1 replicas",
318 ));
319 }
320 }
321 }
322 }
323 }
324 plan::DataSourceDesc::Webhook { .. } => {
325 let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
326 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
327 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
328 .get(self.catalog().system_config().dyncfgs());
329
330 if !enable_multi_replica_sources {
331 if cluster.replica_ids().len() > 1 {
332 return Err(AdapterError::Unsupported(
333 "webhook sources in clusters with >1 replicas",
334 ));
335 }
336 }
337 }
338 }
339 plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
340 }
341
342 if let mz_sql::plan::DataSourceDesc::Webhook {
344 validate_using: Some(validate),
345 ..
346 } = &mut plan.source.data_source
347 {
348 if let Err(reason) = validate.reduce_expression().await {
349 self.metrics
350 .webhook_validation_reduce_failures
351 .with_label_values(&[reason])
352 .inc();
353 return Err(AdapterError::Internal(format!(
354 "failed to reduce check expression, {reason}"
355 )));
356 }
357 }
358
359 let mut reference_ops = vec![];
362 if let Some(references) = &available_source_references {
363 reference_ops.push(catalog::Op::UpdateSourceReferences {
364 source_id: item_id,
365 references: references.clone().into(),
366 });
367 }
368
369 let source = Source::new(plan, global_id, resolved_ids, None, false);
370 ops.push(catalog::Op::CreateItem {
371 id: item_id,
372 name,
373 item: CatalogItem::Source(source.clone()),
374 owner_id: *session.current_role_id(),
375 });
376 sources.push((item_id, source));
377 ops.extend(reference_ops);
379 }
380
381 Ok(CreateSourceInner {
382 ops,
383 sources,
384 if_not_exists_ids,
385 })
386 }
387
388 pub(crate) fn plan_subsource(
396 &self,
397 session: &Session,
398 params: &mz_sql::plan::Params,
399 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
400 item_id: CatalogItemId,
401 global_id: GlobalId,
402 ) -> Result<CreateSourcePlanBundle, AdapterError> {
403 let catalog = self.catalog().for_session(session);
404 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
405
406 let plan = self.plan_statement(
407 session,
408 Statement::CreateSubsource(subsource_stmt),
409 params,
410 &resolved_ids,
411 )?;
412 let plan = match plan {
413 Plan::CreateSource(plan) => plan,
414 _ => unreachable!(),
415 };
416 Ok(CreateSourcePlanBundle {
417 item_id,
418 global_id,
419 plan,
420 resolved_ids,
421 available_source_references: None,
422 })
423 }
424
425 pub(crate) async fn plan_purified_alter_source_add_subsource(
427 &mut self,
428 session: &Session,
429 params: Params,
430 source_name: ResolvedItemName,
431 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
432 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
433 ) -> Result<(Plan, ResolvedIds), AdapterError> {
434 let mut subsource_plans = Vec::with_capacity(subsources.len());
435
436 let conn_catalog = self.catalog().for_system_session();
438 let pcx = plan::PlanContext::zero();
439 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
440
441 let entry = self.catalog().get_entry(source_name.item_id());
442 let source = entry.source().ok_or_else(|| {
443 AdapterError::internal(
444 "plan alter source",
445 format!("expected Source found {entry:?}"),
446 )
447 })?;
448
449 let item_id = entry.id();
450 let ingestion_id = source.global_id();
451 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
452
453 let id_ts = self.get_catalog_write_ts().await;
454 let ids = self
455 .catalog()
456 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
457 .await?;
458 for (subsource_stmt, (item_id, global_id)) in
459 subsource_stmts.into_iter().zip_eq(ids.into_iter())
460 {
461 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
462 subsource_plans.push(s);
463 }
464
465 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
466 subsources: subsource_plans,
467 options,
468 };
469
470 Ok((
471 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
472 item_id,
473 ingestion_id,
474 action,
475 }),
476 ResolvedIds::empty(),
477 ))
478 }
479
480 pub(crate) fn plan_purified_alter_source_refresh_references(
482 &self,
483 _session: &Session,
484 _params: Params,
485 source_name: ResolvedItemName,
486 available_source_references: plan::SourceReferences,
487 ) -> Result<(Plan, ResolvedIds), AdapterError> {
488 let entry = self.catalog().get_entry(source_name.item_id());
489 let source = entry.source().ok_or_else(|| {
490 AdapterError::internal(
491 "plan alter source",
492 format!("expected Source found {entry:?}"),
493 )
494 })?;
495 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
496 references: available_source_references,
497 };
498
499 Ok((
500 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
501 item_id: entry.id(),
502 ingestion_id: source.global_id(),
503 action,
504 }),
505 ResolvedIds::empty(),
506 ))
507 }
508
509 pub(crate) async fn plan_purified_create_source(
513 &mut self,
514 ctx: &ExecuteContext,
515 params: Params,
516 progress_stmt: Option<CreateSubsourceStatement<Aug>>,
517 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
518 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
519 available_source_references: plan::SourceReferences,
520 ) -> Result<(Plan, ResolvedIds), AdapterError> {
521 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
522
523 if let Some(progress_stmt) = progress_stmt {
525 assert_none!(progress_stmt.of_source);
530 let id_ts = self.get_catalog_write_ts().await;
531 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
532 let progress_plan =
533 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
534 let progress_full_name = self
535 .catalog()
536 .resolve_full_name(&progress_plan.plan.name, None);
537 let progress_subsource = ResolvedItemName::Item {
538 id: progress_plan.item_id,
539 qualifiers: progress_plan.plan.name.qualifiers.clone(),
540 full_name: progress_full_name,
541 print_id: true,
542 version: RelationVersionSelector::Latest,
543 };
544
545 create_source_plans.push(progress_plan);
546
547 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
548 }
549
550 let catalog = self.catalog().for_session(ctx.session());
551 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
552
553 let propagated_with_options: Vec<_> = source_stmt
554 .with_options
555 .iter()
556 .filter_map(|opt| match opt.name {
557 CreateSourceOptionName::TimestampInterval => None,
558 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
559 name: CreateSubsourceOptionName::RetainHistory,
560 value: opt.value.clone(),
561 }),
562 })
563 .collect();
564
565 let source_plan = match self.plan_statement(
567 ctx.session(),
568 Statement::CreateSource(source_stmt),
569 ¶ms,
570 &resolved_ids,
571 )? {
572 Plan::CreateSource(plan) => plan,
573 p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
574 };
575
576 let id_ts = self.get_catalog_write_ts().await;
577 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
578
579 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
580 let of_source = ResolvedItemName::Item {
581 id: item_id,
582 qualifiers: source_plan.name.qualifiers.clone(),
583 full_name: source_full_name,
584 print_id: true,
585 version: RelationVersionSelector::Latest,
586 };
587
588 let conn_catalog = self.catalog().for_system_session();
590 let pcx = plan::PlanContext::zero();
591 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
592
593 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
594
595 for subsource_stmt in subsource_stmts.iter_mut() {
596 subsource_stmt
597 .with_options
598 .extend(propagated_with_options.iter().cloned())
599 }
600
601 create_source_plans.push(CreateSourcePlanBundle {
602 item_id,
603 global_id,
604 plan: source_plan,
605 resolved_ids: resolved_ids.clone(),
606 available_source_references: Some(available_source_references),
607 });
608
609 let id_ts = self.get_catalog_write_ts().await;
611 let ids = self
612 .catalog()
613 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
614 .await?;
615 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
616 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
617 create_source_plans.push(plan);
618 }
619
620 Ok((
621 Plan::CreateSources(create_source_plans),
622 ResolvedIds::empty(),
623 ))
624 }
625
626 #[instrument]
627 pub(super) async fn sequence_create_source(
628 &mut self,
629 ctx: &mut ExecuteContext,
630 plans: Vec<plan::CreateSourcePlanBundle>,
631 ) -> Result<ExecuteResponse, AdapterError> {
632 let CreateSourceInner {
633 ops,
634 sources,
635 if_not_exists_ids,
636 } = self.create_source_inner(ctx.session(), plans).await?;
637
638 let transact_result = self
639 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
640 .await;
641
642 for (item_id, source) in &sources {
644 if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
645 if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
646 ctx.session()
647 .add_notice(AdapterNotice::WebhookSourceCreated { url });
648 }
649 }
650 }
651
652 match transact_result {
653 Ok(()) => Ok(ExecuteResponse::CreatedSource),
654 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
655 kind:
656 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
657 })) if if_not_exists_ids.contains_key(&id) => {
658 ctx.session()
659 .add_notice(AdapterNotice::ObjectAlreadyExists {
660 name: if_not_exists_ids[&id].item.clone(),
661 ty: "source",
662 });
663 Ok(ExecuteResponse::CreatedSource)
664 }
665 Err(err) => Err(err),
666 }
667 }
668
669 #[instrument]
670 pub(super) async fn sequence_create_connection(
671 &mut self,
672 mut ctx: ExecuteContext,
673 plan: plan::CreateConnectionPlan,
674 resolved_ids: ResolvedIds,
675 ) {
676 let id_ts = self.get_catalog_write_ts().await;
677 let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
678 Ok(item_id) => item_id,
679 Err(err) => return ctx.retire(Err(err.into())),
680 };
681
682 match &plan.connection.details {
683 ConnectionDetails::Ssh { key_1, key_2, .. } => {
684 let key_1 = match key_1.as_key_pair() {
685 Some(key_1) => key_1.clone(),
686 None => {
687 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
688 "the PUBLIC KEY 1 option cannot be explicitly specified"
689 ))));
690 }
691 };
692
693 let key_2 = match key_2.as_key_pair() {
694 Some(key_2) => key_2.clone(),
695 None => {
696 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
697 "the PUBLIC KEY 2 option cannot be explicitly specified"
698 ))));
699 }
700 };
701
702 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
703 let secret = key_set.to_bytes();
704 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
705 return ctx.retire(Err(err.into()));
706 }
707 }
708 _ => (),
709 };
710
711 if plan.validate {
712 let internal_cmd_tx = self.internal_cmd_tx.clone();
713 let transient_revision = self.catalog().transient_revision();
714 let conn_id = ctx.session().conn_id().clone();
715 let otel_ctx = OpenTelemetryContext::obtain();
716 let role_metadata = ctx.session().role_metadata().clone();
717
718 let connection = plan
719 .connection
720 .details
721 .to_connection()
722 .into_inline_connection(self.catalog().state());
723
724 let current_storage_parameters = self.controller.storage.config().clone();
725 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
726 let result = match connection
727 .validate(connection_id, ¤t_storage_parameters)
728 .await
729 {
730 Ok(()) => Ok(plan),
731 Err(err) => Err(err.into()),
732 };
733
734 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
736 CreateConnectionValidationReady {
737 ctx,
738 result,
739 connection_id,
740 connection_gid,
741 plan_validity: PlanValidity::new(
742 transient_revision,
743 resolved_ids.items().copied().collect(),
744 None,
745 None,
746 role_metadata,
747 ),
748 otel_ctx,
749 resolved_ids: resolved_ids.clone(),
750 },
751 ));
752 if let Err(e) = result {
753 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
754 }
755 });
756 } else {
757 let result = self
758 .sequence_create_connection_stage_finish(
759 &mut ctx,
760 connection_id,
761 connection_gid,
762 plan,
763 resolved_ids,
764 )
765 .await;
766 ctx.retire(result);
767 }
768 }
769
770 #[instrument]
771 pub(crate) async fn sequence_create_connection_stage_finish(
772 &mut self,
773 ctx: &mut ExecuteContext,
774 connection_id: CatalogItemId,
775 connection_gid: GlobalId,
776 plan: plan::CreateConnectionPlan,
777 resolved_ids: ResolvedIds,
778 ) -> Result<ExecuteResponse, AdapterError> {
779 let ops = vec![catalog::Op::CreateItem {
780 id: connection_id,
781 name: plan.name.clone(),
782 item: CatalogItem::Connection(Connection {
783 create_sql: plan.connection.create_sql,
784 global_id: connection_gid,
785 details: plan.connection.details.clone(),
786 resolved_ids,
787 }),
788 owner_id: *ctx.session().current_role_id(),
789 }];
790
791 let transact_result = self
792 .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
793 Box::pin(async move {
794 match plan.connection.details {
795 ConnectionDetails::AwsPrivatelink(ref privatelink) => {
796 let spec = VpcEndpointConfig {
797 aws_service_name: privatelink.service_name.to_owned(),
798 availability_zone_ids: privatelink.availability_zones.to_owned(),
799 };
800 let cloud_resource_controller =
801 match coord.cloud_resource_controller.as_ref().cloned() {
802 Some(controller) => controller,
803 None => {
804 tracing::warn!("AWS PrivateLink connections unsupported");
805 return;
806 }
807 };
808 if let Err(err) = cloud_resource_controller
809 .ensure_vpc_endpoint(connection_id, spec)
810 .await
811 {
812 tracing::warn!(?err, "failed to ensure vpc endpoint!");
813 }
814 }
815 _ => {}
816 }
817 })
818 })
819 .await;
820
821 match transact_result {
822 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
823 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
824 kind:
825 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
826 })) if plan.if_not_exists => {
827 ctx.session()
828 .add_notice(AdapterNotice::ObjectAlreadyExists {
829 name: plan.name.item,
830 ty: "connection",
831 });
832 Ok(ExecuteResponse::CreatedConnection)
833 }
834 Err(err) => Err(err),
835 }
836 }
837
838 #[instrument]
839 pub(super) async fn sequence_create_database(
840 &mut self,
841 session: &mut Session,
842 plan: plan::CreateDatabasePlan,
843 ) -> Result<ExecuteResponse, AdapterError> {
844 let ops = vec![catalog::Op::CreateDatabase {
845 name: plan.name.clone(),
846 owner_id: *session.current_role_id(),
847 }];
848 match self.catalog_transact(Some(session), ops).await {
849 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
850 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
851 kind:
852 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
853 })) if plan.if_not_exists => {
854 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
855 Ok(ExecuteResponse::CreatedDatabase)
856 }
857 Err(err) => Err(err),
858 }
859 }
860
861 #[instrument]
862 pub(super) async fn sequence_create_schema(
863 &mut self,
864 session: &mut Session,
865 plan: plan::CreateSchemaPlan,
866 ) -> Result<ExecuteResponse, AdapterError> {
867 let op = catalog::Op::CreateSchema {
868 database_id: plan.database_spec,
869 schema_name: plan.schema_name.clone(),
870 owner_id: *session.current_role_id(),
871 };
872 match self.catalog_transact(Some(session), vec![op]).await {
873 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
874 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
875 kind:
876 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
877 })) if plan.if_not_exists => {
878 session.add_notice(AdapterNotice::SchemaAlreadyExists {
879 name: plan.schema_name,
880 });
881 Ok(ExecuteResponse::CreatedSchema)
882 }
883 Err(err) => Err(err),
884 }
885 }
886
887 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
889 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
890 if attributes.superuser.is_some()
891 || attributes.password.is_some()
892 || attributes.login.is_some()
893 {
894 return Err(AdapterError::UnavailableFeature {
895 feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
896 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
897 });
898 }
899 }
900 Ok(())
901 }
902
903 #[instrument]
904 pub(super) async fn sequence_create_role(
905 &mut self,
906 conn_id: Option<&ConnectionId>,
907 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
908 ) -> Result<ExecuteResponse, AdapterError> {
909 self.validate_role_attributes(&attributes.clone())?;
910 let op = catalog::Op::CreateRole { name, attributes };
911 self.catalog_transact_with_context(conn_id, None, vec![op])
912 .await
913 .map(|_| ExecuteResponse::CreatedRole)
914 }
915
916 #[instrument]
917 pub(super) async fn sequence_create_network_policy(
918 &mut self,
919 session: &Session,
920 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
921 ) -> Result<ExecuteResponse, AdapterError> {
922 let op = catalog::Op::CreateNetworkPolicy {
923 rules,
924 name,
925 owner_id: *session.current_role_id(),
926 };
927 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
928 .await
929 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
930 }
931
932 #[instrument]
933 pub(super) async fn sequence_alter_network_policy(
934 &mut self,
935 session: &Session,
936 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
937 ) -> Result<ExecuteResponse, AdapterError> {
938 let current_network_policy_name =
940 self.catalog().system_config().default_network_policy_name();
941 if current_network_policy_name == name {
943 self.validate_alter_network_policy(session, &rules)?;
944 }
945
946 let op = catalog::Op::AlterNetworkPolicy {
947 id,
948 rules,
949 name,
950 owner_id: *session.current_role_id(),
951 };
952 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
953 .await
954 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
955 }
956
957 #[instrument]
958 pub(super) async fn sequence_create_table(
959 &mut self,
960 ctx: &mut ExecuteContext,
961 plan: plan::CreateTablePlan,
962 resolved_ids: ResolvedIds,
963 ) -> Result<ExecuteResponse, AdapterError> {
964 let plan::CreateTablePlan {
965 name,
966 table,
967 if_not_exists,
968 } = plan;
969
970 let conn_id = if table.temporary {
971 Some(ctx.session().conn_id())
972 } else {
973 None
974 };
975 let id_ts = self.get_catalog_write_ts().await;
976 let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
977 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
978
979 let data_source = match table.data_source {
980 plan::TableDataSource::TableWrites { defaults } => {
981 TableDataSource::TableWrites { defaults }
982 }
983 plan::TableDataSource::DataSource {
984 desc: data_source_plan,
985 timeline,
986 } => match data_source_plan {
987 plan::DataSourceDesc::IngestionExport {
988 ingestion_id,
989 external_reference,
990 details,
991 data_config,
992 } => TableDataSource::DataSource {
993 desc: DataSourceDesc::IngestionExport {
994 ingestion_id,
995 external_reference,
996 details,
997 data_config,
998 },
999 timeline,
1000 },
1001 plan::DataSourceDesc::Webhook {
1002 validate_using,
1003 body_format,
1004 headers,
1005 cluster_id,
1006 } => TableDataSource::DataSource {
1007 desc: DataSourceDesc::Webhook {
1008 validate_using,
1009 body_format,
1010 headers,
1011 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
1012 },
1013 timeline,
1014 },
1015 o => {
1016 unreachable!("CREATE TABLE data source got {:?}", o)
1017 }
1018 },
1019 };
1020
1021 let is_webhook = if let TableDataSource::DataSource {
1022 desc: DataSourceDesc::Webhook { .. },
1023 timeline: _,
1024 } = &data_source
1025 {
1026 true
1027 } else {
1028 false
1029 };
1030
1031 let table = Table {
1032 create_sql: Some(table.create_sql),
1033 desc: table.desc,
1034 collections,
1035 conn_id: conn_id.cloned(),
1036 resolved_ids,
1037 custom_logical_compaction_window: table.compaction_window,
1038 is_retained_metrics_object: false,
1039 data_source,
1040 };
1041 let ops = vec![catalog::Op::CreateItem {
1042 id: table_id,
1043 name: name.clone(),
1044 item: CatalogItem::Table(table.clone()),
1045 owner_id: *ctx.session().current_role_id(),
1046 }];
1047
1048 let catalog_result = self
1049 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1050 .await;
1051
1052 if is_webhook {
1053 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1056 ctx.session()
1057 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1058 }
1059 }
1060
1061 match catalog_result {
1062 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1063 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1064 kind:
1065 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1066 })) if if_not_exists => {
1067 ctx.session_mut()
1068 .add_notice(AdapterNotice::ObjectAlreadyExists {
1069 name: name.item,
1070 ty: "table",
1071 });
1072 Ok(ExecuteResponse::CreatedTable)
1073 }
1074 Err(err) => Err(err),
1075 }
1076 }
1077
1078 #[instrument]
1079 pub(super) async fn sequence_create_sink(
1080 &mut self,
1081 ctx: ExecuteContext,
1082 plan: plan::CreateSinkPlan,
1083 resolved_ids: ResolvedIds,
1084 ) {
1085 let plan::CreateSinkPlan {
1086 name,
1087 sink,
1088 with_snapshot,
1089 if_not_exists,
1090 in_cluster,
1091 } = plan;
1092
1093 let id_ts = self.get_catalog_write_ts().await;
1095 let (item_id, global_id) =
1096 return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);
1097
1098 let catalog_sink = Sink {
1099 create_sql: sink.create_sql,
1100 global_id,
1101 from: sink.from,
1102 connection: sink.connection,
1103 envelope: sink.envelope,
1104 version: sink.version,
1105 with_snapshot,
1106 resolved_ids,
1107 cluster_id: in_cluster,
1108 };
1109
1110 let ops = vec![catalog::Op::CreateItem {
1111 id: item_id,
1112 name: name.clone(),
1113 item: CatalogItem::Sink(catalog_sink.clone()),
1114 owner_id: *ctx.session().current_role_id(),
1115 }];
1116
1117 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1118
1119 match result {
1120 Ok(()) => {}
1121 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1122 kind:
1123 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1124 })) if if_not_exists => {
1125 ctx.session()
1126 .add_notice(AdapterNotice::ObjectAlreadyExists {
1127 name: name.item,
1128 ty: "sink",
1129 });
1130 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1131 return;
1132 }
1133 Err(e) => {
1134 ctx.retire(Err(e));
1135 return;
1136 }
1137 };
1138
1139 self.create_storage_export(global_id, &catalog_sink)
1140 .await
1141 .unwrap_or_terminate("cannot fail to create exports");
1142
1143 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1144 .await;
1145
1146 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1147 }
1148
1149 pub(super) fn validate_system_column_references(
1172 &self,
1173 uses_ambiguous_columns: bool,
1174 depends_on: &BTreeSet<GlobalId>,
1175 ) -> Result<(), AdapterError> {
1176 if uses_ambiguous_columns
1177 && depends_on
1178 .iter()
1179 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1180 {
1181 Err(AdapterError::AmbiguousSystemColumnReference)
1182 } else {
1183 Ok(())
1184 }
1185 }
1186
1187 #[instrument]
1188 pub(super) async fn sequence_create_type(
1189 &mut self,
1190 session: &Session,
1191 plan: plan::CreateTypePlan,
1192 resolved_ids: ResolvedIds,
1193 ) -> Result<ExecuteResponse, AdapterError> {
1194 let id_ts = self.get_catalog_write_ts().await;
1195 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
1196 plan.typ
1198 .inner
1199 .desc(&self.catalog().for_session(session))
1200 .map_err(AdapterError::from)?;
1201 let typ = Type {
1202 create_sql: Some(plan.typ.create_sql),
1203 global_id,
1204 details: CatalogTypeDetails {
1205 array_id: None,
1206 typ: plan.typ.inner,
1207 pg_metadata: None,
1208 },
1209 resolved_ids,
1210 };
1211 let op = catalog::Op::CreateItem {
1212 id: item_id,
1213 name: plan.name,
1214 item: CatalogItem::Type(typ),
1215 owner_id: *session.current_role_id(),
1216 };
1217 match self.catalog_transact(Some(session), vec![op]).await {
1218 Ok(()) => Ok(ExecuteResponse::CreatedType),
1219 Err(err) => Err(err),
1220 }
1221 }
1222
1223 #[instrument]
1224 pub(super) async fn sequence_comment_on(
1225 &mut self,
1226 session: &Session,
1227 plan: plan::CommentPlan,
1228 ) -> Result<ExecuteResponse, AdapterError> {
1229 let op = catalog::Op::Comment {
1230 object_id: plan.object_id,
1231 sub_component: plan.sub_component,
1232 comment: plan.comment,
1233 };
1234 self.catalog_transact(Some(session), vec![op]).await?;
1235 Ok(ExecuteResponse::Comment)
1236 }
1237
1238 #[instrument]
1239 pub(super) async fn sequence_drop_objects(
1240 &mut self,
1241 ctx: &mut ExecuteContext,
1242 plan::DropObjectsPlan {
1243 drop_ids,
1244 object_type,
1245 referenced_ids,
1246 }: plan::DropObjectsPlan,
1247 ) -> Result<ExecuteResponse, AdapterError> {
1248 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1249 let mut objects = Vec::new();
1250 for obj_id in &drop_ids {
1251 if !referenced_ids_hashset.contains(obj_id) {
1252 let object_info = ErrorMessageObjectDescription::from_id(
1253 obj_id,
1254 &self.catalog().for_session(ctx.session()),
1255 )
1256 .to_string();
1257 objects.push(object_info);
1258 }
1259 }
1260
1261 if !objects.is_empty() {
1262 ctx.session()
1263 .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1264 }
1265
1266 let DropOps {
1267 ops,
1268 dropped_active_db,
1269 dropped_active_cluster,
1270 dropped_in_use_indexes,
1271 } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1272
1273 self.catalog_transact_with_context(None, Some(ctx), ops)
1274 .await?;
1275
1276 fail::fail_point!("after_sequencer_drop_replica");
1277
1278 if dropped_active_db {
1279 ctx.session()
1280 .add_notice(AdapterNotice::DroppedActiveDatabase {
1281 name: ctx.session().vars().database().to_string(),
1282 });
1283 }
1284 if dropped_active_cluster {
1285 ctx.session()
1286 .add_notice(AdapterNotice::DroppedActiveCluster {
1287 name: ctx.session().vars().cluster().to_string(),
1288 });
1289 }
1290 for dropped_in_use_index in dropped_in_use_indexes {
1291 ctx.session()
1292 .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1293 self.metrics
1294 .optimization_notices
1295 .with_label_values(&["DroppedInUseIndex"])
1296 .inc_by(1);
1297 }
1298 Ok(ExecuteResponse::DroppedObject(object_type))
1299 }
1300
1301 fn validate_dropped_role_ownership(
1302 &self,
1303 session: &Session,
1304 dropped_roles: &BTreeMap<RoleId, &str>,
1305 ) -> Result<(), AdapterError> {
1306 fn privilege_check(
1307 privileges: &PrivilegeMap,
1308 dropped_roles: &BTreeMap<RoleId, &str>,
1309 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1310 object_id: &SystemObjectId,
1311 catalog: &ConnCatalog,
1312 ) {
1313 for privilege in privileges.all_values() {
1314 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1315 let grantor_name = catalog.get_role(&privilege.grantor).name();
1316 let object_description =
1317 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1318 dependent_objects
1319 .entry(role_name.to_string())
1320 .or_default()
1321 .push(format!(
1322 "privileges on {object_description} granted by {grantor_name}",
1323 ));
1324 }
1325 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1326 let grantee_name = catalog.get_role(&privilege.grantee).name();
1327 let object_description =
1328 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1329 dependent_objects
1330 .entry(role_name.to_string())
1331 .or_default()
1332 .push(format!(
1333 "privileges granted on {object_description} to {grantee_name}"
1334 ));
1335 }
1336 }
1337 }
1338
1339 let catalog = self.catalog().for_session(session);
1340 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1341 for entry in self.catalog.entries() {
1342 let id = SystemObjectId::Object(entry.id().into());
1343 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1344 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1345 dependent_objects
1346 .entry(role_name.to_string())
1347 .or_default()
1348 .push(format!("owner of {object_description}"));
1349 }
1350 privilege_check(
1351 entry.privileges(),
1352 dropped_roles,
1353 &mut dependent_objects,
1354 &id,
1355 &catalog,
1356 );
1357 }
1358 for database in self.catalog.databases() {
1359 let database_id = SystemObjectId::Object(database.id().into());
1360 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1361 let object_description =
1362 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1363 dependent_objects
1364 .entry(role_name.to_string())
1365 .or_default()
1366 .push(format!("owner of {object_description}"));
1367 }
1368 privilege_check(
1369 &database.privileges,
1370 dropped_roles,
1371 &mut dependent_objects,
1372 &database_id,
1373 &catalog,
1374 );
1375 for schema in database.schemas_by_id.values() {
1376 let schema_id = SystemObjectId::Object(
1377 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1378 );
1379 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1380 let object_description =
1381 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1382 dependent_objects
1383 .entry(role_name.to_string())
1384 .or_default()
1385 .push(format!("owner of {object_description}"));
1386 }
1387 privilege_check(
1388 &schema.privileges,
1389 dropped_roles,
1390 &mut dependent_objects,
1391 &schema_id,
1392 &catalog,
1393 );
1394 }
1395 }
1396 for cluster in self.catalog.clusters() {
1397 let cluster_id = SystemObjectId::Object(cluster.id().into());
1398 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1399 let object_description =
1400 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1401 dependent_objects
1402 .entry(role_name.to_string())
1403 .or_default()
1404 .push(format!("owner of {object_description}"));
1405 }
1406 privilege_check(
1407 &cluster.privileges,
1408 dropped_roles,
1409 &mut dependent_objects,
1410 &cluster_id,
1411 &catalog,
1412 );
1413 for replica in cluster.replicas() {
1414 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1415 let replica_id =
1416 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1417 let object_description =
1418 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1419 dependent_objects
1420 .entry(role_name.to_string())
1421 .or_default()
1422 .push(format!("owner of {object_description}"));
1423 }
1424 }
1425 }
1426 privilege_check(
1427 self.catalog().system_privileges(),
1428 dropped_roles,
1429 &mut dependent_objects,
1430 &SystemObjectId::System,
1431 &catalog,
1432 );
1433 for (default_privilege_object, default_privilege_acl_items) in
1434 self.catalog.default_privileges()
1435 {
1436 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1437 dependent_objects
1438 .entry(role_name.to_string())
1439 .or_default()
1440 .push(format!(
1441 "default privileges on {}S created by {}",
1442 default_privilege_object.object_type, role_name
1443 ));
1444 }
1445 for default_privilege_acl_item in default_privilege_acl_items {
1446 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1447 dependent_objects
1448 .entry(role_name.to_string())
1449 .or_default()
1450 .push(format!(
1451 "default privileges on {}S granted to {}",
1452 default_privilege_object.object_type, role_name
1453 ));
1454 }
1455 }
1456 }
1457
1458 if !dependent_objects.is_empty() {
1459 Err(AdapterError::DependentObject(dependent_objects))
1460 } else {
1461 Ok(())
1462 }
1463 }
1464
1465 #[instrument]
1466 pub(super) async fn sequence_drop_owned(
1467 &mut self,
1468 session: &Session,
1469 plan: plan::DropOwnedPlan,
1470 ) -> Result<ExecuteResponse, AdapterError> {
1471 for role_id in &plan.role_ids {
1472 self.catalog().ensure_not_reserved_role(role_id)?;
1473 }
1474
1475 let mut privilege_revokes = plan.privilege_revokes;
1476
1477 let session_catalog = self.catalog().for_session(session);
1479 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1480 && !session.is_superuser()
1481 {
1482 let role_membership =
1484 session_catalog.collect_role_membership(session.current_role_id());
1485 let invalid_revokes: BTreeSet<_> = privilege_revokes
1486 .extract_if(.., |(_, privilege)| {
1487 !role_membership.contains(&privilege.grantor)
1488 })
1489 .map(|(object_id, _)| object_id)
1490 .collect();
1491 for invalid_revoke in invalid_revokes {
1492 let object_description =
1493 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1494 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1495 }
1496 }
1497
1498 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1499 catalog::Op::UpdatePrivilege {
1500 target_id: object_id,
1501 privilege,
1502 variant: UpdatePrivilegeVariant::Revoke,
1503 }
1504 });
1505 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1506 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1507 privilege_object,
1508 privilege_acl_item,
1509 variant: UpdatePrivilegeVariant::Revoke,
1510 },
1511 );
1512 let DropOps {
1513 ops: drop_ops,
1514 dropped_active_db,
1515 dropped_active_cluster,
1516 dropped_in_use_indexes,
1517 } = self.sequence_drop_common(session, plan.drop_ids)?;
1518
1519 let ops = privilege_revoke_ops
1520 .chain(default_privilege_revoke_ops)
1521 .chain(drop_ops.into_iter())
1522 .collect();
1523
1524 self.catalog_transact(Some(session), ops).await?;
1525
1526 if dropped_active_db {
1527 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1528 name: session.vars().database().to_string(),
1529 });
1530 }
1531 if dropped_active_cluster {
1532 session.add_notice(AdapterNotice::DroppedActiveCluster {
1533 name: session.vars().cluster().to_string(),
1534 });
1535 }
1536 for dropped_in_use_index in dropped_in_use_indexes {
1537 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1538 }
1539 Ok(ExecuteResponse::DroppedOwned)
1540 }
1541
1542 fn sequence_drop_common(
1543 &self,
1544 session: &Session,
1545 ids: Vec<ObjectId>,
1546 ) -> Result<DropOps, AdapterError> {
1547 let mut dropped_active_db = false;
1548 let mut dropped_active_cluster = false;
1549 let mut dropped_in_use_indexes = Vec::new();
1550 let mut dropped_roles = BTreeMap::new();
1551 let mut dropped_databases = BTreeSet::new();
1552 let mut dropped_schemas = BTreeSet::new();
1553 let mut role_revokes = BTreeSet::new();
1557 let mut default_privilege_revokes = BTreeSet::new();
1560
1561 let mut clusters_to_drop = BTreeSet::new();
1563
1564 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1565 for id in &ids {
1566 match id {
1567 ObjectId::Database(id) => {
1568 let name = self.catalog().get_database(id).name();
1569 if name == session.vars().database() {
1570 dropped_active_db = true;
1571 }
1572 dropped_databases.insert(id);
1573 }
1574 ObjectId::Schema((_, spec)) => {
1575 if let SchemaSpecifier::Id(id) = spec {
1576 dropped_schemas.insert(id);
1577 }
1578 }
1579 ObjectId::Cluster(id) => {
1580 clusters_to_drop.insert(*id);
1581 if let Some(active_id) = self
1582 .catalog()
1583 .active_cluster(session)
1584 .ok()
1585 .map(|cluster| cluster.id())
1586 {
1587 if id == &active_id {
1588 dropped_active_cluster = true;
1589 }
1590 }
1591 }
1592 ObjectId::Role(id) => {
1593 let role = self.catalog().get_role(id);
1594 let name = role.name();
1595 dropped_roles.insert(*id, name);
1596 for (group_id, grantor_id) in &role.membership.map {
1598 role_revokes.insert((*group_id, *id, *grantor_id));
1599 }
1600 }
1601 ObjectId::Item(id) => {
1602 if let Some(index) = self.catalog().get_entry(id).index() {
1603 let humanizer = self.catalog().for_session(session);
1604 let dependants = self
1605 .controller
1606 .compute
1607 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1608 .ok()
1609 .into_iter()
1610 .flatten()
1611 .filter(|dependant_id| {
1612 if dependant_id.is_transient() {
1619 return false;
1620 }
1621 let Some(dependent_id) = humanizer
1623 .try_get_item_by_global_id(dependant_id)
1624 .map(|item| item.id())
1625 else {
1626 return false;
1627 };
1628 !ids_set.contains(&ObjectId::Item(dependent_id))
1631 })
1632 .flat_map(|dependant_id| {
1633 humanizer.humanize_id(dependant_id)
1637 })
1638 .collect_vec();
1639 if !dependants.is_empty() {
1640 dropped_in_use_indexes.push(DroppedInUseIndex {
1641 index_name: humanizer
1642 .humanize_id(index.global_id())
1643 .unwrap_or_else(|| id.to_string()),
1644 dependant_objects: dependants,
1645 });
1646 }
1647 }
1648 }
1649 _ => {}
1650 }
1651 }
1652
1653 for id in &ids {
1654 match id {
1655 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1659 if !clusters_to_drop.contains(cluster_id) {
1660 let cluster = self.catalog.get_cluster(*cluster_id);
1661 if cluster.is_managed() {
1662 let replica =
1663 cluster.replica(*replica_id).expect("Catalog out of sync");
1664 if !replica.config.location.internal() {
1665 coord_bail!("cannot drop replica of managed cluster");
1666 }
1667 }
1668 }
1669 }
1670 _ => {}
1671 }
1672 }
1673
1674 for role_id in dropped_roles.keys() {
1675 self.catalog().ensure_not_reserved_role(role_id)?;
1676 }
1677 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1678 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1680 for role in self.catalog().user_roles() {
1681 for dropped_role_id in
1682 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1683 {
1684 role_revokes.insert((
1685 **dropped_role_id,
1686 role.id(),
1687 *role
1688 .membership
1689 .map
1690 .get(*dropped_role_id)
1691 .expect("included in keys above"),
1692 ));
1693 }
1694 }
1695
1696 for (default_privilege_object, default_privilege_acls) in
1697 self.catalog().default_privileges()
1698 {
1699 if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
1700 || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
1701 {
1702 for default_privilege_acl in default_privilege_acls {
1703 default_privilege_revokes.insert((
1704 default_privilege_object.clone(),
1705 default_privilege_acl.clone(),
1706 ));
1707 }
1708 }
1709 }
1710
1711 let ops = role_revokes
1712 .into_iter()
1713 .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1714 role_id,
1715 member_id,
1716 grantor_id,
1717 })
1718 .chain(default_privilege_revokes.into_iter().map(
1719 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1720 privilege_object,
1721 privilege_acl_item,
1722 variant: UpdatePrivilegeVariant::Revoke,
1723 },
1724 ))
1725 .chain(iter::once(catalog::Op::DropObjects(
1726 ids.into_iter()
1727 .map(DropObjectInfo::manual_drop_from_object_id)
1728 .collect(),
1729 )))
1730 .collect();
1731
1732 Ok(DropOps {
1733 ops,
1734 dropped_active_db,
1735 dropped_active_cluster,
1736 dropped_in_use_indexes,
1737 })
1738 }
1739
1740 pub(super) fn sequence_explain_schema(
1741 &self,
1742 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1743 ) -> Result<ExecuteResponse, AdapterError> {
1744 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1745 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1746 })?;
1747
1748 let json_string = json_string(&json_value);
1749 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1750 Ok(Self::send_immediate_rows(row))
1751 }
1752
1753 pub(super) fn sequence_show_all_variables(
1754 &self,
1755 session: &Session,
1756 ) -> Result<ExecuteResponse, AdapterError> {
1757 let mut rows = viewable_variables(self.catalog().state(), session)
1758 .map(|v| (v.name(), v.value(), v.description()))
1759 .collect::<Vec<_>>();
1760 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1761
1762 let rows: Vec<_> = rows
1764 .into_iter()
1765 .map(|(name, val, desc)| {
1766 Row::pack_slice(&[
1767 Datum::String(name),
1768 Datum::String(&val),
1769 Datum::String(desc),
1770 ])
1771 })
1772 .collect();
1773 Ok(Self::send_immediate_rows(rows))
1774 }
1775
1776 pub(super) fn sequence_show_variable(
1777 &self,
1778 session: &Session,
1779 plan: plan::ShowVariablePlan,
1780 ) -> Result<ExecuteResponse, AdapterError> {
1781 if &plan.name == SCHEMA_ALIAS {
1782 let schemas = self.catalog.resolve_search_path(session);
1783 let schema = schemas.first();
1784 return match schema {
1785 Some((database_spec, schema_spec)) => {
1786 let schema_name = &self
1787 .catalog
1788 .get_schema(database_spec, schema_spec, session.conn_id())
1789 .name()
1790 .schema;
1791 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1792 Ok(Self::send_immediate_rows(row))
1793 }
1794 None => {
1795 if session.vars().current_object_missing_warnings() {
1796 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1797 search_path: session
1798 .vars()
1799 .search_path()
1800 .into_iter()
1801 .map(|schema| schema.to_string())
1802 .collect(),
1803 });
1804 }
1805 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1806 }
1807 };
1808 }
1809
1810 let variable = session
1811 .vars()
1812 .get(self.catalog().system_config(), &plan.name)
1813 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1814
1815 variable.visible(session.user(), self.catalog().system_config())?;
1818
1819 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1820 if variable.name() == vars::DATABASE.name()
1821 && matches!(
1822 self.catalog().resolve_database(&variable.value()),
1823 Err(CatalogError::UnknownDatabase(_))
1824 )
1825 && session.vars().current_object_missing_warnings()
1826 {
1827 let name = variable.value();
1828 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1829 } else if variable.name() == vars::CLUSTER.name()
1830 && matches!(
1831 self.catalog().resolve_cluster(&variable.value()),
1832 Err(CatalogError::UnknownCluster(_))
1833 )
1834 && session.vars().current_object_missing_warnings()
1835 {
1836 let name = variable.value();
1837 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1838 }
1839 Ok(Self::send_immediate_rows(row))
1840 }
1841
1842 #[instrument]
1843 pub(super) async fn sequence_inspect_shard(
1844 &self,
1845 session: &Session,
1846 plan: plan::InspectShardPlan,
1847 ) -> Result<ExecuteResponse, AdapterError> {
1848 if !session.user().is_internal() {
1851 return Err(AdapterError::Unauthorized(
1852 rbac::UnauthorizedError::MzSystem {
1853 action: "inspect".into(),
1854 },
1855 ));
1856 }
1857 let state = self
1858 .controller
1859 .storage
1860 .inspect_persist_state(plan.id)
1861 .await?;
1862 let jsonb = Jsonb::from_serde_json(state)?;
1863 Ok(Self::send_immediate_rows(jsonb.into_row()))
1864 }
1865
1866 #[instrument]
1867 pub(super) fn sequence_set_variable(
1868 &self,
1869 session: &mut Session,
1870 plan: plan::SetVariablePlan,
1871 ) -> Result<ExecuteResponse, AdapterError> {
1872 let (name, local) = (plan.name, plan.local);
1873 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1874 self.validate_set_isolation_level(session)?;
1875 }
1876 if &name == vars::CLUSTER.name() {
1877 self.validate_set_cluster(session)?;
1878 }
1879
1880 let vars = session.vars_mut();
1881 let values = match plan.value {
1882 plan::VariableValue::Default => None,
1883 plan::VariableValue::Values(values) => Some(values),
1884 };
1885
1886 match values {
1887 Some(values) => {
1888 vars.set(
1889 self.catalog().system_config(),
1890 &name,
1891 VarInput::SqlSet(&values),
1892 local,
1893 )?;
1894
1895 let vars = session.vars();
1896
1897 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1900 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1901 } else if name == vars::CLUSTER.name()
1902 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1903 {
1904 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1905 }
1906
1907 if name.as_str() == vars::DATABASE.name()
1909 && matches!(
1910 self.catalog().resolve_database(vars.database()),
1911 Err(CatalogError::UnknownDatabase(_))
1912 )
1913 && session.vars().current_object_missing_warnings()
1914 {
1915 let name = vars.database().to_string();
1916 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1917 } else if name.as_str() == vars::CLUSTER.name()
1918 && matches!(
1919 self.catalog().resolve_cluster(vars.cluster()),
1920 Err(CatalogError::UnknownCluster(_))
1921 )
1922 && session.vars().current_object_missing_warnings()
1923 {
1924 let name = vars.cluster().to_string();
1925 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1926 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1927 let v = values.into_first().to_lowercase();
1928 if v == IsolationLevel::ReadUncommitted.as_str()
1929 || v == IsolationLevel::ReadCommitted.as_str()
1930 || v == IsolationLevel::RepeatableRead.as_str()
1931 {
1932 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1933 isolation_level: v,
1934 });
1935 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1936 session.add_notice(AdapterNotice::StrongSessionSerializable);
1937 }
1938 }
1939 }
1940 None => vars.reset(self.catalog().system_config(), &name, local)?,
1941 }
1942
1943 Ok(ExecuteResponse::SetVariable { name, reset: false })
1944 }
1945
1946 pub(super) fn sequence_reset_variable(
1947 &self,
1948 session: &mut Session,
1949 plan: plan::ResetVariablePlan,
1950 ) -> Result<ExecuteResponse, AdapterError> {
1951 let name = plan.name;
1952 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1953 self.validate_set_isolation_level(session)?;
1954 }
1955 if &name == vars::CLUSTER.name() {
1956 self.validate_set_cluster(session)?;
1957 }
1958 session
1959 .vars_mut()
1960 .reset(self.catalog().system_config(), &name, false)?;
1961 Ok(ExecuteResponse::SetVariable { name, reset: true })
1962 }
1963
1964 pub(super) fn sequence_set_transaction(
1965 &self,
1966 session: &mut Session,
1967 plan: plan::SetTransactionPlan,
1968 ) -> Result<ExecuteResponse, AdapterError> {
1969 for mode in plan.modes {
1971 match mode {
1972 TransactionMode::AccessMode(_) => {
1973 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1974 }
1975 TransactionMode::IsolationLevel(isolation_level) => {
1976 self.validate_set_isolation_level(session)?;
1977
1978 session.vars_mut().set(
1979 self.catalog().system_config(),
1980 TRANSACTION_ISOLATION_VAR_NAME,
1981 VarInput::Flat(&isolation_level.to_ast_string_stable()),
1982 plan.local,
1983 )?
1984 }
1985 }
1986 }
1987 Ok(ExecuteResponse::SetVariable {
1988 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1989 reset: false,
1990 })
1991 }
1992
1993 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1994 if session.transaction().contains_ops() {
1995 Err(AdapterError::InvalidSetIsolationLevel)
1996 } else {
1997 Ok(())
1998 }
1999 }
2000
2001 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2002 if session.transaction().contains_ops() {
2003 Err(AdapterError::InvalidSetCluster)
2004 } else {
2005 Ok(())
2006 }
2007 }
2008
2009 #[instrument]
2010 pub(super) async fn sequence_end_transaction(
2011 &mut self,
2012 mut ctx: ExecuteContext,
2013 mut action: EndTransactionAction,
2014 ) {
2015 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2017 (&action, ctx.session().transaction())
2018 {
2019 action = EndTransactionAction::Rollback;
2020 }
2021 let response = match action {
2022 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2023 params: BTreeMap::new(),
2024 }),
2025 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2026 params: BTreeMap::new(),
2027 }),
2028 };
2029
2030 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2031
2032 let (response, action) = match result {
2033 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2034 (response, action)
2035 }
2036 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2037 let validated_locks = match write_lock_guards {
2041 None => None,
2042 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2043 Ok(locks) => Some(locks),
2044 Err(missing) => {
2045 tracing::error!(?missing, "programming error, missing write locks");
2046 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2047 }
2048 },
2049 };
2050
2051 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2052 for WriteOp { id, rows } in writes {
2053 let total_rows = collected_writes.entry(id).or_default();
2054 total_rows.push(rows);
2055 }
2056
2057 self.submit_write(PendingWriteTxn::User {
2058 span: Span::current(),
2059 writes: collected_writes,
2060 write_locks: validated_locks,
2061 pending_txn: PendingTxn {
2062 ctx,
2063 response,
2064 action,
2065 },
2066 });
2067 return;
2068 }
2069 Ok((
2070 Some(TransactionOps::Peeks {
2071 determination,
2072 requires_linearization: RequireLinearization::Required,
2073 ..
2074 }),
2075 _,
2076 )) if ctx.session().vars().transaction_isolation()
2077 == &IsolationLevel::StrictSerializable =>
2078 {
2079 let conn_id = ctx.session().conn_id().clone();
2080 let pending_read_txn = PendingReadTxn {
2081 txn: PendingRead::Read {
2082 txn: PendingTxn {
2083 ctx,
2084 response,
2085 action,
2086 },
2087 },
2088 timestamp_context: determination.timestamp_context,
2089 created: Instant::now(),
2090 num_requeues: 0,
2091 otel_ctx: OpenTelemetryContext::obtain(),
2092 };
2093 self.strict_serializable_reads_tx
2094 .send((conn_id, pending_read_txn))
2095 .expect("sending to strict_serializable_reads_tx cannot fail");
2096 return;
2097 }
2098 Ok((
2099 Some(TransactionOps::Peeks {
2100 determination,
2101 requires_linearization: RequireLinearization::Required,
2102 ..
2103 }),
2104 _,
2105 )) if ctx.session().vars().transaction_isolation()
2106 == &IsolationLevel::StrongSessionSerializable =>
2107 {
2108 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2109 ctx.session_mut()
2110 .ensure_timestamp_oracle(timeline.clone())
2111 .apply_write(*ts);
2112 }
2113 (response, action)
2114 }
2115 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2116 self.internal_cmd_tx
2117 .send(Message::ExecuteSingleStatementTransaction {
2118 ctx,
2119 otel_ctx: OpenTelemetryContext::obtain(),
2120 stmt,
2121 params,
2122 })
2123 .expect("must send");
2124 return;
2125 }
2126 Ok((_, _)) => (response, action),
2127 Err(err) => (Err(err), EndTransactionAction::Rollback),
2128 };
2129 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2130 let response = response.map(|mut r| {
2132 r.extend_params(changed);
2133 ExecuteResponse::from(r)
2134 });
2135
2136 ctx.retire(response);
2137 }
2138
2139 #[instrument]
2140 async fn sequence_end_transaction_inner(
2141 &mut self,
2142 ctx: &mut ExecuteContext,
2143 action: EndTransactionAction,
2144 ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2145 let txn = self.clear_transaction(ctx.session_mut()).await;
2146
2147 if let EndTransactionAction::Commit = action {
2148 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2149 match &mut ops {
2150 TransactionOps::Writes(writes) => {
2151 for WriteOp { id, .. } in &mut writes.iter() {
2152 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2154 AdapterError::Catalog(mz_catalog::memory::error::Error {
2155 kind: mz_catalog::memory::error::ErrorKind::Sql(
2156 CatalogError::UnknownItem(id.to_string()),
2157 ),
2158 })
2159 })?;
2160 }
2161
2162 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2164 }
2165 TransactionOps::DDL {
2166 ops,
2167 state: _,
2168 side_effects,
2169 revision,
2170 } => {
2171 if *revision != self.catalog().transient_revision() {
2173 return Err(AdapterError::DDLTransactionRace);
2174 }
2175 let ops = std::mem::take(ops);
2177 let side_effects = std::mem::take(side_effects);
2178 self.catalog_transact_with_side_effects(
2179 Some(ctx),
2180 ops,
2181 move |a, mut ctx| {
2182 Box::pin(async move {
2183 for side_effect in side_effects {
2184 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2185 }
2186 })
2187 },
2188 )
2189 .await?;
2190 }
2191 _ => (),
2192 }
2193 return Ok((Some(ops), write_lock_guards));
2194 }
2195 }
2196
2197 Ok((None, None))
2198 }
2199
2200 pub(super) async fn sequence_side_effecting_func(
2201 &mut self,
2202 ctx: ExecuteContext,
2203 plan: SideEffectingFunc,
2204 ) {
2205 match plan {
2206 SideEffectingFunc::PgCancelBackend { connection_id } => {
2207 if ctx.session().conn_id().unhandled() == connection_id {
2208 ctx.retire(Err(AdapterError::Canceled));
2212 return;
2213 }
2214
2215 let res = if let Some((id_handle, _conn_meta)) =
2216 self.active_conns.get_key_value(&connection_id)
2217 {
2218 self.handle_privileged_cancel(id_handle.clone()).await;
2220 Datum::True
2221 } else {
2222 Datum::False
2223 };
2224 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2225 }
2226 }
2227 }
2228
2229 pub(crate) async fn determine_real_time_recent_timestamp(
2233 &self,
2234 source_ids: impl Iterator<Item = GlobalId>,
2235 real_time_recency_timeout: Duration,
2236 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2237 {
2238 let item_ids = source_ids
2239 .map(|gid| {
2240 self.catalog
2241 .try_resolve_item_id(&gid)
2242 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2243 })
2244 .collect::<Result<Vec<_>, _>>()?;
2245
2246 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2252 if to_visit.is_empty() {
2255 return Ok(None);
2256 }
2257
2258 let mut timestamp_objects = BTreeSet::new();
2259
2260 while let Some(id) = to_visit.pop_front() {
2261 timestamp_objects.insert(id);
2262 to_visit.extend(
2263 self.catalog()
2264 .get_entry(&id)
2265 .uses()
2266 .into_iter()
2267 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2268 );
2269 }
2270 let timestamp_objects = timestamp_objects
2271 .into_iter()
2272 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2273 .collect();
2274
2275 let r = self
2276 .controller
2277 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2278 .await?;
2279
2280 Ok(Some(r))
2281 }
2282
2283 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2286 &self,
2287 session: &Session,
2288 source_ids: impl Iterator<Item = GlobalId>,
2289 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2290 {
2291 let vars = session.vars();
2292
2293 if vars.real_time_recency()
2294 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2295 && !session.contains_read_timestamp()
2296 {
2297 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2298 .await
2299 } else {
2300 Ok(None)
2301 }
2302 }
2303
2304 #[instrument]
2305 pub(super) async fn sequence_explain_plan(
2306 &mut self,
2307 ctx: ExecuteContext,
2308 plan: plan::ExplainPlanPlan,
2309 target_cluster: TargetCluster,
2310 ) {
2311 match &plan.explainee {
2312 plan::Explainee::Statement(stmt) => match stmt {
2313 plan::ExplaineeStatement::CreateView { .. } => {
2314 self.explain_create_view(ctx, plan).await;
2315 }
2316 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2317 self.explain_create_materialized_view(ctx, plan).await;
2318 }
2319 plan::ExplaineeStatement::CreateIndex { .. } => {
2320 self.explain_create_index(ctx, plan).await;
2321 }
2322 plan::ExplaineeStatement::Select { .. } => {
2323 self.explain_peek(ctx, plan, target_cluster).await;
2324 }
2325 },
2326 plan::Explainee::View(_) => {
2327 let result = self.explain_view(&ctx, plan);
2328 ctx.retire(result);
2329 }
2330 plan::Explainee::MaterializedView(_) => {
2331 let result = self.explain_materialized_view(&ctx, plan);
2332 ctx.retire(result);
2333 }
2334 plan::Explainee::Index(_) => {
2335 let result = self.explain_index(&ctx, plan);
2336 ctx.retire(result);
2337 }
2338 plan::Explainee::ReplanView(_) => {
2339 self.explain_replan_view(ctx, plan).await;
2340 }
2341 plan::Explainee::ReplanMaterializedView(_) => {
2342 self.explain_replan_materialized_view(ctx, plan).await;
2343 }
2344 plan::Explainee::ReplanIndex(_) => {
2345 self.explain_replan_index(ctx, plan).await;
2346 }
2347 };
2348 }
2349
2350 pub(super) async fn sequence_explain_pushdown(
2351 &mut self,
2352 ctx: ExecuteContext,
2353 plan: plan::ExplainPushdownPlan,
2354 target_cluster: TargetCluster,
2355 ) {
2356 match plan.explainee {
2357 Explainee::Statement(ExplaineeStatement::Select {
2358 broken: false,
2359 plan,
2360 desc: _,
2361 }) => {
2362 let stage = return_if_err!(
2363 self.peek_validate(
2364 ctx.session(),
2365 plan,
2366 target_cluster,
2367 None,
2368 ExplainContext::Pushdown,
2369 Some(ctx.session().vars().max_query_result_size()),
2370 ),
2371 ctx
2372 );
2373 self.sequence_staged(ctx, Span::current(), stage).await;
2374 }
2375 Explainee::MaterializedView(item_id) => {
2376 self.explain_pushdown_materialized_view(ctx, item_id).await;
2377 }
2378 _ => {
2379 ctx.retire(Err(AdapterError::Unsupported(
2380 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2381 )));
2382 }
2383 };
2384 }
2385
2386 async fn execute_explain_pushdown_with_read_holds(
2388 &self,
2389 ctx: ExecuteContext,
2390 as_of: Antichain<Timestamp>,
2391 mz_now: ResultSpec<'static>,
2392 read_holds: Option<ReadHolds<Timestamp>>,
2393 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2394 ) {
2395 let fut = self
2396 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2397 .await;
2398 task::spawn(|| "render explain pushdown", async move {
2399 let _read_holds = read_holds;
2401 let res = fut.await;
2402 ctx.retire(res);
2403 });
2404 }
2405
2406 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2408 &self,
2409 session: &Session,
2410 as_of: Antichain<Timestamp>,
2411 mz_now: ResultSpec<'static>,
2412 imports: I,
2413 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2414 super::explain_pushdown_future_inner(
2416 session,
2417 &self.catalog,
2418 &self.controller.storage_collections,
2419 as_of,
2420 mz_now,
2421 imports,
2422 )
2423 .await
2424 }
2425
2426 #[instrument]
2427 pub(super) async fn sequence_insert(
2428 &mut self,
2429 mut ctx: ExecuteContext,
2430 plan: plan::InsertPlan,
2431 ) {
2432 if !ctx.session_mut().transaction().allows_writes() {
2440 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2441 return;
2442 }
2443
2444 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2458 let expr = return_if_err!(
2461 plan.values
2462 .clone()
2463 .lower(self.catalog().system_config(), None),
2464 ctx
2465 );
2466 OptimizedMirRelationExpr(expr)
2467 } else {
2468 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2470
2471 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2473
2474 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2476 };
2477
2478 match optimized_mir.into_inner() {
2479 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2480 let catalog = self.owned_catalog();
2481 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2482 let result =
2483 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2484 ctx.retire(result);
2485 });
2486 }
2487 _ => {
2489 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2490 Some(table) => {
2491 let fullname = self
2492 .catalog()
2493 .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2494 table
2496 .desc_latest(&fullname)
2497 .expect("desc called on table")
2498 .arity()
2499 }
2500 None => {
2501 ctx.retire(Err(AdapterError::Catalog(
2502 mz_catalog::memory::error::Error {
2503 kind: mz_catalog::memory::error::ErrorKind::Sql(
2504 CatalogError::UnknownItem(plan.id.to_string()),
2505 ),
2506 },
2507 )));
2508 return;
2509 }
2510 };
2511
2512 let finishing = RowSetFinishing {
2513 order_by: vec![],
2514 limit: None,
2515 offset: 0,
2516 project: (0..desc_arity).collect(),
2517 };
2518
2519 let read_then_write_plan = plan::ReadThenWritePlan {
2520 id: plan.id,
2521 selection: plan.values,
2522 finishing,
2523 assignments: BTreeMap::new(),
2524 kind: MutationKind::Insert,
2525 returning: plan.returning,
2526 };
2527
2528 self.sequence_read_then_write(ctx, read_then_write_plan)
2529 .await;
2530 }
2531 }
2532 }
2533
2534 #[instrument]
2539 pub(super) async fn sequence_read_then_write(
2540 &mut self,
2541 mut ctx: ExecuteContext,
2542 plan: plan::ReadThenWritePlan,
2543 ) {
2544 let mut source_ids: BTreeSet<_> = plan
2545 .selection
2546 .depends_on()
2547 .into_iter()
2548 .map(|gid| self.catalog().resolve_item_id(&gid))
2549 .collect();
2550 source_ids.insert(plan.id);
2551
2552 if ctx.session().transaction().write_locks().is_none() {
2554 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2556
2557 for id in &source_ids {
2559 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2560 write_locks.insert_lock(*id, lock);
2561 }
2562 }
2563
2564 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2566 Ok(locks) => locks,
2567 Err(missing) => {
2568 let role_metadata = ctx.session().role_metadata().clone();
2570 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2571 let plan = DeferredPlan {
2572 ctx,
2573 plan: Plan::ReadThenWrite(plan),
2574 validity: PlanValidity::new(
2575 self.catalog.transient_revision(),
2576 source_ids.clone(),
2577 None,
2578 None,
2579 role_metadata,
2580 ),
2581 requires_locks: source_ids,
2582 };
2583 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2584 }
2585 };
2586
2587 ctx.session_mut()
2588 .try_grant_write_locks(write_locks)
2589 .expect("session has already been granted write locks");
2590 }
2591
2592 let plan::ReadThenWritePlan {
2593 id,
2594 kind,
2595 selection,
2596 mut assignments,
2597 finishing,
2598 mut returning,
2599 } = plan;
2600
2601 let desc = match self.catalog().try_get_entry(&id) {
2603 Some(table) => {
2604 let full_name = self
2605 .catalog()
2606 .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2607 table
2609 .desc_latest(&full_name)
2610 .expect("desc called on table")
2611 .into_owned()
2612 }
2613 None => {
2614 ctx.retire(Err(AdapterError::Catalog(
2615 mz_catalog::memory::error::Error {
2616 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2617 id.to_string(),
2618 )),
2619 },
2620 )));
2621 return;
2622 }
2623 };
2624
2625 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2627 || assignments.values().any(|e| e.contains_temporal())
2628 || returning.iter().any(|e| e.contains_temporal());
2629 if contains_temporal {
2630 ctx.retire(Err(AdapterError::Unsupported(
2631 "calls to mz_now in write statements",
2632 )));
2633 return;
2634 }
2635
2636 fn validate_read_dependencies(
2644 catalog: &Catalog,
2645 id: &CatalogItemId,
2646 ) -> Result<(), AdapterError> {
2647 use CatalogItemType::*;
2648 use mz_catalog::memory::objects;
2649 let mut ids_to_check = Vec::new();
2650 let valid = match catalog.try_get_entry(id) {
2651 Some(entry) => {
2652 if let CatalogItem::View(objects::View { optimized_expr, .. })
2653 | CatalogItem::MaterializedView(objects::MaterializedView {
2654 optimized_expr,
2655 ..
2656 }) = entry.item()
2657 {
2658 if optimized_expr.contains_temporal() {
2659 return Err(AdapterError::Unsupported(
2660 "calls to mz_now in write statements",
2661 ));
2662 }
2663 }
2664 match entry.item().typ() {
2665 typ @ (Func | View | MaterializedView | ContinualTask) => {
2666 ids_to_check.extend(entry.uses());
2667 let valid_id = id.is_user() || matches!(typ, Func);
2668 valid_id
2669 }
2670 Source | Secret | Connection => false,
2671 Sink | Index => unreachable!(),
2673 Table => {
2674 if !id.is_user() {
2675 false
2677 } else {
2678 entry.source_export_details().is_none()
2680 }
2681 }
2682 Type => true,
2683 }
2684 }
2685 None => false,
2686 };
2687 if !valid {
2688 return Err(AdapterError::InvalidTableMutationSelection);
2689 }
2690 for id in ids_to_check {
2691 validate_read_dependencies(catalog, &id)?;
2692 }
2693 Ok(())
2694 }
2695
2696 for gid in selection.depends_on() {
2697 let item_id = self.catalog().resolve_item_id(&gid);
2698 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2699 ctx.retire(Err(err));
2700 return;
2701 }
2702 }
2703
2704 let (peek_tx, peek_rx) = oneshot::channel();
2705 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2706 let (tx, _, session, extra) = ctx.into_parts();
2707 let peek_ctx = ExecuteContext::from_parts(
2719 peek_client_tx,
2720 self.internal_cmd_tx.clone(),
2721 session,
2722 Default::default(),
2723 );
2724
2725 self.sequence_peek(
2726 peek_ctx,
2727 plan::SelectPlan {
2728 select: None,
2729 source: selection,
2730 when: QueryWhen::FreshestTableWrite,
2731 finishing,
2732 copy_to: None,
2733 },
2734 TargetCluster::Active,
2735 None,
2736 )
2737 .await;
2738
2739 let internal_cmd_tx = self.internal_cmd_tx.clone();
2740 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2741 let catalog = self.owned_catalog();
2742 let max_result_size = self.catalog().system_config().max_result_size();
2743
2744 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2745 let (peek_response, session) = match peek_rx.await {
2746 Ok(Response {
2747 result: Ok(resp),
2748 session,
2749 otel_ctx,
2750 }) => {
2751 otel_ctx.attach_as_parent();
2752 (resp, session)
2753 }
2754 Ok(Response {
2755 result: Err(e),
2756 session,
2757 otel_ctx,
2758 }) => {
2759 let ctx =
2760 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2761 otel_ctx.attach_as_parent();
2762 ctx.retire(Err(e));
2763 return;
2764 }
2765 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2767 };
2768 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2769 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2770
2771 if timeout_dur == Duration::ZERO {
2773 timeout_dur = Duration::MAX;
2774 }
2775
2776 let style = ExprPrepStyle::OneShot {
2777 logical_time: EvalTime::NotAvailable, session: ctx.session(),
2779 catalog_state: catalog.state(),
2780 };
2781 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2782 return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
2783 }
2784
2785 let make_diffs =
2786 move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2787 let arena = RowArena::new();
2788 let mut diffs = Vec::new();
2789 let mut datum_vec = mz_repr::DatumVec::new();
2790
2791 while let Some(row) = rows.next() {
2792 if !assignments.is_empty() {
2793 assert!(
2794 matches!(kind, MutationKind::Update),
2795 "only updates support assignments"
2796 );
2797 let mut datums = datum_vec.borrow_with(row);
2798 let mut updates = vec![];
2799 for (idx, expr) in &assignments {
2800 let updated = match expr.eval(&datums, &arena) {
2801 Ok(updated) => updated,
2802 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2803 };
2804 updates.push((*idx, updated));
2805 }
2806 for (idx, new_value) in updates {
2807 datums[idx] = new_value;
2808 }
2809 let updated = Row::pack_slice(&datums);
2810 diffs.push((updated, Diff::ONE));
2811 }
2812 match kind {
2813 MutationKind::Update | MutationKind::Delete => {
2817 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2818 }
2819 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2820 }
2821 }
2822
2823 let mut byte_size: u64 = 0;
2826 for (row, diff) in &diffs {
2827 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2828 if diff.is_positive() {
2829 for (idx, datum) in row.iter().enumerate() {
2830 desc.constraints_met(idx, &datum)?;
2831 }
2832 }
2833 }
2834 Ok((diffs, byte_size))
2835 };
2836
2837 let diffs = match peek_response {
2838 ExecuteResponse::SendingRowsStreaming {
2839 rows: mut rows_stream,
2840 ..
2841 } => {
2842 let mut byte_size: u64 = 0;
2843 let mut diffs = Vec::new();
2844 let result = loop {
2845 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2846 Ok(Some(res)) => match res {
2847 PeekResponseUnary::Rows(new_rows) => {
2848 match make_diffs(new_rows) {
2849 Ok((mut new_diffs, new_byte_size)) => {
2850 byte_size = byte_size.saturating_add(new_byte_size);
2851 if byte_size > max_result_size {
2852 break Err(AdapterError::ResultSize(format!(
2853 "result exceeds max size of {max_result_size}"
2854 )));
2855 }
2856 diffs.append(&mut new_diffs)
2857 }
2858 Err(e) => break Err(e),
2859 };
2860 }
2861 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2862 PeekResponseUnary::Error(e) => {
2863 break Err(AdapterError::Unstructured(anyhow!(e)));
2864 }
2865 },
2866 Ok(None) => break Ok(diffs),
2867 Err(_) => {
2868 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2873 conn_id: ctx.session().conn_id().clone(),
2874 });
2875 if let Err(e) = result {
2876 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2877 }
2878 break Err(AdapterError::StatementTimeout);
2879 }
2880 }
2881 };
2882
2883 result
2884 }
2885 ExecuteResponse::SendingRowsImmediate { rows } => {
2886 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2887 }
2888 resp => Err(AdapterError::Unstructured(anyhow!(
2889 "unexpected peek response: {resp:?}"
2890 ))),
2891 };
2892
2893 let mut returning_rows = Vec::new();
2894 let mut diff_err: Option<AdapterError> = None;
2895 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2896 let arena = RowArena::new();
2897 for (row, diff) in diffs {
2898 if !diff.is_positive() {
2899 continue;
2900 }
2901 let mut returning_row = Row::with_capacity(returning.len());
2902 let mut packer = returning_row.packer();
2903 for expr in &returning {
2904 let datums: Vec<_> = row.iter().collect();
2905 match expr.eval(&datums, &arena) {
2906 Ok(datum) => {
2907 packer.push(datum);
2908 }
2909 Err(err) => {
2910 diff_err = Some(err.into());
2911 break;
2912 }
2913 }
2914 }
2915 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2916 let diff = match NonZeroUsize::try_from(diff) {
2917 Ok(diff) => diff,
2918 Err(err) => {
2919 diff_err = Some(err.into());
2920 break;
2921 }
2922 };
2923 returning_rows.push((returning_row, diff));
2924 if diff_err.is_some() {
2925 break;
2926 }
2927 }
2928 }
2929 let diffs = if let Some(err) = diff_err {
2930 Err(err)
2931 } else {
2932 diffs
2933 };
2934
2935 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2938 if let Some(timestamp_context) = timestamp_context {
2947 let (tx, rx) = tokio::sync::oneshot::channel();
2948 let conn_id = ctx.session().conn_id().clone();
2949 let pending_read_txn = PendingReadTxn {
2950 txn: PendingRead::ReadThenWrite { ctx, tx },
2951 timestamp_context,
2952 created: Instant::now(),
2953 num_requeues: 0,
2954 otel_ctx: OpenTelemetryContext::obtain(),
2955 };
2956 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2957 if let Err(e) = result {
2959 warn!(
2960 "strict_serializable_reads_tx dropped before we could send: {:?}",
2961 e
2962 );
2963 return;
2964 }
2965 let result = rx.await;
2966 ctx = match result {
2968 Ok(Some(ctx)) => ctx,
2969 Ok(None) => {
2970 return;
2973 }
2974 Err(e) => {
2975 warn!(
2976 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
2977 e
2978 );
2979 return;
2980 }
2981 };
2982 }
2983
2984 match diffs {
2985 Ok(diffs) => {
2986 let result = Self::send_diffs(
2987 ctx.session_mut(),
2988 plan::SendDiffsPlan {
2989 id,
2990 updates: diffs,
2991 kind,
2992 returning: returning_rows,
2993 max_result_size,
2994 },
2995 );
2996 ctx.retire(result);
2997 }
2998 Err(e) => {
2999 ctx.retire(Err(e));
3000 }
3001 }
3002 });
3003 }
3004
3005 #[instrument]
3006 pub(super) async fn sequence_alter_item_rename(
3007 &mut self,
3008 ctx: &mut ExecuteContext,
3009 plan: plan::AlterItemRenamePlan,
3010 ) -> Result<ExecuteResponse, AdapterError> {
3011 let op = catalog::Op::RenameItem {
3012 id: plan.id,
3013 current_full_name: plan.current_full_name,
3014 to_name: plan.to_name,
3015 };
3016 match self
3017 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3018 .await
3019 {
3020 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3021 Err(err) => Err(err),
3022 }
3023 }
3024
3025 #[instrument]
3026 pub(super) async fn sequence_alter_retain_history(
3027 &mut self,
3028 ctx: &mut ExecuteContext,
3029 plan: plan::AlterRetainHistoryPlan,
3030 ) -> Result<ExecuteResponse, AdapterError> {
3031 let ops = vec![catalog::Op::AlterRetainHistory {
3032 id: plan.id,
3033 value: plan.value,
3034 window: plan.window,
3035 }];
3036 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3037 Box::pin(async move {
3038 let catalog_item = coord.catalog().get_entry(&plan.id).item();
3039 let cluster = match catalog_item {
3040 CatalogItem::Table(_)
3041 | CatalogItem::MaterializedView(_)
3042 | CatalogItem::Source(_)
3043 | CatalogItem::ContinualTask(_) => None,
3044 CatalogItem::Index(index) => Some(index.cluster_id),
3045 CatalogItem::Log(_)
3046 | CatalogItem::View(_)
3047 | CatalogItem::Sink(_)
3048 | CatalogItem::Type(_)
3049 | CatalogItem::Func(_)
3050 | CatalogItem::Secret(_)
3051 | CatalogItem::Connection(_) => unreachable!(),
3052 };
3053 match cluster {
3054 Some(cluster) => {
3055 coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3056 }
3057 None => {
3058 coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3059 }
3060 }
3061 })
3062 })
3063 .await?;
3064 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3065 }
3066
3067 #[instrument]
3068 pub(super) async fn sequence_alter_schema_rename(
3069 &mut self,
3070 ctx: &mut ExecuteContext,
3071 plan: plan::AlterSchemaRenamePlan,
3072 ) -> Result<ExecuteResponse, AdapterError> {
3073 let (database_spec, schema_spec) = plan.cur_schema_spec;
3074 let op = catalog::Op::RenameSchema {
3075 database_spec,
3076 schema_spec,
3077 new_name: plan.new_schema_name,
3078 check_reserved_names: true,
3079 };
3080 match self
3081 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3082 .await
3083 {
3084 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3085 Err(err) => Err(err),
3086 }
3087 }
3088
3089 #[instrument]
3090 pub(super) async fn sequence_alter_schema_swap(
3091 &mut self,
3092 ctx: &mut ExecuteContext,
3093 plan: plan::AlterSchemaSwapPlan,
3094 ) -> Result<ExecuteResponse, AdapterError> {
3095 let plan::AlterSchemaSwapPlan {
3096 schema_a_spec: (schema_a_db, schema_a),
3097 schema_a_name,
3098 schema_b_spec: (schema_b_db, schema_b),
3099 schema_b_name,
3100 name_temp,
3101 } = plan;
3102
3103 let op_a = catalog::Op::RenameSchema {
3104 database_spec: schema_a_db,
3105 schema_spec: schema_a,
3106 new_name: name_temp,
3107 check_reserved_names: false,
3108 };
3109 let op_b = catalog::Op::RenameSchema {
3110 database_spec: schema_b_db,
3111 schema_spec: schema_b,
3112 new_name: schema_a_name,
3113 check_reserved_names: false,
3114 };
3115 let op_c = catalog::Op::RenameSchema {
3116 database_spec: schema_a_db,
3117 schema_spec: schema_a,
3118 new_name: schema_b_name,
3119 check_reserved_names: false,
3120 };
3121
3122 match self
3123 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3124 Box::pin(async {})
3125 })
3126 .await
3127 {
3128 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3129 Err(err) => Err(err),
3130 }
3131 }
3132
3133 #[instrument]
3134 pub(super) async fn sequence_alter_role(
3135 &mut self,
3136 session: &Session,
3137 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3138 ) -> Result<ExecuteResponse, AdapterError> {
3139 let catalog = self.catalog().for_session(session);
3140 let role = catalog.get_role(&id);
3141
3142 let mut notices = vec![];
3144
3145 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3147 let mut vars = role.vars().clone();
3148
3149 let mut nopassword = false;
3152
3153 match option {
3155 PlannedAlterRoleOption::Attributes(attrs) => {
3156 self.validate_role_attributes(&attrs.clone().into())?;
3157
3158 if let Some(inherit) = attrs.inherit {
3159 attributes.inherit = inherit;
3160 }
3161
3162 if let Some(password) = attrs.password {
3163 attributes.password = Some(password);
3164 attributes.scram_iterations =
3165 Some(self.catalog().system_config().scram_iterations())
3166 }
3167
3168 if let Some(superuser) = attrs.superuser {
3169 attributes.superuser = Some(superuser);
3170 }
3171
3172 if let Some(login) = attrs.login {
3173 attributes.login = Some(login);
3174 }
3175
3176 if attrs.nopassword.unwrap_or(false) {
3177 nopassword = true;
3178 }
3179
3180 if let Some(notice) = self.should_emit_rbac_notice(session) {
3181 notices.push(notice);
3182 }
3183 }
3184 PlannedAlterRoleOption::Variable(variable) => {
3185 let session_var = session.vars().inspect(variable.name())?;
3187 session_var.visible(session.user(), catalog.system_vars())?;
3189
3190 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3193 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3194 } else if let PlannedRoleVariable::Set {
3195 name,
3196 value: VariableValue::Values(vals),
3197 } = &variable
3198 {
3199 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3200 notices.push(AdapterNotice::IntrospectionClusterUsage);
3201 }
3202 }
3203
3204 let var_name = match variable {
3205 PlannedRoleVariable::Set { name, value } => {
3206 match &value {
3208 VariableValue::Default => {
3209 vars.remove(&name);
3210 }
3211 VariableValue::Values(vals) => {
3212 let var = match &vals[..] {
3213 [val] => OwnedVarInput::Flat(val.clone()),
3214 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3215 };
3216 session_var.check(var.borrow())?;
3218
3219 vars.insert(name.clone(), var);
3220 }
3221 };
3222 name
3223 }
3224 PlannedRoleVariable::Reset { name } => {
3225 vars.remove(&name);
3227 name
3228 }
3229 };
3230
3231 notices.push(AdapterNotice::VarDefaultUpdated {
3233 role: Some(name.clone()),
3234 var_name: Some(var_name),
3235 });
3236 }
3237 }
3238
3239 let op = catalog::Op::AlterRole {
3240 id,
3241 name,
3242 attributes,
3243 nopassword,
3244 vars: RoleVars { map: vars },
3245 };
3246 let response = self
3247 .catalog_transact(Some(session), vec![op])
3248 .await
3249 .map(|_| ExecuteResponse::AlteredRole)?;
3250
3251 session.add_notices(notices);
3253
3254 Ok(response)
3255 }
3256
3257 #[instrument]
3258 pub(super) async fn sequence_alter_sink_prepare(
3259 &mut self,
3260 ctx: ExecuteContext,
3261 plan: plan::AlterSinkPlan,
3262 ) {
3263 let id_bundle = crate::CollectionIdBundle {
3265 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3266 compute_ids: BTreeMap::new(),
3267 };
3268 let read_hold = self.acquire_read_holds(&id_bundle);
3269
3270 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3271 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3272 return;
3273 };
3274
3275 let otel_ctx = OpenTelemetryContext::obtain();
3276 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3277
3278 let plan_validity = PlanValidity::new(
3279 self.catalog().transient_revision(),
3280 BTreeSet::from_iter([plan.item_id, from_item_id]),
3281 Some(plan.in_cluster),
3282 None,
3283 ctx.session().role_metadata().clone(),
3284 );
3285
3286 info!(
3287 "preparing alter sink for {}: frontiers={:?} export={:?}",
3288 plan.global_id,
3289 self.controller
3290 .storage_collections
3291 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3292 self.controller.storage.export(plan.global_id)
3293 );
3294
3295 self.install_storage_watch_set(
3301 ctx.session().conn_id().clone(),
3302 BTreeSet::from_iter([plan.global_id]),
3303 read_ts,
3304 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3305 ctx: Some(ctx),
3306 otel_ctx,
3307 plan,
3308 plan_validity,
3309 read_hold,
3310 }),
3311 );
3312 }
3313
3314 #[instrument]
3315 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3316 ctx.otel_ctx.attach_as_parent();
3317
3318 let plan::AlterSinkPlan {
3319 item_id,
3320 global_id,
3321 sink: sink_plan,
3322 with_snapshot,
3323 in_cluster,
3324 } = ctx.plan.clone();
3325
3326 match ctx.plan_validity.check(self.catalog()) {
3335 Ok(()) => {}
3336 Err(err) => {
3337 ctx.retire(Err(err));
3338 return;
3339 }
3340 }
3341
3342 let entry = self.catalog().get_entry(&item_id);
3343 let CatalogItem::Sink(old_sink) = entry.item() else {
3344 panic!("invalid item kind for `AlterSinkPlan`");
3345 };
3346
3347 if sink_plan.version != old_sink.version + 1 {
3348 ctx.retire(Err(AdapterError::ChangedPlan(
3349 "sink was altered concurrently".into(),
3350 )));
3351 return;
3352 }
3353
3354 info!(
3355 "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3356 self.controller
3357 .storage_collections
3358 .collections_frontiers(vec![global_id, sink_plan.from]),
3359 self.controller.storage.export(global_id),
3360 );
3361
3362 let write_frontier = &self
3365 .controller
3366 .storage
3367 .export(global_id)
3368 .expect("sink known to exist")
3369 .write_frontier;
3370 let as_of = ctx.read_hold.least_valid_read();
3371 assert!(
3372 write_frontier.iter().all(|t| as_of.less_than(t)),
3373 "{:?} should be strictly less than {:?}",
3374 &*as_of,
3375 &**write_frontier
3376 );
3377
3378 let create_sql = &old_sink.create_sql;
3384 let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3385 let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3386 unreachable!("invalid statement kind for sink");
3387 };
3388
3389 stmt.with_options
3391 .retain(|o| o.name != CreateSinkOptionName::Version);
3392 stmt.with_options.push(CreateSinkOption {
3393 name: CreateSinkOptionName::Version,
3394 value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3395 sink_plan.version.to_string(),
3396 ))),
3397 });
3398
3399 let conn_catalog = self.catalog().for_system_session();
3400 let (mut stmt, resolved_ids) =
3401 mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3402
3403 let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3405 let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3406 stmt.from = ResolvedItemName::Item {
3407 id: from_entry.id(),
3408 qualifiers: from_entry.name.qualifiers.clone(),
3409 full_name,
3410 print_id: true,
3411 version: from_entry.version,
3412 };
3413
3414 let new_sink = Sink {
3415 create_sql: stmt.to_ast_string_stable(),
3416 global_id,
3417 from: sink_plan.from,
3418 connection: sink_plan.connection.clone(),
3419 envelope: sink_plan.envelope,
3420 version: sink_plan.version,
3421 with_snapshot,
3422 resolved_ids: resolved_ids.clone(),
3423 cluster_id: in_cluster,
3424 };
3425
3426 let ops = vec![catalog::Op::UpdateItem {
3427 id: item_id,
3428 name: entry.name().clone(),
3429 to_item: CatalogItem::Sink(new_sink),
3430 }];
3431
3432 match self
3433 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3434 .await
3435 {
3436 Ok(()) => {}
3437 Err(err) => {
3438 ctx.retire(Err(err));
3439 return;
3440 }
3441 }
3442
3443 let storage_sink_desc = StorageSinkDesc {
3444 from: sink_plan.from,
3445 from_desc: from_entry
3446 .desc_opt()
3447 .expect("sinks can only be built on items with descs")
3448 .into_owned(),
3449 connection: sink_plan
3450 .connection
3451 .clone()
3452 .into_inline_connection(self.catalog().state()),
3453 envelope: sink_plan.envelope,
3454 as_of,
3455 with_snapshot,
3456 version: sink_plan.version,
3457 from_storage_metadata: (),
3458 to_storage_metadata: (),
3459 };
3460
3461 self.controller
3462 .storage
3463 .alter_export(
3464 global_id,
3465 ExportDescription {
3466 sink: storage_sink_desc,
3467 instance_id: in_cluster,
3468 },
3469 )
3470 .await
3471 .unwrap_or_terminate("cannot fail to alter source desc");
3472
3473 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3474 }
3475
3476 #[instrument]
3477 pub(super) async fn sequence_alter_connection(
3478 &mut self,
3479 ctx: ExecuteContext,
3480 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3481 ) {
3482 match action {
3483 AlterConnectionAction::RotateKeys => {
3484 self.sequence_rotate_keys(ctx, id).await;
3485 }
3486 AlterConnectionAction::AlterOptions {
3487 set_options,
3488 drop_options,
3489 validate,
3490 } => {
3491 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3492 .await
3493 }
3494 }
3495 }
3496
3497 #[instrument]
3498 async fn sequence_alter_connection_options(
3499 &mut self,
3500 mut ctx: ExecuteContext,
3501 id: CatalogItemId,
3502 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3503 drop_options: BTreeSet<ConnectionOptionName>,
3504 validate: bool,
3505 ) {
3506 let cur_entry = self.catalog().get_entry(&id);
3507 let cur_conn = cur_entry.connection().expect("known to be connection");
3508 let connection_gid = cur_conn.global_id();
3509
3510 let inner = || -> Result<Connection, AdapterError> {
3511 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3513 .expect("invalid create sql persisted to catalog")
3514 .into_element()
3515 .ast
3516 {
3517 Statement::CreateConnection(stmt) => stmt,
3518 _ => unreachable!("proved type is source"),
3519 };
3520
3521 let catalog = self.catalog().for_system_session();
3522
3523 let (mut create_conn_stmt, resolved_ids) =
3525 mz_sql::names::resolve(&catalog, create_conn_stmt)
3526 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3527
3528 create_conn_stmt
3530 .values
3531 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3532
3533 create_conn_stmt.values.extend(
3535 set_options
3536 .into_iter()
3537 .map(|(name, value)| ConnectionOption { name, value }),
3538 );
3539
3540 let mut catalog = self.catalog().for_system_session();
3543 catalog.mark_id_unresolvable_for_replanning(id);
3544
3545 let plan = match mz_sql::plan::plan(
3547 None,
3548 &catalog,
3549 Statement::CreateConnection(create_conn_stmt),
3550 &Params::empty(),
3551 &resolved_ids,
3552 )
3553 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3554 {
3555 Plan::CreateConnection(plan) => plan,
3556 _ => unreachable!("create source plan is only valid response"),
3557 };
3558
3559 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3561 .expect("invalid create sql persisted to catalog")
3562 .into_element()
3563 .ast
3564 {
3565 Statement::CreateConnection(stmt) => stmt,
3566 _ => unreachable!("proved type is source"),
3567 };
3568
3569 let catalog = self.catalog().for_system_session();
3570
3571 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3573 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3574
3575 Ok(Connection {
3576 create_sql: plan.connection.create_sql,
3577 global_id: cur_conn.global_id,
3578 details: plan.connection.details,
3579 resolved_ids: new_deps,
3580 })
3581 };
3582
3583 let conn = match inner() {
3584 Ok(conn) => conn,
3585 Err(e) => {
3586 return ctx.retire(Err(e));
3587 }
3588 };
3589
3590 if validate {
3591 let connection = conn
3592 .details
3593 .to_connection()
3594 .into_inline_connection(self.catalog().state());
3595
3596 let internal_cmd_tx = self.internal_cmd_tx.clone();
3597 let transient_revision = self.catalog().transient_revision();
3598 let conn_id = ctx.session().conn_id().clone();
3599 let otel_ctx = OpenTelemetryContext::obtain();
3600 let role_metadata = ctx.session().role_metadata().clone();
3601 let current_storage_parameters = self.controller.storage.config().clone();
3602
3603 task::spawn(
3604 || format!("validate_alter_connection:{conn_id}"),
3605 async move {
3606 let resolved_ids = conn.resolved_ids.clone();
3607 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3608 let result = match connection.validate(id, ¤t_storage_parameters).await {
3609 Ok(()) => Ok(conn),
3610 Err(err) => Err(err.into()),
3611 };
3612
3613 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3615 AlterConnectionValidationReady {
3616 ctx,
3617 result,
3618 connection_id: id,
3619 connection_gid,
3620 plan_validity: PlanValidity::new(
3621 transient_revision,
3622 dependency_ids.clone(),
3623 None,
3624 None,
3625 role_metadata,
3626 ),
3627 otel_ctx,
3628 resolved_ids,
3629 },
3630 ));
3631 if let Err(e) = result {
3632 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3633 }
3634 },
3635 );
3636 } else {
3637 let result = self
3638 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3639 .await;
3640 ctx.retire(result);
3641 }
3642 }
3643
3644 #[instrument]
3645 pub(crate) async fn sequence_alter_connection_stage_finish(
3646 &mut self,
3647 session: &mut Session,
3648 id: CatalogItemId,
3649 connection: Connection,
3650 ) -> Result<ExecuteResponse, AdapterError> {
3651 match self.catalog.get_entry(&id).item() {
3652 CatalogItem::Connection(curr_conn) => {
3653 curr_conn
3654 .details
3655 .to_connection()
3656 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3657 .map_err(StorageError::from)?;
3658 }
3659 _ => unreachable!("known to be a connection"),
3660 };
3661
3662 let ops = vec![catalog::Op::UpdateItem {
3663 id,
3664 name: self.catalog.get_entry(&id).name().clone(),
3665 to_item: CatalogItem::Connection(connection.clone()),
3666 }];
3667
3668 self.catalog_transact(Some(session), ops).await?;
3669
3670 match connection.details {
3671 ConnectionDetails::AwsPrivatelink(ref privatelink) => {
3672 let spec = VpcEndpointConfig {
3673 aws_service_name: privatelink.service_name.to_owned(),
3674 availability_zone_ids: privatelink.availability_zones.to_owned(),
3675 };
3676 self.cloud_resource_controller
3677 .as_ref()
3678 .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
3679 .ensure_vpc_endpoint(id, spec)
3680 .await?;
3681 }
3682 _ => {}
3683 };
3684
3685 let entry = self.catalog().get_entry(&id);
3686
3687 let mut connections = VecDeque::new();
3688 connections.push_front(entry.id());
3689
3690 let mut source_connections = BTreeMap::new();
3691 let mut sink_connections = BTreeMap::new();
3692 let mut source_export_data_configs = BTreeMap::new();
3693
3694 while let Some(id) = connections.pop_front() {
3695 for id in self.catalog.get_entry(&id).used_by() {
3696 let entry = self.catalog.get_entry(id);
3697 match entry.item() {
3698 CatalogItem::Connection(_) => connections.push_back(*id),
3699 CatalogItem::Source(source) => {
3700 let desc = match &entry.source().expect("known to be source").data_source {
3701 DataSourceDesc::Ingestion { desc, .. }
3702 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3703 desc.clone().into_inline_connection(self.catalog().state())
3704 }
3705 _ => unreachable!("only ingestions reference connections"),
3706 };
3707
3708 source_connections.insert(source.global_id, desc.connection);
3709 }
3710 CatalogItem::Sink(sink) => {
3711 let export = entry.sink().expect("known to be sink");
3712 sink_connections.insert(
3713 sink.global_id,
3714 export
3715 .connection
3716 .clone()
3717 .into_inline_connection(self.catalog().state()),
3718 );
3719 }
3720 CatalogItem::Table(table) => {
3721 if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
3724 let data_config = export_data_config.clone();
3725 source_export_data_configs.insert(
3726 table.global_id_writes(),
3727 data_config.into_inline_connection(self.catalog().state()),
3728 );
3729 }
3730 }
3731 t => unreachable!("connection dependency not expected on {:?}", t),
3732 }
3733 }
3734 }
3735
3736 if !source_connections.is_empty() {
3737 self.controller
3738 .storage
3739 .alter_ingestion_connections(source_connections)
3740 .await
3741 .unwrap_or_terminate("cannot fail to alter ingestion connection");
3742 }
3743
3744 if !sink_connections.is_empty() {
3745 self.controller
3746 .storage
3747 .alter_export_connections(sink_connections)
3748 .await
3749 .unwrap_or_terminate("altering exports after txn must succeed");
3750 }
3751
3752 if !source_export_data_configs.is_empty() {
3753 self.controller
3754 .storage
3755 .alter_ingestion_export_data_configs(source_export_data_configs)
3756 .await
3757 .unwrap_or_terminate("altering source export data configs after txn must succeed");
3758 }
3759
3760 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3761 }
3762
3763 #[instrument]
3764 pub(super) async fn sequence_alter_source(
3765 &mut self,
3766 session: &Session,
3767 plan::AlterSourcePlan {
3768 item_id,
3769 ingestion_id,
3770 action,
3771 }: plan::AlterSourcePlan,
3772 ) -> Result<ExecuteResponse, AdapterError> {
3773 let cur_entry = self.catalog().get_entry(&item_id);
3774 let cur_source = cur_entry.source().expect("known to be source");
3775
3776 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3777 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3779 .expect("invalid create sql persisted to catalog")
3780 .into_element()
3781 .ast
3782 {
3783 Statement::CreateSource(stmt) => stmt,
3784 _ => unreachable!("proved type is source"),
3785 };
3786
3787 let catalog = coord.catalog().for_system_session();
3788
3789 mz_sql::names::resolve(&catalog, create_source_stmt)
3791 .map_err(|e| AdapterError::internal(err_cx, e))
3792 };
3793
3794 match action {
3795 plan::AlterSourceAction::AddSubsourceExports {
3796 subsources,
3797 options,
3798 } => {
3799 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3800
3801 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3802 text_columns: mut new_text_columns,
3803 exclude_columns: mut new_exclude_columns,
3804 ..
3805 } = options.try_into()?;
3806
3807 let (mut create_source_stmt, resolved_ids) =
3809 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3810
3811 let catalog = self.catalog();
3813 let curr_references: BTreeSet<_> = catalog
3814 .get_entry(&item_id)
3815 .used_by()
3816 .into_iter()
3817 .filter_map(|subsource| {
3818 catalog
3819 .get_entry(subsource)
3820 .subsource_details()
3821 .map(|(_id, reference, _details)| reference)
3822 })
3823 .collect();
3824
3825 let purification_err =
3828 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3829
3830 match &mut create_source_stmt.connection {
3834 CreateSourceConnection::Postgres {
3835 options: curr_options,
3836 ..
3837 } => {
3838 let mz_sql::plan::PgConfigOptionExtracted {
3839 mut text_columns, ..
3840 } = curr_options.clone().try_into()?;
3841
3842 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3845
3846 text_columns.retain(|column_qualified_reference| {
3848 mz_ore::soft_assert_eq_or_log!(
3849 column_qualified_reference.0.len(),
3850 4,
3851 "all TEXT COLUMNS values must be column-qualified references"
3852 );
3853 let mut table = column_qualified_reference.clone();
3854 table.0.truncate(3);
3855 curr_references.contains(&table)
3856 });
3857
3858 new_text_columns.extend(text_columns);
3860
3861 if !new_text_columns.is_empty() {
3863 new_text_columns.sort();
3864 let new_text_columns = new_text_columns
3865 .into_iter()
3866 .map(WithOptionValue::UnresolvedItemName)
3867 .collect();
3868
3869 curr_options.push(PgConfigOption {
3870 name: PgConfigOptionName::TextColumns,
3871 value: Some(WithOptionValue::Sequence(new_text_columns)),
3872 });
3873 }
3874 }
3875 CreateSourceConnection::MySql {
3876 options: curr_options,
3877 ..
3878 } => {
3879 let mz_sql::plan::MySqlConfigOptionExtracted {
3880 mut text_columns,
3881 mut exclude_columns,
3882 ..
3883 } = curr_options.clone().try_into()?;
3884
3885 curr_options.retain(|o| {
3888 !matches!(
3889 o.name,
3890 MySqlConfigOptionName::TextColumns
3891 | MySqlConfigOptionName::ExcludeColumns
3892 )
3893 });
3894
3895 let column_referenced =
3897 |column_qualified_reference: &UnresolvedItemName| {
3898 mz_ore::soft_assert_eq_or_log!(
3899 column_qualified_reference.0.len(),
3900 3,
3901 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3902 );
3903 let mut table = column_qualified_reference.clone();
3904 table.0.truncate(2);
3905 curr_references.contains(&table)
3906 };
3907 text_columns.retain(column_referenced);
3908 exclude_columns.retain(column_referenced);
3909
3910 new_text_columns.extend(text_columns);
3912 new_exclude_columns.extend(exclude_columns);
3913
3914 if !new_text_columns.is_empty() {
3916 new_text_columns.sort();
3917 let new_text_columns = new_text_columns
3918 .into_iter()
3919 .map(WithOptionValue::UnresolvedItemName)
3920 .collect();
3921
3922 curr_options.push(MySqlConfigOption {
3923 name: MySqlConfigOptionName::TextColumns,
3924 value: Some(WithOptionValue::Sequence(new_text_columns)),
3925 });
3926 }
3927 if !new_exclude_columns.is_empty() {
3929 new_exclude_columns.sort();
3930 let new_exclude_columns = new_exclude_columns
3931 .into_iter()
3932 .map(WithOptionValue::UnresolvedItemName)
3933 .collect();
3934
3935 curr_options.push(MySqlConfigOption {
3936 name: MySqlConfigOptionName::ExcludeColumns,
3937 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3938 });
3939 }
3940 }
3941 CreateSourceConnection::SqlServer {
3942 options: curr_options,
3943 ..
3944 } => {
3945 let mz_sql::plan::SqlServerConfigOptionExtracted {
3946 mut text_columns,
3947 mut exclude_columns,
3948 ..
3949 } = curr_options.clone().try_into()?;
3950
3951 curr_options.retain(|o| {
3954 !matches!(
3955 o.name,
3956 SqlServerConfigOptionName::TextColumns
3957 | SqlServerConfigOptionName::ExcludeColumns
3958 )
3959 });
3960
3961 let column_referenced =
3963 |column_qualified_reference: &UnresolvedItemName| {
3964 mz_ore::soft_assert_eq_or_log!(
3965 column_qualified_reference.0.len(),
3966 3,
3967 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3968 );
3969 let mut table = column_qualified_reference.clone();
3970 table.0.truncate(2);
3971 curr_references.contains(&table)
3972 };
3973 text_columns.retain(column_referenced);
3974 exclude_columns.retain(column_referenced);
3975
3976 new_text_columns.extend(text_columns);
3978 new_exclude_columns.extend(exclude_columns);
3979
3980 if !new_text_columns.is_empty() {
3982 new_text_columns.sort();
3983 let new_text_columns = new_text_columns
3984 .into_iter()
3985 .map(WithOptionValue::UnresolvedItemName)
3986 .collect();
3987
3988 curr_options.push(SqlServerConfigOption {
3989 name: SqlServerConfigOptionName::TextColumns,
3990 value: Some(WithOptionValue::Sequence(new_text_columns)),
3991 });
3992 }
3993 if !new_exclude_columns.is_empty() {
3995 new_exclude_columns.sort();
3996 let new_exclude_columns = new_exclude_columns
3997 .into_iter()
3998 .map(WithOptionValue::UnresolvedItemName)
3999 .collect();
4000
4001 curr_options.push(SqlServerConfigOption {
4002 name: SqlServerConfigOptionName::ExcludeColumns,
4003 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4004 });
4005 }
4006 }
4007 _ => return Err(purification_err()),
4008 };
4009
4010 let mut catalog = self.catalog().for_system_session();
4011 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4012
4013 let plan = match mz_sql::plan::plan(
4015 None,
4016 &catalog,
4017 Statement::CreateSource(create_source_stmt),
4018 &Params::empty(),
4019 &resolved_ids,
4020 )
4021 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4022 {
4023 Plan::CreateSource(plan) => plan,
4024 _ => unreachable!("create source plan is only valid response"),
4025 };
4026
4027 let source = Source::new(
4031 plan,
4032 cur_source.global_id,
4033 resolved_ids,
4034 cur_source.custom_logical_compaction_window,
4035 cur_source.is_retained_metrics_object,
4036 );
4037
4038 let desc = match &source.data_source {
4040 DataSourceDesc::Ingestion { desc, .. }
4041 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4042 desc.clone().into_inline_connection(self.catalog().state())
4043 }
4044 _ => unreachable!("already verified of type ingestion"),
4045 };
4046
4047 self.controller
4048 .storage
4049 .check_alter_ingestion_source_desc(ingestion_id, &desc)
4050 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4051
4052 let mut ops = vec![catalog::Op::UpdateItem {
4055 id: item_id,
4056 name: self.catalog.get_entry(&item_id).name().clone(),
4059 to_item: CatalogItem::Source(source),
4060 }];
4061
4062 let CreateSourceInner {
4063 ops: new_ops,
4064 sources: _,
4065 if_not_exists_ids,
4066 } = self.create_source_inner(session, subsources).await?;
4067
4068 ops.extend(new_ops.into_iter());
4069
4070 assert!(
4071 if_not_exists_ids.is_empty(),
4072 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4073 );
4074
4075 self.catalog_transact(Some(session), ops).await?;
4076 }
4077 plan::AlterSourceAction::RefreshReferences { references } => {
4078 self.catalog_transact(
4079 Some(session),
4080 vec![catalog::Op::UpdateSourceReferences {
4081 source_id: item_id,
4082 references: references.into(),
4083 }],
4084 )
4085 .await?;
4086 }
4087 }
4088
4089 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4090 }
4091
4092 #[instrument]
4093 pub(super) async fn sequence_alter_system_set(
4094 &mut self,
4095 session: &Session,
4096 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4097 ) -> Result<ExecuteResponse, AdapterError> {
4098 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4099 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4101 self.validate_alter_system_network_policy(session, &value)?;
4102 }
4103
4104 let op = match value {
4105 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4106 name: name.clone(),
4107 value: OwnedVarInput::SqlSet(values),
4108 },
4109 plan::VariableValue::Default => {
4110 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4111 }
4112 };
4113 self.catalog_transact(Some(session), vec![op]).await?;
4114
4115 session.add_notice(AdapterNotice::VarDefaultUpdated {
4116 role: None,
4117 var_name: Some(name),
4118 });
4119 Ok(ExecuteResponse::AlteredSystemConfiguration)
4120 }
4121
4122 #[instrument]
4123 pub(super) async fn sequence_alter_system_reset(
4124 &mut self,
4125 session: &Session,
4126 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4127 ) -> Result<ExecuteResponse, AdapterError> {
4128 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4129 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4130 self.catalog_transact(Some(session), vec![op]).await?;
4131 session.add_notice(AdapterNotice::VarDefaultUpdated {
4132 role: None,
4133 var_name: Some(name),
4134 });
4135 Ok(ExecuteResponse::AlteredSystemConfiguration)
4136 }
4137
4138 #[instrument]
4139 pub(super) async fn sequence_alter_system_reset_all(
4140 &mut self,
4141 session: &Session,
4142 _: plan::AlterSystemResetAllPlan,
4143 ) -> Result<ExecuteResponse, AdapterError> {
4144 self.is_user_allowed_to_alter_system(session, None)?;
4145 let op = catalog::Op::ResetAllSystemConfiguration;
4146 self.catalog_transact(Some(session), vec![op]).await?;
4147 session.add_notice(AdapterNotice::VarDefaultUpdated {
4148 role: None,
4149 var_name: None,
4150 });
4151 Ok(ExecuteResponse::AlteredSystemConfiguration)
4152 }
4153
4154 fn is_user_allowed_to_alter_system(
4156 &self,
4157 session: &Session,
4158 var_name: Option<&str>,
4159 ) -> Result<(), AdapterError> {
4160 match (session.user().kind(), var_name) {
4161 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4163 (UserKind::Superuser, Some(name))
4165 if session.user().is_internal()
4166 || self.catalog().system_config().user_modifiable(name) =>
4167 {
4168 let var = self.catalog().system_config().get(name)?;
4171 var.visible(session.user(), self.catalog().system_config())?;
4172 Ok(())
4173 }
4174 (UserKind::Regular, Some(name))
4177 if self.catalog().system_config().user_modifiable(name) =>
4178 {
4179 Err(AdapterError::Unauthorized(
4180 rbac::UnauthorizedError::Superuser {
4181 action: format!("toggle the '{name}' system configuration parameter"),
4182 },
4183 ))
4184 }
4185 _ => Err(AdapterError::Unauthorized(
4186 rbac::UnauthorizedError::MzSystem {
4187 action: "alter system".into(),
4188 },
4189 )),
4190 }
4191 }
4192
4193 fn validate_alter_system_network_policy(
4194 &self,
4195 session: &Session,
4196 policy_value: &plan::VariableValue,
4197 ) -> Result<(), AdapterError> {
4198 let policy_name = match &policy_value {
4199 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4201 plan::VariableValue::Values(values) if values.len() == 1 => {
4202 values.iter().next().cloned()
4203 }
4204 plan::VariableValue::Values(values) => {
4205 tracing::warn!(?values, "can't set multiple network policies at once");
4206 None
4207 }
4208 };
4209 let maybe_network_policy = policy_name
4210 .as_ref()
4211 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4212 let Some(network_policy) = maybe_network_policy else {
4213 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4214 VarError::InvalidParameterValue {
4215 name: NETWORK_POLICY.name(),
4216 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4217 reason: "no network policy with such name exists".to_string(),
4218 },
4219 )));
4220 };
4221 self.validate_alter_network_policy(session, &network_policy.rules)
4222 }
4223
4224 fn validate_alter_network_policy(
4229 &self,
4230 session: &Session,
4231 policy_rules: &Vec<NetworkPolicyRule>,
4232 ) -> Result<(), AdapterError> {
4233 if session.user().is_internal() {
4236 return Ok(());
4237 }
4238 if let Some(ip) = session.meta().client_ip() {
4239 validate_ip_with_policy_rules(ip, policy_rules)
4240 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4241 } else {
4242 return Err(AdapterError::NetworkPolicyDenied(
4245 NetworkPolicyError::MissingIp,
4246 ));
4247 }
4248 Ok(())
4249 }
4250
4251 #[instrument]
4253 pub(super) fn sequence_execute(
4254 &mut self,
4255 session: &mut Session,
4256 plan: plan::ExecutePlan,
4257 ) -> Result<String, AdapterError> {
4258 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4260 let ps = session
4261 .get_prepared_statement_unverified(&plan.name)
4262 .expect("known to exist");
4263 let stmt = ps.stmt().cloned();
4264 let desc = ps.desc().clone();
4265 let state_revision = ps.state_revision;
4266 let logging = Arc::clone(ps.logging());
4267 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4268 }
4269
4270 #[instrument]
4271 pub(super) async fn sequence_grant_privileges(
4272 &mut self,
4273 session: &Session,
4274 plan::GrantPrivilegesPlan {
4275 update_privileges,
4276 grantees,
4277 }: plan::GrantPrivilegesPlan,
4278 ) -> Result<ExecuteResponse, AdapterError> {
4279 self.sequence_update_privileges(
4280 session,
4281 update_privileges,
4282 grantees,
4283 UpdatePrivilegeVariant::Grant,
4284 )
4285 .await
4286 }
4287
4288 #[instrument]
4289 pub(super) async fn sequence_revoke_privileges(
4290 &mut self,
4291 session: &Session,
4292 plan::RevokePrivilegesPlan {
4293 update_privileges,
4294 revokees,
4295 }: plan::RevokePrivilegesPlan,
4296 ) -> Result<ExecuteResponse, AdapterError> {
4297 self.sequence_update_privileges(
4298 session,
4299 update_privileges,
4300 revokees,
4301 UpdatePrivilegeVariant::Revoke,
4302 )
4303 .await
4304 }
4305
4306 #[instrument]
4307 async fn sequence_update_privileges(
4308 &mut self,
4309 session: &Session,
4310 update_privileges: Vec<UpdatePrivilege>,
4311 grantees: Vec<RoleId>,
4312 variant: UpdatePrivilegeVariant,
4313 ) -> Result<ExecuteResponse, AdapterError> {
4314 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4315 let mut warnings = Vec::new();
4316 let catalog = self.catalog().for_session(session);
4317
4318 for UpdatePrivilege {
4319 acl_mode,
4320 target_id,
4321 grantor,
4322 } in update_privileges
4323 {
4324 let actual_object_type = catalog.get_system_object_type(&target_id);
4325 if actual_object_type.is_relation() {
4328 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4329 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4330 if !non_applicable_privileges.is_empty() {
4331 let object_description =
4332 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4333 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4334 non_applicable_privileges,
4335 object_description,
4336 })
4337 }
4338 }
4339
4340 if let SystemObjectId::Object(object_id) = &target_id {
4341 self.catalog()
4342 .ensure_not_reserved_object(object_id, session.conn_id())?;
4343 }
4344
4345 let privileges = self
4346 .catalog()
4347 .get_privileges(&target_id, session.conn_id())
4348 .ok_or(AdapterError::Unsupported(
4351 "GRANTs/REVOKEs on an object type with no privileges",
4352 ))?;
4353
4354 for grantee in &grantees {
4355 self.catalog().ensure_not_system_role(grantee)?;
4356 self.catalog().ensure_not_predefined_role(grantee)?;
4357 let existing_privilege = privileges
4358 .get_acl_item(grantee, &grantor)
4359 .map(Cow::Borrowed)
4360 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4361
4362 match variant {
4363 UpdatePrivilegeVariant::Grant
4364 if !existing_privilege.acl_mode.contains(acl_mode) =>
4365 {
4366 ops.push(catalog::Op::UpdatePrivilege {
4367 target_id: target_id.clone(),
4368 privilege: MzAclItem {
4369 grantee: *grantee,
4370 grantor,
4371 acl_mode,
4372 },
4373 variant,
4374 });
4375 }
4376 UpdatePrivilegeVariant::Revoke
4377 if !existing_privilege
4378 .acl_mode
4379 .intersection(acl_mode)
4380 .is_empty() =>
4381 {
4382 ops.push(catalog::Op::UpdatePrivilege {
4383 target_id: target_id.clone(),
4384 privilege: MzAclItem {
4385 grantee: *grantee,
4386 grantor,
4387 acl_mode,
4388 },
4389 variant,
4390 });
4391 }
4392 _ => {}
4394 }
4395 }
4396 }
4397
4398 if ops.is_empty() {
4399 session.add_notices(warnings);
4400 return Ok(variant.into());
4401 }
4402
4403 let res = self
4404 .catalog_transact(Some(session), ops)
4405 .await
4406 .map(|_| match variant {
4407 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4408 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4409 });
4410 if res.is_ok() {
4411 session.add_notices(warnings);
4412 }
4413 res
4414 }
4415
4416 #[instrument]
4417 pub(super) async fn sequence_alter_default_privileges(
4418 &mut self,
4419 session: &Session,
4420 plan::AlterDefaultPrivilegesPlan {
4421 privilege_objects,
4422 privilege_acl_items,
4423 is_grant,
4424 }: plan::AlterDefaultPrivilegesPlan,
4425 ) -> Result<ExecuteResponse, AdapterError> {
4426 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4427 let variant = if is_grant {
4428 UpdatePrivilegeVariant::Grant
4429 } else {
4430 UpdatePrivilegeVariant::Revoke
4431 };
4432 for privilege_object in &privilege_objects {
4433 self.catalog()
4434 .ensure_not_system_role(&privilege_object.role_id)?;
4435 self.catalog()
4436 .ensure_not_predefined_role(&privilege_object.role_id)?;
4437 if let Some(database_id) = privilege_object.database_id {
4438 self.catalog()
4439 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4440 }
4441 if let Some(schema_id) = privilege_object.schema_id {
4442 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4443 let schema_spec: SchemaSpecifier = schema_id.into();
4444
4445 self.catalog().ensure_not_reserved_object(
4446 &(database_spec, schema_spec).into(),
4447 session.conn_id(),
4448 )?;
4449 }
4450 for privilege_acl_item in &privilege_acl_items {
4451 self.catalog()
4452 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4453 self.catalog()
4454 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4455 ops.push(catalog::Op::UpdateDefaultPrivilege {
4456 privilege_object: privilege_object.clone(),
4457 privilege_acl_item: privilege_acl_item.clone(),
4458 variant,
4459 })
4460 }
4461 }
4462
4463 self.catalog_transact(Some(session), ops).await?;
4464 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4465 }
4466
4467 #[instrument]
4468 pub(super) async fn sequence_grant_role(
4469 &mut self,
4470 session: &Session,
4471 plan::GrantRolePlan {
4472 role_ids,
4473 member_ids,
4474 grantor_id,
4475 }: plan::GrantRolePlan,
4476 ) -> Result<ExecuteResponse, AdapterError> {
4477 let catalog = self.catalog();
4478 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4479 for role_id in role_ids {
4480 for member_id in &member_ids {
4481 let member_membership: BTreeSet<_> =
4482 catalog.get_role(member_id).membership().keys().collect();
4483 if member_membership.contains(&role_id) {
4484 let role_name = catalog.get_role(&role_id).name().to_string();
4485 let member_name = catalog.get_role(member_id).name().to_string();
4486 catalog.ensure_not_reserved_role(member_id)?;
4488 catalog.ensure_grantable_role(&role_id)?;
4489 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4490 role_name,
4491 member_name,
4492 });
4493 } else {
4494 ops.push(catalog::Op::GrantRole {
4495 role_id,
4496 member_id: *member_id,
4497 grantor_id,
4498 });
4499 }
4500 }
4501 }
4502
4503 if ops.is_empty() {
4504 return Ok(ExecuteResponse::GrantedRole);
4505 }
4506
4507 self.catalog_transact(Some(session), ops)
4508 .await
4509 .map(|_| ExecuteResponse::GrantedRole)
4510 }
4511
4512 #[instrument]
4513 pub(super) async fn sequence_revoke_role(
4514 &mut self,
4515 session: &Session,
4516 plan::RevokeRolePlan {
4517 role_ids,
4518 member_ids,
4519 grantor_id,
4520 }: plan::RevokeRolePlan,
4521 ) -> Result<ExecuteResponse, AdapterError> {
4522 let catalog = self.catalog();
4523 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4524 for role_id in role_ids {
4525 for member_id in &member_ids {
4526 let member_membership: BTreeSet<_> =
4527 catalog.get_role(member_id).membership().keys().collect();
4528 if !member_membership.contains(&role_id) {
4529 let role_name = catalog.get_role(&role_id).name().to_string();
4530 let member_name = catalog.get_role(member_id).name().to_string();
4531 catalog.ensure_not_reserved_role(member_id)?;
4533 catalog.ensure_grantable_role(&role_id)?;
4534 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4535 role_name,
4536 member_name,
4537 });
4538 } else {
4539 ops.push(catalog::Op::RevokeRole {
4540 role_id,
4541 member_id: *member_id,
4542 grantor_id,
4543 });
4544 }
4545 }
4546 }
4547
4548 if ops.is_empty() {
4549 return Ok(ExecuteResponse::RevokedRole);
4550 }
4551
4552 self.catalog_transact(Some(session), ops)
4553 .await
4554 .map(|_| ExecuteResponse::RevokedRole)
4555 }
4556
4557 #[instrument]
4558 pub(super) async fn sequence_alter_owner(
4559 &mut self,
4560 session: &Session,
4561 plan::AlterOwnerPlan {
4562 id,
4563 object_type,
4564 new_owner,
4565 }: plan::AlterOwnerPlan,
4566 ) -> Result<ExecuteResponse, AdapterError> {
4567 let mut ops = vec![catalog::Op::UpdateOwner {
4568 id: id.clone(),
4569 new_owner,
4570 }];
4571
4572 match &id {
4573 ObjectId::Item(global_id) => {
4574 let entry = self.catalog().get_entry(global_id);
4575
4576 if entry.is_index() {
4578 let name = self
4579 .catalog()
4580 .resolve_full_name(entry.name(), Some(session.conn_id()))
4581 .to_string();
4582 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4583 return Ok(ExecuteResponse::AlteredObject(object_type));
4584 }
4585
4586 let dependent_index_ops = entry
4588 .used_by()
4589 .into_iter()
4590 .filter(|id| self.catalog().get_entry(id).is_index())
4591 .map(|id| catalog::Op::UpdateOwner {
4592 id: ObjectId::Item(*id),
4593 new_owner,
4594 });
4595 ops.extend(dependent_index_ops);
4596
4597 let dependent_subsources =
4599 entry
4600 .progress_id()
4601 .into_iter()
4602 .map(|item_id| catalog::Op::UpdateOwner {
4603 id: ObjectId::Item(item_id),
4604 new_owner,
4605 });
4606 ops.extend(dependent_subsources);
4607 }
4608 ObjectId::Cluster(cluster_id) => {
4609 let cluster = self.catalog().get_cluster(*cluster_id);
4610 let managed_cluster_replica_ops =
4612 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4613 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4614 new_owner,
4615 });
4616 ops.extend(managed_cluster_replica_ops);
4617 }
4618 _ => {}
4619 }
4620
4621 self.catalog_transact(Some(session), ops)
4622 .await
4623 .map(|_| ExecuteResponse::AlteredObject(object_type))
4624 }
4625
4626 #[instrument]
4627 pub(super) async fn sequence_reassign_owned(
4628 &mut self,
4629 session: &Session,
4630 plan::ReassignOwnedPlan {
4631 old_roles,
4632 new_role,
4633 reassign_ids,
4634 }: plan::ReassignOwnedPlan,
4635 ) -> Result<ExecuteResponse, AdapterError> {
4636 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4637 self.catalog().ensure_not_reserved_role(role_id)?;
4638 }
4639
4640 let ops = reassign_ids
4641 .into_iter()
4642 .map(|id| catalog::Op::UpdateOwner {
4643 id,
4644 new_owner: new_role,
4645 })
4646 .collect();
4647
4648 self.catalog_transact(Some(session), ops)
4649 .await
4650 .map(|_| ExecuteResponse::ReassignOwned)
4651 }
4652
4653 #[instrument]
4654 pub(crate) async fn handle_deferred_statement(&mut self) {
4655 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4659 return;
4660 };
4661 match ps {
4662 crate::coord::PlanStatement::Statement { stmt, params } => {
4663 self.handle_execute_inner(stmt, params, ctx).await;
4664 }
4665 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4666 self.sequence_plan(ctx, plan, resolved_ids).await;
4667 }
4668 }
4669 }
4670
4671 #[instrument]
4672 #[allow(clippy::unused_async)]
4674 pub(super) async fn sequence_alter_table(
4675 &mut self,
4676 ctx: &mut ExecuteContext,
4677 plan: plan::AlterTablePlan,
4678 ) -> Result<ExecuteResponse, AdapterError> {
4679 let plan::AlterTablePlan {
4680 relation_id,
4681 column_name,
4682 column_type,
4683 raw_sql_type,
4684 } = plan;
4685
4686 let id_ts = self.get_catalog_write_ts().await;
4688 let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4689 let ops = vec![catalog::Op::AlterAddColumn {
4690 id: relation_id,
4691 new_global_id,
4692 name: column_name,
4693 typ: column_type,
4694 sql: raw_sql_type,
4695 }];
4696
4697 self.catalog_transact_with_context(None, Some(ctx), ops)
4698 .await?;
4699
4700 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4701 }
4702
4703 #[instrument]
4704 pub(super) async fn sequence_alter_materialized_view_apply_replacement(
4705 &mut self,
4706 ctx: &mut ExecuteContext,
4707 plan: AlterMaterializedViewApplyReplacementPlan,
4708 ) -> Result<ExecuteResponse, AdapterError> {
4709 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
4710
4711 let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4715 self.catalog_transact(Some(ctx.session()), ops).await?;
4716
4717 Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
4718 }
4719
4720 pub(super) async fn statistics_oracle(
4721 &self,
4722 session: &Session,
4723 source_ids: &BTreeSet<GlobalId>,
4724 query_as_of: &Antichain<Timestamp>,
4725 is_oneshot: bool,
4726 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4727 super::statistics_oracle(
4728 session,
4729 source_ids,
4730 query_as_of,
4731 is_oneshot,
4732 self.catalog().system_config(),
4733 self.controller.storage_collections.as_ref(),
4734 )
4735 .await
4736 }
4737}
4738
4739impl Coordinator {
4740 async fn process_dataflow_metainfo(
4742 &mut self,
4743 df_meta: DataflowMetainfo,
4744 export_id: GlobalId,
4745 ctx: Option<&mut ExecuteContext>,
4746 notice_ids: Vec<GlobalId>,
4747 ) -> Option<BuiltinTableAppendNotify> {
4748 if let Some(ctx) = ctx {
4750 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4751 }
4752
4753 let df_meta = self
4755 .catalog()
4756 .render_notices(df_meta, notice_ids, Some(export_id));
4757
4758 if self.catalog().state().system_config().enable_mz_notices()
4761 && !df_meta.optimizer_notices.is_empty()
4762 {
4763 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4764 self.catalog().state().pack_optimizer_notices(
4765 &mut builtin_table_updates,
4766 df_meta.optimizer_notices.iter(),
4767 Diff::ONE,
4768 );
4769
4770 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4772
4773 Some(
4774 self.builtin_table_update()
4775 .execute(builtin_table_updates)
4776 .await
4777 .0,
4778 )
4779 } else {
4780 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4782
4783 None
4784 }
4785 }
4786}