1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::task::{self, JoinHandle, spawn};
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{assert_none, instrument};
36use mz_repr::adt::jsonb::Jsonb;
37use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
38use mz_repr::explain::ExprHumanizer;
39use mz_repr::explain::json::json_string;
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
43 RowIterator, Timestamp,
44};
45use mz_sql::ast::{
46 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
47 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
48 SqlServerConfigOptionName,
49};
50use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
51use mz_sql::catalog::{
52 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
53 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
54 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
55};
56use mz_sql::names::{
57 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
58 SchemaSpecifier, SystemObjectId,
59};
60use mz_sql::plan::{
61 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
62 StatementContext,
63};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use smallvec::SmallVec;
93use timely::progress::Antichain;
94use tokio::sync::{oneshot, watch};
95use tracing::{Instrument, Span, info, warn};
96
97use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
98use crate::command::{ExecuteResponse, Response};
99use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
100use crate::coord::sequencer::emit_optimizer_notices;
101use crate::coord::{
102 AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
103 Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
104 ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
105 PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
106 WatchSetResponse, validate_ip_with_policy_rules,
107};
108use crate::error::AdapterError;
109use crate::notice::{AdapterNotice, DroppedInUseIndex};
110use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
111use crate::optimize::{self, Optimize};
112use crate::session::{
113 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
114 WriteLocks, WriteOp,
115};
116use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
117use crate::{PeekResponseUnary, ReadHolds};
118
/// A boxed, `'static` future that resolves to either a [`Timestamp`] or a
/// [`StorageError`].
// NOTE(review): "Rtr" presumably abbreviates "real-time recency" — confirm against the users of
// this alias, which are outside the visible part of the file.
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>;
121
122mod cluster;
123mod copy_from;
124mod create_continual_task;
125mod create_index;
126mod create_materialized_view;
127mod create_view;
128mod explain_timestamp;
129mod peek;
130mod secret;
131mod subscribe;
132
/// Unwraps a `Result`, or retires the context with the error and returns.
///
/// When `$expr` is `Ok(value)` the macro evaluates to `value`. When it is
/// `Err(error)` the error is converted with `Into`, handed to
/// `$ctx.retire(Err(..))`, and whatever `retire` yields is returned from the
/// enclosing function.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Err(error) => return $ctx.retire(Err(error.into())),
            Ok(value) => value,
        }
    };
}
143
144pub(super) use return_if_err;
145
/// A bundle of catalog operations together with flags describing what a drop
/// touched.
// NOTE(review): the producer and consumers of this struct are outside the visible part of the
// file; the field notes below are inferred from names and types — confirm against the call sites.
struct DropOps {
    /// Catalog operations to apply.
    ops: Vec<catalog::Op>,
    /// Presumably set when the session's active database is among the dropped objects — confirm.
    dropped_active_db: bool,
    /// Presumably set when the session's active cluster is among the dropped objects — confirm.
    dropped_active_cluster: bool,
    /// Indexes reported as dropped while in use (see [`DroppedInUseIndex`]).
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
152
/// The values produced by `Coordinator::create_source_inner`.
struct CreateSourceInner {
    /// Catalog operations that create the sources (and update their source references).
    ops: Vec<catalog::Op>,
    /// Every source that `ops` creates, paired with its catalog item ID.
    sources: Vec<(CatalogItemId, Source)>,
    /// Item ID and name of each plan that was created with `if_not_exists` set.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
159
160impl Coordinator {
    /// Drives a [`Staged`] state machine for `ctx`, one stage at a time.
    ///
    /// The stage's validity is first checked against the current catalog; if
    /// that fails, `ctx` is retired with the error. After that, each iteration
    /// runs `stage.stage(..)` inside `parent_span` and acts on the returned
    /// [`StageResult`]:
    ///
    /// * `Immediate` — continue the loop with the contained stage.
    /// * `Response` — retire `ctx` with the response and return.
    /// * `Handle` / `HandleRetire` — pass the task handle to `handle_spawn`
    ///   and return; the spawned task either sends the next stage's message
    ///   on `internal_cmd_tx` or retires `ctx` with the task's result.
    ///
    /// Any stage error retires `ctx` with that error.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Install a fresh cancellation channel for this connection. `insert` hands
                    // back the previous channel, if any; if that one had already been flipped
                    // to `true`, retire as canceled instead of running the stage.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // This stage does not allow cancellation: drop any channel left behind so
                    // that `handle_spawn` cannot pick up a stale one for this connection.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                // Without a session there is no connection ID to key a cancellation channel on.
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        // A send error is deliberately ignored here.
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }
226
227 fn handle_spawn<C, T, F>(
228 &self,
229 ctx: C,
230 handle: JoinHandle<Result<T, AdapterError>>,
231 cancel_enabled: bool,
232 f: F,
233 ) where
234 C: StagedContext + Send + 'static,
235 T: Send + 'static,
236 F: FnOnce(C, T) + Send + 'static,
237 {
238 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
239 .session()
240 .and_then(|session| self.staged_cancellation.get(session.conn_id()))
241 {
242 let mut rx = rx.clone();
243 Box::pin(async move {
244 let _ = rx.wait_for(|v| *v).await;
246 ()
247 })
248 } else {
249 Box::pin(future::pending())
250 };
251 spawn(|| "sequence_staged", async move {
252 tokio::select! {
253 res = handle => {
254 let next = return_if_err!(res, ctx);
255 f(ctx, next);
256 }
257 _ = rx, if cancel_enabled => {
258 ctx.retire(Err(AdapterError::Canceled));
259 }
260 }
261 });
262 }
263
264 async fn create_source_inner(
265 &self,
266 session: &Session,
267 plans: Vec<plan::CreateSourcePlanBundle>,
268 ) -> Result<CreateSourceInner, AdapterError> {
269 let mut ops = vec![];
270 let mut sources = vec![];
271
272 let if_not_exists_ids = plans
273 .iter()
274 .filter_map(
275 |plan::CreateSourcePlanBundle {
276 item_id,
277 global_id: _,
278 plan,
279 resolved_ids: _,
280 available_source_references: _,
281 }| {
282 if plan.if_not_exists {
283 Some((*item_id, plan.name.clone()))
284 } else {
285 None
286 }
287 },
288 )
289 .collect::<BTreeMap<_, _>>();
290
291 for plan::CreateSourcePlanBundle {
292 item_id,
293 global_id,
294 mut plan,
295 resolved_ids,
296 available_source_references,
297 } in plans
298 {
299 let name = plan.name.clone();
300
301 match plan.source.data_source {
302 plan::DataSourceDesc::Ingestion(ref desc)
303 | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
304 let cluster_id = plan
305 .in_cluster
306 .expect("ingestion plans must specify cluster");
307 match desc.connection {
308 GenericSourceConnection::Postgres(_)
309 | GenericSourceConnection::MySql(_)
310 | GenericSourceConnection::SqlServer(_)
311 | GenericSourceConnection::Kafka(_)
312 | GenericSourceConnection::LoadGenerator(_) => {
313 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
314 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
315 .get(self.catalog().system_config().dyncfgs());
316
317 if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
318 {
319 return Err(AdapterError::Unsupported(
320 "sources in clusters with >1 replicas",
321 ));
322 }
323 }
324 }
325 }
326 }
327 plan::DataSourceDesc::Webhook { .. } => {
328 let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
329 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
330 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
331 .get(self.catalog().system_config().dyncfgs());
332
333 if !enable_multi_replica_sources {
334 if cluster.replica_ids().len() > 1 {
335 return Err(AdapterError::Unsupported(
336 "webhook sources in clusters with >1 replicas",
337 ));
338 }
339 }
340 }
341 }
342 plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
343 }
344
345 if let mz_sql::plan::DataSourceDesc::Webhook {
347 validate_using: Some(validate),
348 ..
349 } = &mut plan.source.data_source
350 {
351 if let Err(reason) = validate.reduce_expression().await {
352 self.metrics
353 .webhook_validation_reduce_failures
354 .with_label_values(&[reason])
355 .inc();
356 return Err(AdapterError::Internal(format!(
357 "failed to reduce check expression, {reason}"
358 )));
359 }
360 }
361
362 let mut reference_ops = vec![];
365 if let Some(references) = &available_source_references {
366 reference_ops.push(catalog::Op::UpdateSourceReferences {
367 source_id: item_id,
368 references: references.clone().into(),
369 });
370 }
371
372 let source = Source::new(plan, global_id, resolved_ids, None, false);
373 ops.push(catalog::Op::CreateItem {
374 id: item_id,
375 name,
376 item: CatalogItem::Source(source.clone()),
377 owner_id: *session.current_role_id(),
378 });
379 sources.push((item_id, source));
380 ops.extend(reference_ops);
382 }
383
384 Ok(CreateSourceInner {
385 ops,
386 sources,
387 if_not_exists_ids,
388 })
389 }
390
391 pub(crate) fn plan_subsource(
399 &self,
400 session: &Session,
401 params: &mz_sql::plan::Params,
402 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
403 item_id: CatalogItemId,
404 global_id: GlobalId,
405 ) -> Result<CreateSourcePlanBundle, AdapterError> {
406 let catalog = self.catalog().for_session(session);
407 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
408
409 let plan = self.plan_statement(
410 session,
411 Statement::CreateSubsource(subsource_stmt),
412 params,
413 &resolved_ids,
414 )?;
415 let plan = match plan {
416 Plan::CreateSource(plan) => plan,
417 _ => unreachable!(),
418 };
419 Ok(CreateSourcePlanBundle {
420 item_id,
421 global_id,
422 plan,
423 resolved_ids,
424 available_source_references: None,
425 })
426 }
427
428 pub(crate) async fn plan_purified_alter_source_add_subsource(
430 &mut self,
431 session: &Session,
432 params: Params,
433 source_name: ResolvedItemName,
434 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
435 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
436 ) -> Result<(Plan, ResolvedIds), AdapterError> {
437 let mut subsource_plans = Vec::with_capacity(subsources.len());
438
439 let conn_catalog = self.catalog().for_system_session();
441 let pcx = plan::PlanContext::zero();
442 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
443
444 let entry = self.catalog().get_entry(source_name.item_id());
445 let source = entry.source().ok_or_else(|| {
446 AdapterError::internal(
447 "plan alter source",
448 format!("expected Source found {entry:?}"),
449 )
450 })?;
451
452 let item_id = entry.id();
453 let ingestion_id = source.global_id();
454 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
455
456 let id_ts = self.get_catalog_write_ts().await;
457 let ids = self
458 .catalog()
459 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
460 .await?;
461 for (subsource_stmt, (item_id, global_id)) in
462 subsource_stmts.into_iter().zip_eq(ids.into_iter())
463 {
464 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
465 subsource_plans.push(s);
466 }
467
468 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
469 subsources: subsource_plans,
470 options,
471 };
472
473 Ok((
474 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
475 item_id,
476 ingestion_id,
477 action,
478 }),
479 ResolvedIds::empty(),
480 ))
481 }
482
483 pub(crate) fn plan_purified_alter_source_refresh_references(
485 &self,
486 _session: &Session,
487 _params: Params,
488 source_name: ResolvedItemName,
489 available_source_references: plan::SourceReferences,
490 ) -> Result<(Plan, ResolvedIds), AdapterError> {
491 let entry = self.catalog().get_entry(source_name.item_id());
492 let source = entry.source().ok_or_else(|| {
493 AdapterError::internal(
494 "plan alter source",
495 format!("expected Source found {entry:?}"),
496 )
497 })?;
498 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
499 references: available_source_references,
500 };
501
502 Ok((
503 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
504 item_id: entry.id(),
505 ingestion_id: source.global_id(),
506 action,
507 }),
508 ResolvedIds::empty(),
509 ))
510 }
511
512 pub(crate) async fn plan_purified_create_source(
516 &mut self,
517 ctx: &ExecuteContext,
518 params: Params,
519 progress_stmt: Option<CreateSubsourceStatement<Aug>>,
520 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
521 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
522 available_source_references: plan::SourceReferences,
523 ) -> Result<(Plan, ResolvedIds), AdapterError> {
524 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
525
526 if let Some(progress_stmt) = progress_stmt {
528 assert_none!(progress_stmt.of_source);
533 let id_ts = self.get_catalog_write_ts().await;
534 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
535 let progress_plan =
536 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
537 let progress_full_name = self
538 .catalog()
539 .resolve_full_name(&progress_plan.plan.name, None);
540 let progress_subsource = ResolvedItemName::Item {
541 id: progress_plan.item_id,
542 qualifiers: progress_plan.plan.name.qualifiers.clone(),
543 full_name: progress_full_name,
544 print_id: true,
545 version: RelationVersionSelector::Latest,
546 };
547
548 create_source_plans.push(progress_plan);
549
550 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
551 }
552
553 let catalog = self.catalog().for_session(ctx.session());
554 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
555
556 let propagated_with_options: Vec<_> = source_stmt
557 .with_options
558 .iter()
559 .filter_map(|opt| match opt.name {
560 CreateSourceOptionName::TimestampInterval => None,
561 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
562 name: CreateSubsourceOptionName::RetainHistory,
563 value: opt.value.clone(),
564 }),
565 })
566 .collect();
567
568 let source_plan = match self.plan_statement(
570 ctx.session(),
571 Statement::CreateSource(source_stmt),
572 ¶ms,
573 &resolved_ids,
574 )? {
575 Plan::CreateSource(plan) => plan,
576 p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
577 };
578
579 let id_ts = self.get_catalog_write_ts().await;
580 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
581
582 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
583 let of_source = ResolvedItemName::Item {
584 id: item_id,
585 qualifiers: source_plan.name.qualifiers.clone(),
586 full_name: source_full_name,
587 print_id: true,
588 version: RelationVersionSelector::Latest,
589 };
590
591 let conn_catalog = self.catalog().for_system_session();
593 let pcx = plan::PlanContext::zero();
594 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
595
596 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
597
598 for subsource_stmt in subsource_stmts.iter_mut() {
599 subsource_stmt
600 .with_options
601 .extend(propagated_with_options.iter().cloned())
602 }
603
604 create_source_plans.push(CreateSourcePlanBundle {
605 item_id,
606 global_id,
607 plan: source_plan,
608 resolved_ids: resolved_ids.clone(),
609 available_source_references: Some(available_source_references),
610 });
611
612 let id_ts = self.get_catalog_write_ts().await;
614 let ids = self
615 .catalog()
616 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
617 .await?;
618 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
619 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
620 create_source_plans.push(plan);
621 }
622
623 Ok((
624 Plan::CreateSources(create_source_plans),
625 ResolvedIds::empty(),
626 ))
627 }
628
629 #[instrument]
630 pub(super) async fn sequence_create_source(
631 &mut self,
632 ctx: &mut ExecuteContext,
633 plans: Vec<plan::CreateSourcePlanBundle>,
634 ) -> Result<ExecuteResponse, AdapterError> {
635 let CreateSourceInner {
636 ops,
637 sources,
638 if_not_exists_ids,
639 } = self.create_source_inner(ctx.session(), plans).await?;
640
641 let transact_result = self
642 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
643 .await;
644
645 for (item_id, source) in &sources {
647 if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
648 if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
649 ctx.session()
650 .add_notice(AdapterNotice::WebhookSourceCreated { url });
651 }
652 }
653 }
654
655 match transact_result {
656 Ok(()) => Ok(ExecuteResponse::CreatedSource),
657 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
658 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
659 })) if if_not_exists_ids.contains_key(&id) => {
660 ctx.session()
661 .add_notice(AdapterNotice::ObjectAlreadyExists {
662 name: if_not_exists_ids[&id].item.clone(),
663 ty: "source",
664 });
665 Ok(ExecuteResponse::CreatedSource)
666 }
667 Err(err) => Err(err),
668 }
669 }
670
671 #[instrument]
672 pub(super) async fn sequence_create_connection(
673 &mut self,
674 mut ctx: ExecuteContext,
675 plan: plan::CreateConnectionPlan,
676 resolved_ids: ResolvedIds,
677 ) {
678 let id_ts = self.get_catalog_write_ts().await;
679 let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
680 Ok(item_id) => item_id,
681 Err(err) => return ctx.retire(Err(err.into())),
682 };
683
684 match &plan.connection.details {
685 ConnectionDetails::Ssh { key_1, key_2, .. } => {
686 let key_1 = match key_1.as_key_pair() {
687 Some(key_1) => key_1.clone(),
688 None => {
689 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
690 "the PUBLIC KEY 1 option cannot be explicitly specified"
691 ))));
692 }
693 };
694
695 let key_2 = match key_2.as_key_pair() {
696 Some(key_2) => key_2.clone(),
697 None => {
698 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
699 "the PUBLIC KEY 2 option cannot be explicitly specified"
700 ))));
701 }
702 };
703
704 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
705 let secret = key_set.to_bytes();
706 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
707 return ctx.retire(Err(err.into()));
708 }
709 }
710 _ => (),
711 };
712
713 if plan.validate {
714 let internal_cmd_tx = self.internal_cmd_tx.clone();
715 let transient_revision = self.catalog().transient_revision();
716 let conn_id = ctx.session().conn_id().clone();
717 let otel_ctx = OpenTelemetryContext::obtain();
718 let role_metadata = ctx.session().role_metadata().clone();
719
720 let connection = plan
721 .connection
722 .details
723 .to_connection()
724 .into_inline_connection(self.catalog().state());
725
726 let current_storage_parameters = self.controller.storage.config().clone();
727 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
728 let result = match connection
729 .validate(connection_id, ¤t_storage_parameters)
730 .await
731 {
732 Ok(()) => Ok(plan),
733 Err(err) => Err(err.into()),
734 };
735
736 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
738 CreateConnectionValidationReady {
739 ctx,
740 result,
741 connection_id,
742 connection_gid,
743 plan_validity: PlanValidity::new(
744 transient_revision,
745 resolved_ids.items().copied().collect(),
746 None,
747 None,
748 role_metadata,
749 ),
750 otel_ctx,
751 resolved_ids: resolved_ids.clone(),
752 },
753 ));
754 if let Err(e) = result {
755 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
756 }
757 });
758 } else {
759 let result = self
760 .sequence_create_connection_stage_finish(
761 &mut ctx,
762 connection_id,
763 connection_gid,
764 plan,
765 resolved_ids,
766 )
767 .await;
768 ctx.retire(result);
769 }
770 }
771
772 #[instrument]
773 pub(crate) async fn sequence_create_connection_stage_finish(
774 &mut self,
775 ctx: &mut ExecuteContext,
776 connection_id: CatalogItemId,
777 connection_gid: GlobalId,
778 plan: plan::CreateConnectionPlan,
779 resolved_ids: ResolvedIds,
780 ) -> Result<ExecuteResponse, AdapterError> {
781 let ops = vec![catalog::Op::CreateItem {
782 id: connection_id,
783 name: plan.name.clone(),
784 item: CatalogItem::Connection(Connection {
785 create_sql: plan.connection.create_sql,
786 global_id: connection_gid,
787 details: plan.connection.details.clone(),
788 resolved_ids,
789 }),
790 owner_id: *ctx.session().current_role_id(),
791 }];
792
793 let conn_id = ctx.session().conn_id().clone();
796 let transact_result = self
797 .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
798 .await;
799
800 match transact_result {
801 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
802 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
803 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
804 })) if plan.if_not_exists => {
805 ctx.session()
806 .add_notice(AdapterNotice::ObjectAlreadyExists {
807 name: plan.name.item,
808 ty: "connection",
809 });
810 Ok(ExecuteResponse::CreatedConnection)
811 }
812 Err(err) => Err(err),
813 }
814 }
815
816 #[instrument]
817 pub(super) async fn sequence_create_database(
818 &mut self,
819 session: &Session,
820 plan: plan::CreateDatabasePlan,
821 ) -> Result<ExecuteResponse, AdapterError> {
822 let ops = vec![catalog::Op::CreateDatabase {
823 name: plan.name.clone(),
824 owner_id: *session.current_role_id(),
825 }];
826 match self.catalog_transact(Some(session), ops).await {
827 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
828 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
829 kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
830 })) if plan.if_not_exists => {
831 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
832 Ok(ExecuteResponse::CreatedDatabase)
833 }
834 Err(err) => Err(err),
835 }
836 }
837
838 #[instrument]
839 pub(super) async fn sequence_create_schema(
840 &mut self,
841 session: &Session,
842 plan: plan::CreateSchemaPlan,
843 ) -> Result<ExecuteResponse, AdapterError> {
844 let op = catalog::Op::CreateSchema {
845 database_id: plan.database_spec,
846 schema_name: plan.schema_name.clone(),
847 owner_id: *session.current_role_id(),
848 };
849 match self.catalog_transact(Some(session), vec![op]).await {
850 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
851 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
852 kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
853 })) if plan.if_not_exists => {
854 session.add_notice(AdapterNotice::SchemaAlreadyExists {
855 name: plan.schema_name,
856 });
857 Ok(ExecuteResponse::CreatedSchema)
858 }
859 Err(err) => Err(err),
860 }
861 }
862
863 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
865 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
866 if attributes.superuser.is_some()
867 || attributes.password.is_some()
868 || attributes.login.is_some()
869 {
870 return Err(AdapterError::UnavailableFeature {
871 feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
872 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
873 });
874 }
875 }
876 Ok(())
877 }
878
879 #[instrument]
880 pub(super) async fn sequence_create_role(
881 &mut self,
882 conn_id: Option<&ConnectionId>,
883 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
884 ) -> Result<ExecuteResponse, AdapterError> {
885 self.validate_role_attributes(&attributes.clone())?;
886 let op = catalog::Op::CreateRole { name, attributes };
887 self.catalog_transact_with_context(conn_id, None, vec![op])
888 .await
889 .map(|_| ExecuteResponse::CreatedRole)
890 }
891
892 #[instrument]
893 pub(super) async fn sequence_create_network_policy(
894 &mut self,
895 session: &Session,
896 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
897 ) -> Result<ExecuteResponse, AdapterError> {
898 let op = catalog::Op::CreateNetworkPolicy {
899 rules,
900 name,
901 owner_id: *session.current_role_id(),
902 };
903 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
904 .await
905 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
906 }
907
908 #[instrument]
909 pub(super) async fn sequence_alter_network_policy(
910 &mut self,
911 session: &Session,
912 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
913 ) -> Result<ExecuteResponse, AdapterError> {
914 let current_network_policy_name =
916 self.catalog().system_config().default_network_policy_name();
917 if current_network_policy_name == name {
919 self.validate_alter_network_policy(session, &rules)?;
920 }
921
922 let op = catalog::Op::AlterNetworkPolicy {
923 id,
924 rules,
925 name,
926 owner_id: *session.current_role_id(),
927 };
928 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
929 .await
930 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
931 }
932
933 #[instrument]
934 pub(super) async fn sequence_create_table(
935 &mut self,
936 ctx: &mut ExecuteContext,
937 plan: plan::CreateTablePlan,
938 resolved_ids: ResolvedIds,
939 ) -> Result<ExecuteResponse, AdapterError> {
940 let plan::CreateTablePlan {
941 name,
942 table,
943 if_not_exists,
944 } = plan;
945
946 let conn_id = if table.temporary {
947 Some(ctx.session().conn_id())
948 } else {
949 None
950 };
951 let id_ts = self.get_catalog_write_ts().await;
952 let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
953 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
954
955 let data_source = match table.data_source {
956 plan::TableDataSource::TableWrites { defaults } => {
957 TableDataSource::TableWrites { defaults }
958 }
959 plan::TableDataSource::DataSource {
960 desc: data_source_plan,
961 timeline,
962 } => match data_source_plan {
963 plan::DataSourceDesc::IngestionExport {
964 ingestion_id,
965 external_reference,
966 details,
967 data_config,
968 } => TableDataSource::DataSource {
969 desc: DataSourceDesc::IngestionExport {
970 ingestion_id,
971 external_reference,
972 details,
973 data_config,
974 },
975 timeline,
976 },
977 plan::DataSourceDesc::Webhook {
978 validate_using,
979 body_format,
980 headers,
981 cluster_id,
982 } => TableDataSource::DataSource {
983 desc: DataSourceDesc::Webhook {
984 validate_using,
985 body_format,
986 headers,
987 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
988 },
989 timeline,
990 },
991 o => {
992 unreachable!("CREATE TABLE data source got {:?}", o)
993 }
994 },
995 };
996
997 let is_webhook = if let TableDataSource::DataSource {
998 desc: DataSourceDesc::Webhook { .. },
999 timeline: _,
1000 } = &data_source
1001 {
1002 true
1003 } else {
1004 false
1005 };
1006
1007 let table = Table {
1008 create_sql: Some(table.create_sql),
1009 desc: table.desc,
1010 collections,
1011 conn_id: conn_id.cloned(),
1012 resolved_ids,
1013 custom_logical_compaction_window: table.compaction_window,
1014 is_retained_metrics_object: false,
1015 data_source,
1016 };
1017 let ops = vec![catalog::Op::CreateItem {
1018 id: table_id,
1019 name: name.clone(),
1020 item: CatalogItem::Table(table.clone()),
1021 owner_id: *ctx.session().current_role_id(),
1022 }];
1023
1024 let catalog_result = self
1025 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1026 .await;
1027
1028 if is_webhook {
1029 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1032 ctx.session()
1033 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1034 }
1035 }
1036
1037 match catalog_result {
1038 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1039 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1040 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1041 })) if if_not_exists => {
1042 ctx.session_mut()
1043 .add_notice(AdapterNotice::ObjectAlreadyExists {
1044 name: name.item,
1045 ty: "table",
1046 });
1047 Ok(ExecuteResponse::CreatedTable)
1048 }
1049 Err(err) => Err(err),
1050 }
1051 }
1052
1053 #[instrument]
1054 pub(super) async fn sequence_create_sink(
1055 &mut self,
1056 ctx: ExecuteContext,
1057 plan: plan::CreateSinkPlan,
1058 resolved_ids: ResolvedIds,
1059 ) {
1060 let plan::CreateSinkPlan {
1061 name,
1062 sink,
1063 with_snapshot,
1064 if_not_exists,
1065 in_cluster,
1066 } = plan;
1067
1068 let id_ts = self.get_catalog_write_ts().await;
1070 let (item_id, global_id) =
1071 return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);
1072
1073 let catalog_sink = Sink {
1074 create_sql: sink.create_sql,
1075 global_id,
1076 from: sink.from,
1077 connection: sink.connection,
1078 envelope: sink.envelope,
1079 version: sink.version,
1080 with_snapshot,
1081 resolved_ids,
1082 cluster_id: in_cluster,
1083 commit_interval: sink.commit_interval,
1084 };
1085
1086 let ops = vec![catalog::Op::CreateItem {
1087 id: item_id,
1088 name: name.clone(),
1089 item: CatalogItem::Sink(catalog_sink.clone()),
1090 owner_id: *ctx.session().current_role_id(),
1091 }];
1092
1093 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1094
1095 match result {
1096 Ok(()) => {}
1097 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1098 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1099 })) if if_not_exists => {
1100 ctx.session()
1101 .add_notice(AdapterNotice::ObjectAlreadyExists {
1102 name: name.item,
1103 ty: "sink",
1104 });
1105 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1106 return;
1107 }
1108 Err(e) => {
1109 ctx.retire(Err(e));
1110 return;
1111 }
1112 };
1113
1114 self.create_storage_export(global_id, &catalog_sink)
1115 .await
1116 .unwrap_or_terminate("cannot fail to create exports");
1117
1118 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1119 .await;
1120
1121 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1122 }
1123
1124 pub(super) fn validate_system_column_references(
1147 &self,
1148 uses_ambiguous_columns: bool,
1149 depends_on: &BTreeSet<GlobalId>,
1150 ) -> Result<(), AdapterError> {
1151 if uses_ambiguous_columns
1152 && depends_on
1153 .iter()
1154 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1155 {
1156 Err(AdapterError::AmbiguousSystemColumnReference)
1157 } else {
1158 Ok(())
1159 }
1160 }
1161
1162 #[instrument]
1163 pub(super) async fn sequence_create_type(
1164 &mut self,
1165 session: &Session,
1166 plan: plan::CreateTypePlan,
1167 resolved_ids: ResolvedIds,
1168 ) -> Result<ExecuteResponse, AdapterError> {
1169 let id_ts = self.get_catalog_write_ts().await;
1170 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
1171 plan.typ
1173 .inner
1174 .desc(&self.catalog().for_session(session))
1175 .map_err(AdapterError::from)?;
1176 let typ = Type {
1177 create_sql: Some(plan.typ.create_sql),
1178 global_id,
1179 details: CatalogTypeDetails {
1180 array_id: None,
1181 typ: plan.typ.inner,
1182 pg_metadata: None,
1183 },
1184 resolved_ids,
1185 };
1186 let op = catalog::Op::CreateItem {
1187 id: item_id,
1188 name: plan.name,
1189 item: CatalogItem::Type(typ),
1190 owner_id: *session.current_role_id(),
1191 };
1192 match self.catalog_transact(Some(session), vec![op]).await {
1193 Ok(()) => Ok(ExecuteResponse::CreatedType),
1194 Err(err) => Err(err),
1195 }
1196 }
1197
1198 #[instrument]
1199 pub(super) async fn sequence_comment_on(
1200 &mut self,
1201 session: &Session,
1202 plan: plan::CommentPlan,
1203 ) -> Result<ExecuteResponse, AdapterError> {
1204 let op = catalog::Op::Comment {
1205 object_id: plan.object_id,
1206 sub_component: plan.sub_component,
1207 comment: plan.comment,
1208 };
1209 self.catalog_transact(Some(session), vec![op]).await?;
1210 Ok(ExecuteResponse::Comment)
1211 }
1212
1213 #[instrument]
1214 pub(super) async fn sequence_drop_objects(
1215 &mut self,
1216 ctx: &mut ExecuteContext,
1217 plan::DropObjectsPlan {
1218 drop_ids,
1219 object_type,
1220 referenced_ids,
1221 }: plan::DropObjectsPlan,
1222 ) -> Result<ExecuteResponse, AdapterError> {
1223 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1224 let mut objects = Vec::new();
1225 for obj_id in &drop_ids {
1226 if !referenced_ids_hashset.contains(obj_id) {
1227 let object_info = ErrorMessageObjectDescription::from_id(
1228 obj_id,
1229 &self.catalog().for_session(ctx.session()),
1230 )
1231 .to_string();
1232 objects.push(object_info);
1233 }
1234 }
1235
1236 if !objects.is_empty() {
1237 ctx.session()
1238 .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1239 }
1240
1241 let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
1243 .iter()
1244 .filter_map(|id| match id {
1245 ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
1246 _ => None,
1247 })
1248 .flatten()
1249 .collect();
1250
1251 let DropOps {
1252 ops,
1253 dropped_active_db,
1254 dropped_active_cluster,
1255 dropped_in_use_indexes,
1256 } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1257
1258 self.catalog_transact_with_context(None, Some(ctx), ops)
1259 .await?;
1260
1261 if !expr_cache_invalidate_ids.is_empty() {
1263 let _fut = self.catalog().update_expression_cache(
1264 Default::default(),
1265 Default::default(),
1266 expr_cache_invalidate_ids,
1267 );
1268 }
1269
1270 fail::fail_point!("after_sequencer_drop_replica");
1271
1272 if dropped_active_db {
1273 ctx.session()
1274 .add_notice(AdapterNotice::DroppedActiveDatabase {
1275 name: ctx.session().vars().database().to_string(),
1276 });
1277 }
1278 if dropped_active_cluster {
1279 ctx.session()
1280 .add_notice(AdapterNotice::DroppedActiveCluster {
1281 name: ctx.session().vars().cluster().to_string(),
1282 });
1283 }
1284 for dropped_in_use_index in dropped_in_use_indexes {
1285 ctx.session()
1286 .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1287 self.metrics
1288 .optimization_notices
1289 .with_label_values(&["DroppedInUseIndex"])
1290 .inc_by(1);
1291 }
1292 Ok(ExecuteResponse::DroppedObject(object_type))
1293 }
1294
1295 fn validate_dropped_role_ownership(
1296 &self,
1297 session: &Session,
1298 dropped_roles: &BTreeMap<RoleId, &str>,
1299 ) -> Result<(), AdapterError> {
1300 fn privilege_check(
1301 privileges: &PrivilegeMap,
1302 dropped_roles: &BTreeMap<RoleId, &str>,
1303 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1304 object_id: &SystemObjectId,
1305 catalog: &ConnCatalog,
1306 ) {
1307 for privilege in privileges.all_values() {
1308 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1309 let grantor_name = catalog.get_role(&privilege.grantor).name();
1310 let object_description =
1311 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1312 dependent_objects
1313 .entry(role_name.to_string())
1314 .or_default()
1315 .push(format!(
1316 "privileges on {object_description} granted by {grantor_name}",
1317 ));
1318 }
1319 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1320 let grantee_name = catalog.get_role(&privilege.grantee).name();
1321 let object_description =
1322 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1323 dependent_objects
1324 .entry(role_name.to_string())
1325 .or_default()
1326 .push(format!(
1327 "privileges granted on {object_description} to {grantee_name}"
1328 ));
1329 }
1330 }
1331 }
1332
1333 let catalog = self.catalog().for_session(session);
1334 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1335 for entry in self.catalog.entries() {
1336 let id = SystemObjectId::Object(entry.id().into());
1337 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1338 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1339 dependent_objects
1340 .entry(role_name.to_string())
1341 .or_default()
1342 .push(format!("owner of {object_description}"));
1343 }
1344 privilege_check(
1345 entry.privileges(),
1346 dropped_roles,
1347 &mut dependent_objects,
1348 &id,
1349 &catalog,
1350 );
1351 }
1352 for database in self.catalog.databases() {
1353 let database_id = SystemObjectId::Object(database.id().into());
1354 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1355 let object_description =
1356 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1357 dependent_objects
1358 .entry(role_name.to_string())
1359 .or_default()
1360 .push(format!("owner of {object_description}"));
1361 }
1362 privilege_check(
1363 &database.privileges,
1364 dropped_roles,
1365 &mut dependent_objects,
1366 &database_id,
1367 &catalog,
1368 );
1369 for schema in database.schemas_by_id.values() {
1370 let schema_id = SystemObjectId::Object(
1371 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1372 );
1373 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1374 let object_description =
1375 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1376 dependent_objects
1377 .entry(role_name.to_string())
1378 .or_default()
1379 .push(format!("owner of {object_description}"));
1380 }
1381 privilege_check(
1382 &schema.privileges,
1383 dropped_roles,
1384 &mut dependent_objects,
1385 &schema_id,
1386 &catalog,
1387 );
1388 }
1389 }
1390 for cluster in self.catalog.clusters() {
1391 let cluster_id = SystemObjectId::Object(cluster.id().into());
1392 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1393 let object_description =
1394 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1395 dependent_objects
1396 .entry(role_name.to_string())
1397 .or_default()
1398 .push(format!("owner of {object_description}"));
1399 }
1400 privilege_check(
1401 &cluster.privileges,
1402 dropped_roles,
1403 &mut dependent_objects,
1404 &cluster_id,
1405 &catalog,
1406 );
1407 for replica in cluster.replicas() {
1408 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1409 let replica_id =
1410 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1411 let object_description =
1412 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1413 dependent_objects
1414 .entry(role_name.to_string())
1415 .or_default()
1416 .push(format!("owner of {object_description}"));
1417 }
1418 }
1419 }
1420 privilege_check(
1421 self.catalog().system_privileges(),
1422 dropped_roles,
1423 &mut dependent_objects,
1424 &SystemObjectId::System,
1425 &catalog,
1426 );
1427 for (default_privilege_object, default_privilege_acl_items) in
1428 self.catalog.default_privileges()
1429 {
1430 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1431 dependent_objects
1432 .entry(role_name.to_string())
1433 .or_default()
1434 .push(format!(
1435 "default privileges on {}S created by {}",
1436 default_privilege_object.object_type, role_name
1437 ));
1438 }
1439 for default_privilege_acl_item in default_privilege_acl_items {
1440 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1441 dependent_objects
1442 .entry(role_name.to_string())
1443 .or_default()
1444 .push(format!(
1445 "default privileges on {}S granted to {}",
1446 default_privilege_object.object_type, role_name
1447 ));
1448 }
1449 }
1450 }
1451
1452 if !dependent_objects.is_empty() {
1453 Err(AdapterError::DependentObject(dependent_objects))
1454 } else {
1455 Ok(())
1456 }
1457 }
1458
1459 #[instrument]
1460 pub(super) async fn sequence_drop_owned(
1461 &mut self,
1462 session: &Session,
1463 plan: plan::DropOwnedPlan,
1464 ) -> Result<ExecuteResponse, AdapterError> {
1465 for role_id in &plan.role_ids {
1466 self.catalog().ensure_not_reserved_role(role_id)?;
1467 }
1468
1469 let mut privilege_revokes = plan.privilege_revokes;
1470
1471 let session_catalog = self.catalog().for_session(session);
1473 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1474 && !session.is_superuser()
1475 {
1476 let role_membership =
1478 session_catalog.collect_role_membership(session.current_role_id());
1479 let invalid_revokes: BTreeSet<_> = privilege_revokes
1480 .extract_if(.., |(_, privilege)| {
1481 !role_membership.contains(&privilege.grantor)
1482 })
1483 .map(|(object_id, _)| object_id)
1484 .collect();
1485 for invalid_revoke in invalid_revokes {
1486 let object_description =
1487 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1488 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1489 }
1490 }
1491
1492 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1493 catalog::Op::UpdatePrivilege {
1494 target_id: object_id,
1495 privilege,
1496 variant: UpdatePrivilegeVariant::Revoke,
1497 }
1498 });
1499 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1500 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1501 privilege_object,
1502 privilege_acl_item,
1503 variant: UpdatePrivilegeVariant::Revoke,
1504 },
1505 );
1506 let DropOps {
1507 ops: drop_ops,
1508 dropped_active_db,
1509 dropped_active_cluster,
1510 dropped_in_use_indexes,
1511 } = self.sequence_drop_common(session, plan.drop_ids)?;
1512
1513 let ops = privilege_revoke_ops
1514 .chain(default_privilege_revoke_ops)
1515 .chain(drop_ops.into_iter())
1516 .collect();
1517
1518 self.catalog_transact(Some(session), ops).await?;
1519
1520 if dropped_active_db {
1521 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1522 name: session.vars().database().to_string(),
1523 });
1524 }
1525 if dropped_active_cluster {
1526 session.add_notice(AdapterNotice::DroppedActiveCluster {
1527 name: session.vars().cluster().to_string(),
1528 });
1529 }
1530 for dropped_in_use_index in dropped_in_use_indexes {
1531 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1532 }
1533 Ok(ExecuteResponse::DroppedOwned)
1534 }
1535
1536 fn sequence_drop_common(
1537 &self,
1538 session: &Session,
1539 ids: Vec<ObjectId>,
1540 ) -> Result<DropOps, AdapterError> {
1541 let mut dropped_active_db = false;
1542 let mut dropped_active_cluster = false;
1543 let mut dropped_in_use_indexes = Vec::new();
1544 let mut dropped_roles = BTreeMap::new();
1545 let mut dropped_databases = BTreeSet::new();
1546 let mut dropped_schemas = BTreeSet::new();
1547 let mut role_revokes = BTreeSet::new();
1551 let mut default_privilege_revokes = BTreeSet::new();
1554
1555 let mut clusters_to_drop = BTreeSet::new();
1557
1558 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1559 for id in &ids {
1560 match id {
1561 ObjectId::Database(id) => {
1562 let name = self.catalog().get_database(id).name();
1563 if name == session.vars().database() {
1564 dropped_active_db = true;
1565 }
1566 dropped_databases.insert(id);
1567 }
1568 ObjectId::Schema((_, spec)) => {
1569 if let SchemaSpecifier::Id(id) = spec {
1570 dropped_schemas.insert(id);
1571 }
1572 }
1573 ObjectId::Cluster(id) => {
1574 clusters_to_drop.insert(*id);
1575 if let Some(active_id) = self
1576 .catalog()
1577 .active_cluster(session)
1578 .ok()
1579 .map(|cluster| cluster.id())
1580 {
1581 if id == &active_id {
1582 dropped_active_cluster = true;
1583 }
1584 }
1585 }
1586 ObjectId::Role(id) => {
1587 let role = self.catalog().get_role(id);
1588 let name = role.name();
1589 dropped_roles.insert(*id, name);
1590 for (group_id, grantor_id) in &role.membership.map {
1592 role_revokes.insert((*group_id, *id, *grantor_id));
1593 }
1594 }
1595 ObjectId::Item(id) => {
1596 if let Some(index) = self.catalog().get_entry(id).index() {
1597 let humanizer = self.catalog().for_session(session);
1598 let dependants = self
1599 .controller
1600 .compute
1601 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1602 .ok()
1603 .into_iter()
1604 .flatten()
1605 .filter(|dependant_id| {
1606 if dependant_id.is_transient() {
1613 return false;
1614 }
1615 let Some(dependent_id) = humanizer
1617 .try_get_item_by_global_id(dependant_id)
1618 .map(|item| item.id())
1619 else {
1620 return false;
1621 };
1622 !ids_set.contains(&ObjectId::Item(dependent_id))
1625 })
1626 .flat_map(|dependant_id| {
1627 humanizer.humanize_id(dependant_id)
1631 })
1632 .collect_vec();
1633 if !dependants.is_empty() {
1634 dropped_in_use_indexes.push(DroppedInUseIndex {
1635 index_name: humanizer
1636 .humanize_id(index.global_id())
1637 .unwrap_or_else(|| id.to_string()),
1638 dependant_objects: dependants,
1639 });
1640 }
1641 }
1642 }
1643 _ => {}
1644 }
1645 }
1646
1647 for id in &ids {
1648 match id {
1649 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1653 if !clusters_to_drop.contains(cluster_id) {
1654 let cluster = self.catalog.get_cluster(*cluster_id);
1655 if cluster.is_managed() {
1656 let replica =
1657 cluster.replica(*replica_id).expect("Catalog out of sync");
1658 if !replica.config.location.internal() {
1659 coord_bail!("cannot drop replica of managed cluster");
1660 }
1661 }
1662 }
1663 }
1664 _ => {}
1665 }
1666 }
1667
1668 for role_id in dropped_roles.keys() {
1669 self.catalog().ensure_not_reserved_role(role_id)?;
1670 }
1671 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1672 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1674 for role in self.catalog().user_roles() {
1675 for dropped_role_id in
1676 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1677 {
1678 role_revokes.insert((
1679 **dropped_role_id,
1680 role.id(),
1681 *role
1682 .membership
1683 .map
1684 .get(*dropped_role_id)
1685 .expect("included in keys above"),
1686 ));
1687 }
1688 }
1689
1690 for (default_privilege_object, default_privilege_acls) in
1691 self.catalog().default_privileges()
1692 {
1693 if matches!(
1694 &default_privilege_object.database_id,
1695 Some(database_id) if dropped_databases.contains(database_id),
1696 ) || matches!(
1697 &default_privilege_object.schema_id,
1698 Some(schema_id) if dropped_schemas.contains(schema_id),
1699 ) {
1700 for default_privilege_acl in default_privilege_acls {
1701 default_privilege_revokes.insert((
1702 default_privilege_object.clone(),
1703 default_privilege_acl.clone(),
1704 ));
1705 }
1706 }
1707 }
1708
1709 let ops = role_revokes
1710 .into_iter()
1711 .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1712 role_id,
1713 member_id,
1714 grantor_id,
1715 })
1716 .chain(default_privilege_revokes.into_iter().map(
1717 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1718 privilege_object,
1719 privilege_acl_item,
1720 variant: UpdatePrivilegeVariant::Revoke,
1721 },
1722 ))
1723 .chain(iter::once(catalog::Op::DropObjects(
1724 ids.into_iter()
1725 .map(DropObjectInfo::manual_drop_from_object_id)
1726 .collect(),
1727 )))
1728 .collect();
1729
1730 Ok(DropOps {
1731 ops,
1732 dropped_active_db,
1733 dropped_active_cluster,
1734 dropped_in_use_indexes,
1735 })
1736 }
1737
1738 pub(super) fn sequence_explain_schema(
1739 &self,
1740 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1741 ) -> Result<ExecuteResponse, AdapterError> {
1742 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1743 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1744 })?;
1745
1746 let json_string = json_string(&json_value);
1747 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1748 Ok(Self::send_immediate_rows(row))
1749 }
1750
1751 pub(super) fn sequence_show_all_variables(
1752 &self,
1753 session: &Session,
1754 ) -> Result<ExecuteResponse, AdapterError> {
1755 let mut rows = viewable_variables(self.catalog().state(), session)
1756 .map(|v| (v.name(), v.value(), v.description()))
1757 .collect::<Vec<_>>();
1758 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1759
1760 let rows: Vec<_> = rows
1762 .into_iter()
1763 .map(|(name, val, desc)| {
1764 Row::pack_slice(&[
1765 Datum::String(name),
1766 Datum::String(&val),
1767 Datum::String(desc),
1768 ])
1769 })
1770 .collect();
1771 Ok(Self::send_immediate_rows(rows))
1772 }
1773
1774 pub(super) fn sequence_show_variable(
1775 &self,
1776 session: &Session,
1777 plan: plan::ShowVariablePlan,
1778 ) -> Result<ExecuteResponse, AdapterError> {
1779 if &plan.name == SCHEMA_ALIAS {
1780 let schemas = self.catalog.resolve_search_path(session);
1781 let schema = schemas.first();
1782 return match schema {
1783 Some((database_spec, schema_spec)) => {
1784 let schema_name = &self
1785 .catalog
1786 .get_schema(database_spec, schema_spec, session.conn_id())
1787 .name()
1788 .schema;
1789 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1790 Ok(Self::send_immediate_rows(row))
1791 }
1792 None => {
1793 if session.vars().current_object_missing_warnings() {
1794 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1795 search_path: session
1796 .vars()
1797 .search_path()
1798 .into_iter()
1799 .map(|schema| schema.to_string())
1800 .collect(),
1801 });
1802 }
1803 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1804 }
1805 };
1806 }
1807
1808 let variable = session
1809 .vars()
1810 .get(self.catalog().system_config(), &plan.name)
1811 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1812
1813 variable.visible(session.user(), self.catalog().system_config())?;
1816
1817 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1818 if variable.name() == vars::DATABASE.name()
1819 && matches!(
1820 self.catalog().resolve_database(&variable.value()),
1821 Err(CatalogError::UnknownDatabase(_))
1822 )
1823 && session.vars().current_object_missing_warnings()
1824 {
1825 let name = variable.value();
1826 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1827 } else if variable.name() == vars::CLUSTER.name()
1828 && matches!(
1829 self.catalog().resolve_cluster(&variable.value()),
1830 Err(CatalogError::UnknownCluster(_))
1831 )
1832 && session.vars().current_object_missing_warnings()
1833 {
1834 let name = variable.value();
1835 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1836 }
1837 Ok(Self::send_immediate_rows(row))
1838 }
1839
1840 #[instrument]
1841 pub(super) async fn sequence_inspect_shard(
1842 &self,
1843 session: &Session,
1844 plan: plan::InspectShardPlan,
1845 ) -> Result<ExecuteResponse, AdapterError> {
1846 if !session.user().is_internal() {
1849 return Err(AdapterError::Unauthorized(
1850 rbac::UnauthorizedError::MzSystem {
1851 action: "inspect".into(),
1852 },
1853 ));
1854 }
1855 let state = self
1856 .controller
1857 .storage
1858 .inspect_persist_state(plan.id)
1859 .await?;
1860 let jsonb = Jsonb::from_serde_json(state)?;
1861 Ok(Self::send_immediate_rows(jsonb.into_row()))
1862 }
1863
1864 #[instrument]
1865 pub(super) fn sequence_set_variable(
1866 &self,
1867 session: &mut Session,
1868 plan: plan::SetVariablePlan,
1869 ) -> Result<ExecuteResponse, AdapterError> {
1870 let (name, local) = (plan.name, plan.local);
1871 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1872 self.validate_set_isolation_level(session)?;
1873 }
1874 if &name == vars::CLUSTER.name() {
1875 self.validate_set_cluster(session)?;
1876 }
1877
1878 let vars = session.vars_mut();
1879 let values = match plan.value {
1880 plan::VariableValue::Default => None,
1881 plan::VariableValue::Values(values) => Some(values),
1882 };
1883
1884 match values {
1885 Some(values) => {
1886 vars.set(
1887 self.catalog().system_config(),
1888 &name,
1889 VarInput::SqlSet(&values),
1890 local,
1891 )?;
1892
1893 let vars = session.vars();
1894
1895 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1898 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1899 } else if name == vars::CLUSTER.name()
1900 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1901 {
1902 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1903 }
1904
1905 if name.as_str() == vars::DATABASE.name()
1907 && matches!(
1908 self.catalog().resolve_database(vars.database()),
1909 Err(CatalogError::UnknownDatabase(_))
1910 )
1911 && session.vars().current_object_missing_warnings()
1912 {
1913 let name = vars.database().to_string();
1914 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1915 } else if name.as_str() == vars::CLUSTER.name()
1916 && matches!(
1917 self.catalog().resolve_cluster(vars.cluster()),
1918 Err(CatalogError::UnknownCluster(_))
1919 )
1920 && session.vars().current_object_missing_warnings()
1921 {
1922 let name = vars.cluster().to_string();
1923 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1924 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1925 let v = values.into_first().to_lowercase();
1926 if v == IsolationLevel::ReadUncommitted.as_str()
1927 || v == IsolationLevel::ReadCommitted.as_str()
1928 || v == IsolationLevel::RepeatableRead.as_str()
1929 {
1930 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1931 isolation_level: v,
1932 });
1933 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1934 session.add_notice(AdapterNotice::StrongSessionSerializable);
1935 }
1936 }
1937 }
1938 None => vars.reset(self.catalog().system_config(), &name, local)?,
1939 }
1940
1941 Ok(ExecuteResponse::SetVariable { name, reset: false })
1942 }
1943
1944 pub(super) fn sequence_reset_variable(
1945 &self,
1946 session: &mut Session,
1947 plan: plan::ResetVariablePlan,
1948 ) -> Result<ExecuteResponse, AdapterError> {
1949 let name = plan.name;
1950 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1951 self.validate_set_isolation_level(session)?;
1952 }
1953 if &name == vars::CLUSTER.name() {
1954 self.validate_set_cluster(session)?;
1955 }
1956 session
1957 .vars_mut()
1958 .reset(self.catalog().system_config(), &name, false)?;
1959 Ok(ExecuteResponse::SetVariable { name, reset: true })
1960 }
1961
1962 pub(super) fn sequence_set_transaction(
1963 &self,
1964 session: &mut Session,
1965 plan: plan::SetTransactionPlan,
1966 ) -> Result<ExecuteResponse, AdapterError> {
1967 for mode in plan.modes {
1969 match mode {
1970 TransactionMode::AccessMode(_) => {
1971 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1972 }
1973 TransactionMode::IsolationLevel(isolation_level) => {
1974 self.validate_set_isolation_level(session)?;
1975
1976 session.vars_mut().set(
1977 self.catalog().system_config(),
1978 TRANSACTION_ISOLATION_VAR_NAME,
1979 VarInput::Flat(&isolation_level.to_ast_string_stable()),
1980 plan.local,
1981 )?
1982 }
1983 }
1984 }
1985 Ok(ExecuteResponse::SetVariable {
1986 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1987 reset: false,
1988 })
1989 }
1990
1991 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1992 if session.transaction().contains_ops() {
1993 Err(AdapterError::InvalidSetIsolationLevel)
1994 } else {
1995 Ok(())
1996 }
1997 }
1998
1999 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2000 if session.transaction().contains_ops() {
2001 Err(AdapterError::InvalidSetCluster)
2002 } else {
2003 Ok(())
2004 }
2005 }
2006
2007 #[instrument]
2008 pub(super) async fn sequence_end_transaction(
2009 &mut self,
2010 mut ctx: ExecuteContext,
2011 mut action: EndTransactionAction,
2012 ) {
2013 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2015 (&action, ctx.session().transaction())
2016 {
2017 action = EndTransactionAction::Rollback;
2018 }
2019 let response = match action {
2020 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2021 params: BTreeMap::new(),
2022 }),
2023 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2024 params: BTreeMap::new(),
2025 }),
2026 };
2027
2028 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2029
2030 let (response, action) = match result {
2031 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2032 (response, action)
2033 }
2034 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2035 let validated_locks = match write_lock_guards {
2039 None => None,
2040 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2041 Ok(locks) => Some(locks),
2042 Err(missing) => {
2043 tracing::error!(?missing, "programming error, missing write locks");
2044 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2045 }
2046 },
2047 };
2048
2049 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2050 for WriteOp { id, rows } in writes {
2051 let total_rows = collected_writes.entry(id).or_default();
2052 total_rows.push(rows);
2053 }
2054
2055 self.submit_write(PendingWriteTxn::User {
2056 span: Span::current(),
2057 writes: collected_writes,
2058 write_locks: validated_locks,
2059 pending_txn: PendingTxn {
2060 ctx,
2061 response,
2062 action,
2063 },
2064 });
2065 return;
2066 }
2067 Ok((
2068 Some(TransactionOps::Peeks {
2069 determination,
2070 requires_linearization: RequireLinearization::Required,
2071 ..
2072 }),
2073 _,
2074 )) if ctx.session().vars().transaction_isolation()
2075 == &IsolationLevel::StrictSerializable =>
2076 {
2077 let conn_id = ctx.session().conn_id().clone();
2078 let pending_read_txn = PendingReadTxn {
2079 txn: PendingRead::Read {
2080 txn: PendingTxn {
2081 ctx,
2082 response,
2083 action,
2084 },
2085 },
2086 timestamp_context: determination.timestamp_context,
2087 created: Instant::now(),
2088 num_requeues: 0,
2089 otel_ctx: OpenTelemetryContext::obtain(),
2090 };
2091 self.strict_serializable_reads_tx
2092 .send((conn_id, pending_read_txn))
2093 .expect("sending to strict_serializable_reads_tx cannot fail");
2094 return;
2095 }
2096 Ok((
2097 Some(TransactionOps::Peeks {
2098 determination,
2099 requires_linearization: RequireLinearization::Required,
2100 ..
2101 }),
2102 _,
2103 )) if ctx.session().vars().transaction_isolation()
2104 == &IsolationLevel::StrongSessionSerializable =>
2105 {
2106 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2107 ctx.session_mut()
2108 .ensure_timestamp_oracle(timeline.clone())
2109 .apply_write(*ts);
2110 }
2111 (response, action)
2112 }
2113 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2114 self.internal_cmd_tx
2115 .send(Message::ExecuteSingleStatementTransaction {
2116 ctx,
2117 otel_ctx: OpenTelemetryContext::obtain(),
2118 stmt,
2119 params,
2120 })
2121 .expect("must send");
2122 return;
2123 }
2124 Ok((_, _)) => (response, action),
2125 Err(err) => (Err(err), EndTransactionAction::Rollback),
2126 };
2127 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2128 let response = response.map(|mut r| {
2130 r.extend_params(changed);
2131 ExecuteResponse::from(r)
2132 });
2133
2134 ctx.retire(response);
2135 }
2136
2137 #[instrument]
2138 async fn sequence_end_transaction_inner(
2139 &mut self,
2140 ctx: &mut ExecuteContext,
2141 action: EndTransactionAction,
2142 ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2143 let txn = self.clear_transaction(ctx.session_mut()).await;
2144
2145 if let EndTransactionAction::Commit = action {
2146 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2147 match &mut ops {
2148 TransactionOps::Writes(writes) => {
2149 for WriteOp { id, .. } in &mut writes.iter() {
2150 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2152 AdapterError::Catalog(mz_catalog::memory::error::Error {
2153 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2154 })
2155 })?;
2156 }
2157
2158 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2160 }
2161 TransactionOps::DDL {
2162 ops,
2163 state: _,
2164 side_effects,
2165 revision,
2166 } => {
2167 if *revision != self.catalog().transient_revision() {
2169 return Err(AdapterError::DDLTransactionRace);
2170 }
2171 let ops = std::mem::take(ops);
2173 let side_effects = std::mem::take(side_effects);
2174 self.catalog_transact_with_side_effects(
2175 Some(ctx),
2176 ops,
2177 move |a, mut ctx| {
2178 Box::pin(async move {
2179 for side_effect in side_effects {
2180 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2181 }
2182 })
2183 },
2184 )
2185 .await?;
2186 }
2187 _ => (),
2188 }
2189 return Ok((Some(ops), write_lock_guards));
2190 }
2191 }
2192
2193 Ok((None, None))
2194 }
2195
2196 pub(super) async fn sequence_side_effecting_func(
2197 &mut self,
2198 ctx: ExecuteContext,
2199 plan: SideEffectingFunc,
2200 ) {
2201 match plan {
2202 SideEffectingFunc::PgCancelBackend { connection_id } => {
2203 if ctx.session().conn_id().unhandled() == connection_id {
2204 ctx.retire(Err(AdapterError::Canceled));
2208 return;
2209 }
2210
2211 let res = if let Some((id_handle, _conn_meta)) =
2212 self.active_conns.get_key_value(&connection_id)
2213 {
2214 self.handle_privileged_cancel(id_handle.clone()).await;
2216 Datum::True
2217 } else {
2218 Datum::False
2219 };
2220 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2221 }
2222 }
2223 }
2224
2225 pub(crate) async fn execute_side_effecting_func(
2234 &mut self,
2235 plan: SideEffectingFunc,
2236 conn_id: ConnectionId,
2237 current_role: RoleId,
2238 ) -> Result<ExecuteResponse, AdapterError> {
2239 match plan {
2240 SideEffectingFunc::PgCancelBackend { connection_id } => {
2241 if conn_id.unhandled() == connection_id {
2242 return Err(AdapterError::Canceled);
2246 }
2247
2248 if let Some((_id_handle, conn_meta)) =
2251 self.active_conns.get_key_value(&connection_id)
2252 {
2253 let target_role = *conn_meta.authenticated_role_id();
2254 let role_membership = self
2255 .catalog()
2256 .state()
2257 .collect_role_membership(¤t_role);
2258 if !role_membership.contains(&target_role) {
2259 let target_role_name = self
2260 .catalog()
2261 .try_get_role(&target_role)
2262 .map(|role| role.name().to_string())
2263 .unwrap_or_else(|| target_role.to_string());
2264 return Err(AdapterError::Unauthorized(
2265 rbac::UnauthorizedError::RoleMembership {
2266 role_names: vec![target_role_name],
2267 },
2268 ));
2269 }
2270
2271 let id_handle = self
2273 .active_conns
2274 .get_key_value(&connection_id)
2275 .map(|(id, _)| id.clone())
2276 .expect("checked above");
2277 self.handle_privileged_cancel(id_handle).await;
2278 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2279 } else {
2280 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2282 }
2283 }
2284 }
2285 }
2286
2287 pub(crate) async fn determine_real_time_recent_timestamp(
2291 &self,
2292 source_ids: impl Iterator<Item = GlobalId>,
2293 real_time_recency_timeout: Duration,
2294 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2295 let item_ids = source_ids
2296 .map(|gid| {
2297 self.catalog
2298 .try_resolve_item_id(&gid)
2299 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2300 })
2301 .collect::<Result<Vec<_>, _>>()?;
2302
2303 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2309 if to_visit.is_empty() {
2312 return Ok(None);
2313 }
2314
2315 let mut timestamp_objects = BTreeSet::new();
2316
2317 while let Some(id) = to_visit.pop_front() {
2318 timestamp_objects.insert(id);
2319 to_visit.extend(
2320 self.catalog()
2321 .get_entry(&id)
2322 .uses()
2323 .into_iter()
2324 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2325 );
2326 }
2327 let timestamp_objects = timestamp_objects
2328 .into_iter()
2329 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2330 .collect();
2331
2332 let r = self
2333 .controller
2334 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2335 .await?;
2336
2337 Ok(Some(r))
2338 }
2339
2340 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2343 &self,
2344 session: &Session,
2345 source_ids: impl Iterator<Item = GlobalId>,
2346 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2347 let vars = session.vars();
2348
2349 if vars.real_time_recency()
2350 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2351 && !session.contains_read_timestamp()
2352 {
2353 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2354 .await
2355 } else {
2356 Ok(None)
2357 }
2358 }
2359
2360 #[instrument]
2361 pub(super) async fn sequence_explain_plan(
2362 &mut self,
2363 ctx: ExecuteContext,
2364 plan: plan::ExplainPlanPlan,
2365 target_cluster: TargetCluster,
2366 ) {
2367 match &plan.explainee {
2368 plan::Explainee::Statement(stmt) => match stmt {
2369 plan::ExplaineeStatement::CreateView { .. } => {
2370 self.explain_create_view(ctx, plan).await;
2371 }
2372 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2373 self.explain_create_materialized_view(ctx, plan).await;
2374 }
2375 plan::ExplaineeStatement::CreateIndex { .. } => {
2376 self.explain_create_index(ctx, plan).await;
2377 }
2378 plan::ExplaineeStatement::Select { .. } => {
2379 self.explain_peek(ctx, plan, target_cluster).await;
2380 }
2381 plan::ExplaineeStatement::Subscribe { .. } => {
2382 self.explain_subscribe(ctx, plan, target_cluster).await;
2383 }
2384 },
2385 plan::Explainee::View(_) => {
2386 let result = self.explain_view(&ctx, plan);
2387 ctx.retire(result);
2388 }
2389 plan::Explainee::MaterializedView(_) => {
2390 let result = self.explain_materialized_view(&ctx, plan);
2391 ctx.retire(result);
2392 }
2393 plan::Explainee::Index(_) => {
2394 let result = self.explain_index(&ctx, plan);
2395 ctx.retire(result);
2396 }
2397 plan::Explainee::ReplanView(_) => {
2398 self.explain_replan_view(ctx, plan).await;
2399 }
2400 plan::Explainee::ReplanMaterializedView(_) => {
2401 self.explain_replan_materialized_view(ctx, plan).await;
2402 }
2403 plan::Explainee::ReplanIndex(_) => {
2404 self.explain_replan_index(ctx, plan).await;
2405 }
2406 };
2407 }
2408
2409 pub(super) async fn sequence_explain_pushdown(
2410 &mut self,
2411 ctx: ExecuteContext,
2412 plan: plan::ExplainPushdownPlan,
2413 target_cluster: TargetCluster,
2414 ) {
2415 match plan.explainee {
2416 Explainee::Statement(ExplaineeStatement::Select {
2417 broken: false,
2418 plan,
2419 desc: _,
2420 }) => {
2421 let stage = return_if_err!(
2422 self.peek_validate(
2423 ctx.session(),
2424 plan,
2425 target_cluster,
2426 None,
2427 ExplainContext::Pushdown,
2428 Some(ctx.session().vars().max_query_result_size()),
2429 ),
2430 ctx
2431 );
2432 self.sequence_staged(ctx, Span::current(), stage).await;
2433 }
2434 Explainee::MaterializedView(item_id) => {
2435 self.explain_pushdown_materialized_view(ctx, item_id).await;
2436 }
2437 _ => {
2438 ctx.retire(Err(AdapterError::Unsupported(
2439 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2440 )));
2441 }
2442 };
2443 }
2444
2445 async fn execute_explain_pushdown_with_read_holds(
2447 &self,
2448 ctx: ExecuteContext,
2449 as_of: Antichain<Timestamp>,
2450 mz_now: ResultSpec<'static>,
2451 read_holds: Option<ReadHolds<Timestamp>>,
2452 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2453 ) {
2454 let fut = self
2455 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2456 .await;
2457 task::spawn(|| "render explain pushdown", async move {
2458 let _read_holds = read_holds;
2460 let res = fut.await;
2461 ctx.retire(res);
2462 });
2463 }
2464
2465 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2467 &self,
2468 session: &Session,
2469 as_of: Antichain<Timestamp>,
2470 mz_now: ResultSpec<'static>,
2471 imports: I,
2472 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2473 super::explain_pushdown_future_inner(
2475 session,
2476 &self.catalog,
2477 &self.controller.storage_collections,
2478 as_of,
2479 mz_now,
2480 imports,
2481 )
2482 .await
2483 }
2484
2485 #[instrument]
2486 pub(super) async fn sequence_insert(
2487 &mut self,
2488 mut ctx: ExecuteContext,
2489 plan: plan::InsertPlan,
2490 ) {
2491 if !ctx.session_mut().transaction().allows_writes() {
2499 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2500 return;
2501 }
2502
2503 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2517 let expr = return_if_err!(
2520 plan.values
2521 .clone()
2522 .lower(self.catalog().system_config(), None),
2523 ctx
2524 );
2525 OptimizedMirRelationExpr(expr)
2526 } else {
2527 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2529
2530 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2532
2533 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2535 };
2536
2537 match optimized_mir.into_inner() {
2538 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2539 let catalog = self.owned_catalog();
2540 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2541 let result =
2542 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2543 ctx.retire(result);
2544 });
2545 }
2546 _ => {
2548 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2549 Some(table) => {
2550 let desc = table.relation_desc_latest().expect("table has a desc");
2552 desc.arity()
2553 }
2554 None => {
2555 ctx.retire(Err(AdapterError::Catalog(
2556 mz_catalog::memory::error::Error {
2557 kind: ErrorKind::Sql(CatalogError::UnknownItem(
2558 plan.id.to_string(),
2559 )),
2560 },
2561 )));
2562 return;
2563 }
2564 };
2565
2566 let finishing = RowSetFinishing {
2567 order_by: vec![],
2568 limit: None,
2569 offset: 0,
2570 project: (0..desc_arity).collect(),
2571 };
2572
2573 let read_then_write_plan = plan::ReadThenWritePlan {
2574 id: plan.id,
2575 selection: plan.values,
2576 finishing,
2577 assignments: BTreeMap::new(),
2578 kind: MutationKind::Insert,
2579 returning: plan.returning,
2580 };
2581
2582 self.sequence_read_then_write(ctx, read_then_write_plan)
2583 .await;
2584 }
2585 }
2586 }
2587
2588 #[instrument]
2593 pub(super) async fn sequence_read_then_write(
2594 &mut self,
2595 mut ctx: ExecuteContext,
2596 plan: plan::ReadThenWritePlan,
2597 ) {
2598 let mut source_ids: BTreeSet<_> = plan
2599 .selection
2600 .depends_on()
2601 .into_iter()
2602 .map(|gid| self.catalog().resolve_item_id(&gid))
2603 .collect();
2604 source_ids.insert(plan.id);
2605
2606 if ctx.session().transaction().write_locks().is_none() {
2608 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2610
2611 for id in &source_ids {
2613 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2614 write_locks.insert_lock(*id, lock);
2615 }
2616 }
2617
2618 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2620 Ok(locks) => locks,
2621 Err(missing) => {
2622 let role_metadata = ctx.session().role_metadata().clone();
2624 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2625 let plan = DeferredPlan {
2626 ctx,
2627 plan: Plan::ReadThenWrite(plan),
2628 validity: PlanValidity::new(
2629 self.catalog.transient_revision(),
2630 source_ids.clone(),
2631 None,
2632 None,
2633 role_metadata,
2634 ),
2635 requires_locks: source_ids,
2636 };
2637 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2638 }
2639 };
2640
2641 ctx.session_mut()
2642 .try_grant_write_locks(write_locks)
2643 .expect("session has already been granted write locks");
2644 }
2645
2646 let plan::ReadThenWritePlan {
2647 id,
2648 kind,
2649 selection,
2650 mut assignments,
2651 finishing,
2652 mut returning,
2653 } = plan;
2654
2655 let desc = match self.catalog().try_get_entry(&id) {
2657 Some(table) => {
2658 table
2660 .relation_desc_latest()
2661 .expect("table has a desc")
2662 .into_owned()
2663 }
2664 None => {
2665 ctx.retire(Err(AdapterError::Catalog(
2666 mz_catalog::memory::error::Error {
2667 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2668 },
2669 )));
2670 return;
2671 }
2672 };
2673
2674 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2676 || assignments.values().any(|e| e.contains_temporal())
2677 || returning.iter().any(|e| e.contains_temporal());
2678 if contains_temporal {
2679 ctx.retire(Err(AdapterError::Unsupported(
2680 "calls to mz_now in write statements",
2681 )));
2682 return;
2683 }
2684
2685 fn validate_read_dependencies(
2693 catalog: &Catalog,
2694 id: &CatalogItemId,
2695 ) -> Result<(), AdapterError> {
2696 use CatalogItemType::*;
2697 use mz_catalog::memory::objects;
2698 let mut ids_to_check = Vec::new();
2699 let valid = match catalog.try_get_entry(id) {
2700 Some(entry) => {
2701 if let CatalogItem::View(objects::View { optimized_expr, .. })
2702 | CatalogItem::MaterializedView(objects::MaterializedView {
2703 optimized_expr,
2704 ..
2705 }) = entry.item()
2706 {
2707 if optimized_expr.contains_temporal() {
2708 return Err(AdapterError::Unsupported(
2709 "calls to mz_now in write statements",
2710 ));
2711 }
2712 }
2713 match entry.item().typ() {
2714 typ @ (Func | View | MaterializedView | ContinualTask) => {
2715 ids_to_check.extend(entry.uses());
2716 let valid_id = id.is_user() || matches!(typ, Func);
2717 valid_id
2718 }
2719 Source | Secret | Connection => false,
2720 Sink | Index => unreachable!(),
2722 Table => {
2723 if !id.is_user() {
2724 false
2726 } else {
2727 entry.source_export_details().is_none()
2729 }
2730 }
2731 Type => true,
2732 }
2733 }
2734 None => false,
2735 };
2736 if !valid {
2737 let (object_name, object_type) = match catalog.try_get_entry(id) {
2738 Some(entry) => {
2739 let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
2740 let object_type = match entry.item().typ() {
2741 Source => "source",
2743 Secret => "secret",
2744 Connection => "connection",
2745 Table => {
2746 if !id.is_user() {
2747 "system table"
2748 } else {
2749 "source-export table"
2750 }
2751 }
2752 View => "system view",
2753 MaterializedView => "system materialized view",
2754 ContinualTask => "system task",
2755 _ => "invalid dependency",
2756 };
2757 (object_name, object_type.to_string())
2758 }
2759 None => (id.to_string(), "unknown".to_string()),
2760 };
2761 return Err(AdapterError::InvalidTableMutationSelection {
2762 object_name,
2763 object_type,
2764 });
2765 }
2766 for id in ids_to_check {
2767 validate_read_dependencies(catalog, &id)?;
2768 }
2769 Ok(())
2770 }
2771
2772 for gid in selection.depends_on() {
2773 let item_id = self.catalog().resolve_item_id(&gid);
2774 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2775 ctx.retire(Err(err));
2776 return;
2777 }
2778 }
2779
2780 let (peek_tx, peek_rx) = oneshot::channel();
2781 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2782 let (tx, _, session, extra) = ctx.into_parts();
2783 let peek_ctx = ExecuteContext::from_parts(
2795 peek_client_tx,
2796 self.internal_cmd_tx.clone(),
2797 session,
2798 Default::default(),
2799 );
2800
2801 self.sequence_peek(
2802 peek_ctx,
2803 plan::SelectPlan {
2804 select: None,
2805 source: selection,
2806 when: QueryWhen::FreshestTableWrite,
2807 finishing,
2808 copy_to: None,
2809 },
2810 TargetCluster::Active,
2811 None,
2812 )
2813 .await;
2814
2815 let internal_cmd_tx = self.internal_cmd_tx.clone();
2816 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2817 let catalog = self.owned_catalog();
2818 let max_result_size = self.catalog().system_config().max_result_size();
2819
2820 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2821 let (peek_response, session) = match peek_rx.await {
2822 Ok(Response {
2823 result: Ok(resp),
2824 session,
2825 otel_ctx,
2826 }) => {
2827 otel_ctx.attach_as_parent();
2828 (resp, session)
2829 }
2830 Ok(Response {
2831 result: Err(e),
2832 session,
2833 otel_ctx,
2834 }) => {
2835 let ctx =
2836 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2837 otel_ctx.attach_as_parent();
2838 ctx.retire(Err(e));
2839 return;
2840 }
2841 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2843 };
2844 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2845 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2846
2847 if timeout_dur == Duration::ZERO {
2849 timeout_dur = Duration::MAX;
2850 }
2851
2852 let style = ExprPrepOneShot {
2853 logical_time: EvalTime::NotAvailable, session: ctx.session(),
2855 catalog_state: catalog.state(),
2856 };
2857 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2858 return_if_err!(style.prep_scalar_expr(expr), ctx);
2859 }
2860
2861 let make_diffs = move |mut rows: Box<dyn RowIterator>|
2862 -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2863 let arena = RowArena::new();
2864 let mut diffs = Vec::new();
2865 let mut datum_vec = mz_repr::DatumVec::new();
2866
2867 while let Some(row) = rows.next() {
2868 if !assignments.is_empty() {
2869 assert!(
2870 matches!(kind, MutationKind::Update),
2871 "only updates support assignments"
2872 );
2873 let mut datums = datum_vec.borrow_with(row);
2874 let mut updates = vec![];
2875 for (idx, expr) in &assignments {
2876 let updated = match expr.eval(&datums, &arena) {
2877 Ok(updated) => updated,
2878 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2879 };
2880 updates.push((*idx, updated));
2881 }
2882 for (idx, new_value) in updates {
2883 datums[idx] = new_value;
2884 }
2885 let updated = Row::pack_slice(&datums);
2886 diffs.push((updated, Diff::ONE));
2887 }
2888 match kind {
2889 MutationKind::Update | MutationKind::Delete => {
2893 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2894 }
2895 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2896 }
2897 }
2898
2899 let mut byte_size: u64 = 0;
2902 for (row, diff) in &diffs {
2903 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2904 if diff.is_positive() {
2905 for (idx, datum) in row.iter().enumerate() {
2906 desc.constraints_met(idx, &datum)?;
2907 }
2908 }
2909 }
2910 Ok((diffs, byte_size))
2911 };
2912
2913 let diffs = match peek_response {
2914 ExecuteResponse::SendingRowsStreaming {
2915 rows: mut rows_stream,
2916 ..
2917 } => {
2918 let mut byte_size: u64 = 0;
2919 let mut diffs = Vec::new();
2920 let result = loop {
2921 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2922 Ok(Some(res)) => match res {
2923 PeekResponseUnary::Rows(new_rows) => {
2924 match make_diffs(new_rows) {
2925 Ok((mut new_diffs, new_byte_size)) => {
2926 byte_size = byte_size.saturating_add(new_byte_size);
2927 if byte_size > max_result_size {
2928 break Err(AdapterError::ResultSize(format!(
2929 "result exceeds max size of {max_result_size}"
2930 )));
2931 }
2932 diffs.append(&mut new_diffs)
2933 }
2934 Err(e) => break Err(e),
2935 };
2936 }
2937 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2938 PeekResponseUnary::Error(e) => {
2939 break Err(AdapterError::Unstructured(anyhow!(e)));
2940 }
2941 },
2942 Ok(None) => break Ok(diffs),
2943 Err(_) => {
2944 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2949 conn_id: ctx.session().conn_id().clone(),
2950 });
2951 if let Err(e) = result {
2952 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2953 }
2954 break Err(AdapterError::StatementTimeout);
2955 }
2956 }
2957 };
2958
2959 result
2960 }
2961 ExecuteResponse::SendingRowsImmediate { rows } => {
2962 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2963 }
2964 resp => Err(AdapterError::Unstructured(anyhow!(
2965 "unexpected peek response: {resp:?}"
2966 ))),
2967 };
2968
2969 let mut returning_rows = Vec::new();
2970 let mut diff_err: Option<AdapterError> = None;
2971 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2972 let arena = RowArena::new();
2973 for (row, diff) in diffs {
2974 if !diff.is_positive() {
2975 continue;
2976 }
2977 let mut returning_row = Row::with_capacity(returning.len());
2978 let mut packer = returning_row.packer();
2979 for expr in &returning {
2980 let datums: Vec<_> = row.iter().collect();
2981 match expr.eval(&datums, &arena) {
2982 Ok(datum) => {
2983 packer.push(datum);
2984 }
2985 Err(err) => {
2986 diff_err = Some(err.into());
2987 break;
2988 }
2989 }
2990 }
2991 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2992 let diff = match NonZeroUsize::try_from(diff) {
2993 Ok(diff) => diff,
2994 Err(err) => {
2995 diff_err = Some(err.into());
2996 break;
2997 }
2998 };
2999 returning_rows.push((returning_row, diff));
3000 if diff_err.is_some() {
3001 break;
3002 }
3003 }
3004 }
3005 let diffs = if let Some(err) = diff_err {
3006 Err(err)
3007 } else {
3008 diffs
3009 };
3010
3011 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3014 if let Some(timestamp_context) = timestamp_context {
3023 let (tx, rx) = tokio::sync::oneshot::channel();
3024 let conn_id = ctx.session().conn_id().clone();
3025 let pending_read_txn = PendingReadTxn {
3026 txn: PendingRead::ReadThenWrite { ctx, tx },
3027 timestamp_context,
3028 created: Instant::now(),
3029 num_requeues: 0,
3030 otel_ctx: OpenTelemetryContext::obtain(),
3031 };
3032 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3033 if let Err(e) = result {
3035 warn!(
3036 "strict_serializable_reads_tx dropped before we could send: {:?}",
3037 e
3038 );
3039 return;
3040 }
3041 let result = rx.await;
3042 ctx = match result {
3044 Ok(Some(ctx)) => ctx,
3045 Ok(None) => {
3046 return;
3049 }
3050 Err(e) => {
3051 warn!(
3052 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3053 e
3054 );
3055 return;
3056 }
3057 };
3058 }
3059
3060 match diffs {
3061 Ok(diffs) => {
3062 let result = Self::send_diffs(
3063 ctx.session_mut(),
3064 plan::SendDiffsPlan {
3065 id,
3066 updates: diffs,
3067 kind,
3068 returning: returning_rows,
3069 max_result_size,
3070 },
3071 );
3072 ctx.retire(result);
3073 }
3074 Err(e) => {
3075 ctx.retire(Err(e));
3076 }
3077 }
3078 });
3079 }
3080
3081 #[instrument]
3082 pub(super) async fn sequence_alter_item_rename(
3083 &mut self,
3084 ctx: &mut ExecuteContext,
3085 plan: plan::AlterItemRenamePlan,
3086 ) -> Result<ExecuteResponse, AdapterError> {
3087 let op = catalog::Op::RenameItem {
3088 id: plan.id,
3089 current_full_name: plan.current_full_name,
3090 to_name: plan.to_name,
3091 };
3092 match self
3093 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3094 .await
3095 {
3096 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3097 Err(err) => Err(err),
3098 }
3099 }
3100
3101 #[instrument]
3102 pub(super) async fn sequence_alter_retain_history(
3103 &mut self,
3104 ctx: &mut ExecuteContext,
3105 plan: plan::AlterRetainHistoryPlan,
3106 ) -> Result<ExecuteResponse, AdapterError> {
3107 let ops = vec![catalog::Op::AlterRetainHistory {
3108 id: plan.id,
3109 value: plan.value,
3110 window: plan.window,
3111 }];
3112 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3113 Box::pin(async move {
3114 let catalog_item = coord.catalog().get_entry(&plan.id).item();
3115 let cluster = match catalog_item {
3116 CatalogItem::Table(_)
3117 | CatalogItem::MaterializedView(_)
3118 | CatalogItem::Source(_)
3119 | CatalogItem::ContinualTask(_) => None,
3120 CatalogItem::Index(index) => Some(index.cluster_id),
3121 CatalogItem::Log(_)
3122 | CatalogItem::View(_)
3123 | CatalogItem::Sink(_)
3124 | CatalogItem::Type(_)
3125 | CatalogItem::Func(_)
3126 | CatalogItem::Secret(_)
3127 | CatalogItem::Connection(_) => unreachable!(),
3128 };
3129 match cluster {
3130 Some(cluster) => {
3131 coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3132 }
3133 None => {
3134 coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3135 }
3136 }
3137 })
3138 })
3139 .await?;
3140 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3141 }
3142
3143 #[instrument]
3144 pub(super) async fn sequence_alter_source_timestamp_interval(
3145 &mut self,
3146 ctx: &mut ExecuteContext,
3147 plan: plan::AlterSourceTimestampIntervalPlan,
3148 ) -> Result<ExecuteResponse, AdapterError> {
3149 let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3150 id: plan.id,
3151 value: plan.value,
3152 interval: plan.interval,
3153 }];
3154 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3155 Box::pin(async move {
3156 let source = coord
3157 .catalog()
3158 .get_entry(&plan.id)
3159 .source()
3160 .expect("known to be source");
3161 let (global_id, desc) = match &source.data_source {
3162 DataSourceDesc::Ingestion { desc, .. }
3163 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3164 (source.global_id, desc.clone())
3165 }
3166 _ => return,
3167 };
3168 let desc = desc.into_inline_connection(coord.catalog().state());
3169 coord
3170 .controller
3171 .storage
3172 .alter_ingestion_source_desc(BTreeMap::from([(global_id, desc)]))
3173 .await
3174 .unwrap_or_terminate("cannot fail to alter ingestion source desc");
3175 })
3176 })
3177 .await?;
3178 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3179 }
3180
3181 #[instrument]
3182 pub(super) async fn sequence_alter_schema_rename(
3183 &mut self,
3184 ctx: &mut ExecuteContext,
3185 plan: plan::AlterSchemaRenamePlan,
3186 ) -> Result<ExecuteResponse, AdapterError> {
3187 let (database_spec, schema_spec) = plan.cur_schema_spec;
3188 let op = catalog::Op::RenameSchema {
3189 database_spec,
3190 schema_spec,
3191 new_name: plan.new_schema_name,
3192 check_reserved_names: true,
3193 };
3194 match self
3195 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3196 .await
3197 {
3198 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3199 Err(err) => Err(err),
3200 }
3201 }
3202
3203 #[instrument]
3204 pub(super) async fn sequence_alter_schema_swap(
3205 &mut self,
3206 ctx: &mut ExecuteContext,
3207 plan: plan::AlterSchemaSwapPlan,
3208 ) -> Result<ExecuteResponse, AdapterError> {
3209 let plan::AlterSchemaSwapPlan {
3210 schema_a_spec: (schema_a_db, schema_a),
3211 schema_a_name,
3212 schema_b_spec: (schema_b_db, schema_b),
3213 schema_b_name,
3214 name_temp,
3215 } = plan;
3216
3217 let op_a = catalog::Op::RenameSchema {
3218 database_spec: schema_a_db,
3219 schema_spec: schema_a,
3220 new_name: name_temp,
3221 check_reserved_names: false,
3222 };
3223 let op_b = catalog::Op::RenameSchema {
3224 database_spec: schema_b_db,
3225 schema_spec: schema_b,
3226 new_name: schema_a_name,
3227 check_reserved_names: false,
3228 };
3229 let op_c = catalog::Op::RenameSchema {
3230 database_spec: schema_a_db,
3231 schema_spec: schema_a,
3232 new_name: schema_b_name,
3233 check_reserved_names: false,
3234 };
3235
3236 match self
3237 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3238 Box::pin(async {})
3239 })
3240 .await
3241 {
3242 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3243 Err(err) => Err(err),
3244 }
3245 }
3246
3247 #[instrument]
3248 pub(super) async fn sequence_alter_role(
3249 &mut self,
3250 session: &Session,
3251 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3252 ) -> Result<ExecuteResponse, AdapterError> {
3253 let catalog = self.catalog().for_session(session);
3254 let role = catalog.get_role(&id);
3255
3256 let mut notices = vec![];
3258
3259 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3261 let mut vars = role.vars().clone();
3262
3263 let mut nopassword = false;
3266
3267 match option {
3269 PlannedAlterRoleOption::Attributes(attrs) => {
3270 self.validate_role_attributes(&attrs.clone().into())?;
3271
3272 if let Some(inherit) = attrs.inherit {
3273 attributes.inherit = inherit;
3274 }
3275
3276 if let Some(password) = attrs.password {
3277 attributes.password = Some(password);
3278 attributes.scram_iterations =
3279 Some(self.catalog().system_config().scram_iterations())
3280 }
3281
3282 if let Some(superuser) = attrs.superuser {
3283 attributes.superuser = Some(superuser);
3284 }
3285
3286 if let Some(login) = attrs.login {
3287 attributes.login = Some(login);
3288 }
3289
3290 if attrs.nopassword.unwrap_or(false) {
3291 nopassword = true;
3292 }
3293
3294 if let Some(notice) = self.should_emit_rbac_notice(session) {
3295 notices.push(notice);
3296 }
3297 }
3298 PlannedAlterRoleOption::Variable(variable) => {
3299 let session_var = session.vars().inspect(variable.name())?;
3301 session_var.visible(session.user(), catalog.system_vars())?;
3303
3304 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3307 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3308 } else if let PlannedRoleVariable::Set {
3309 name,
3310 value: VariableValue::Values(vals),
3311 } = &variable
3312 {
3313 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3314 notices.push(AdapterNotice::IntrospectionClusterUsage);
3315 }
3316 }
3317
3318 let var_name = match variable {
3319 PlannedRoleVariable::Set { name, value } => {
3320 match &value {
3322 VariableValue::Default => {
3323 vars.remove(&name);
3324 }
3325 VariableValue::Values(vals) => {
3326 let var = match &vals[..] {
3327 [val] => OwnedVarInput::Flat(val.clone()),
3328 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3329 };
3330 session_var.check(var.borrow())?;
3332
3333 vars.insert(name.clone(), var);
3334 }
3335 };
3336 name
3337 }
3338 PlannedRoleVariable::Reset { name } => {
3339 vars.remove(&name);
3341 name
3342 }
3343 };
3344
3345 notices.push(AdapterNotice::VarDefaultUpdated {
3347 role: Some(name.clone()),
3348 var_name: Some(var_name),
3349 });
3350 }
3351 }
3352
3353 let op = catalog::Op::AlterRole {
3354 id,
3355 name,
3356 attributes,
3357 nopassword,
3358 vars: RoleVars { map: vars },
3359 };
3360 let response = self
3361 .catalog_transact(Some(session), vec![op])
3362 .await
3363 .map(|_| ExecuteResponse::AlteredRole)?;
3364
3365 session.add_notices(notices);
3367
3368 Ok(response)
3369 }
3370
3371 #[instrument]
3372 pub(super) async fn sequence_alter_sink_prepare(
3373 &mut self,
3374 ctx: ExecuteContext,
3375 plan: plan::AlterSinkPlan,
3376 ) {
3377 let id_bundle = crate::CollectionIdBundle {
3379 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3380 compute_ids: BTreeMap::new(),
3381 };
3382 let read_hold = self.acquire_read_holds(&id_bundle);
3383
3384 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3385 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3386 return;
3387 };
3388
3389 let otel_ctx = OpenTelemetryContext::obtain();
3390 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3391
3392 let plan_validity = PlanValidity::new(
3393 self.catalog().transient_revision(),
3394 BTreeSet::from_iter([plan.item_id, from_item_id]),
3395 Some(plan.in_cluster),
3396 None,
3397 ctx.session().role_metadata().clone(),
3398 );
3399
3400 info!(
3401 "preparing alter sink for {}: frontiers={:?} export={:?}",
3402 plan.global_id,
3403 self.controller
3404 .storage_collections
3405 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3406 self.controller.storage.export(plan.global_id)
3407 );
3408
3409 self.install_storage_watch_set(
3415 ctx.session().conn_id().clone(),
3416 BTreeSet::from_iter([plan.global_id]),
3417 read_ts,
3418 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3419 ctx: Some(ctx),
3420 otel_ctx,
3421 plan,
3422 plan_validity,
3423 read_hold,
3424 }),
3425 ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3426 }
3427
3428 #[instrument]
3429 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3430 ctx.otel_ctx.attach_as_parent();
3431
3432 let plan::AlterSinkPlan {
3433 item_id,
3434 global_id,
3435 sink: sink_plan,
3436 with_snapshot,
3437 in_cluster,
3438 } = ctx.plan.clone();
3439
3440 match ctx.plan_validity.check(self.catalog()) {
3449 Ok(()) => {}
3450 Err(err) => {
3451 ctx.retire(Err(err));
3452 return;
3453 }
3454 }
3455
3456 let entry = self.catalog().get_entry(&item_id);
3457 let CatalogItem::Sink(old_sink) = entry.item() else {
3458 panic!("invalid item kind for `AlterSinkPlan`");
3459 };
3460
3461 if sink_plan.version != old_sink.version + 1 {
3462 ctx.retire(Err(AdapterError::ChangedPlan(
3463 "sink was altered concurrently".into(),
3464 )));
3465 return;
3466 }
3467
3468 info!(
3469 "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3470 self.controller
3471 .storage_collections
3472 .collections_frontiers(vec![global_id, sink_plan.from]),
3473 self.controller.storage.export(global_id),
3474 );
3475
3476 let write_frontier = &self
3479 .controller
3480 .storage
3481 .export(global_id)
3482 .expect("sink known to exist")
3483 .write_frontier;
3484 let as_of = ctx.read_hold.least_valid_read();
3485 assert!(
3486 write_frontier.iter().all(|t| as_of.less_than(t)),
3487 "{:?} should be strictly less than {:?}",
3488 &*as_of,
3489 &**write_frontier
3490 );
3491
3492 let create_sql = &old_sink.create_sql;
3498 let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3499 let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3500 unreachable!("invalid statement kind for sink");
3501 };
3502
3503 stmt.with_options
3505 .retain(|o| o.name != CreateSinkOptionName::Version);
3506 stmt.with_options.push(CreateSinkOption {
3507 name: CreateSinkOptionName::Version,
3508 value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3509 sink_plan.version.to_string(),
3510 ))),
3511 });
3512
3513 let conn_catalog = self.catalog().for_system_session();
3514 let (mut stmt, resolved_ids) =
3515 mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3516
3517 let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3519 let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3520 stmt.from = ResolvedItemName::Item {
3521 id: from_entry.id(),
3522 qualifiers: from_entry.name.qualifiers.clone(),
3523 full_name,
3524 print_id: true,
3525 version: from_entry.version,
3526 };
3527
3528 let new_sink = Sink {
3529 create_sql: stmt.to_ast_string_stable(),
3530 global_id,
3531 from: sink_plan.from,
3532 connection: sink_plan.connection.clone(),
3533 envelope: sink_plan.envelope,
3534 version: sink_plan.version,
3535 with_snapshot,
3536 resolved_ids: resolved_ids.clone(),
3537 cluster_id: in_cluster,
3538 commit_interval: sink_plan.commit_interval,
3539 };
3540
3541 let ops = vec![catalog::Op::UpdateItem {
3542 id: item_id,
3543 name: entry.name().clone(),
3544 to_item: CatalogItem::Sink(new_sink),
3545 }];
3546
3547 match self
3548 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3549 .await
3550 {
3551 Ok(()) => {}
3552 Err(err) => {
3553 ctx.retire(Err(err));
3554 return;
3555 }
3556 }
3557
3558 let storage_sink_desc = StorageSinkDesc {
3559 from: sink_plan.from,
3560 from_desc: from_entry
3561 .relation_desc()
3562 .expect("sinks can only be built on items with descs")
3563 .into_owned(),
3564 connection: sink_plan
3565 .connection
3566 .clone()
3567 .into_inline_connection(self.catalog().state()),
3568 envelope: sink_plan.envelope,
3569 as_of,
3570 with_snapshot,
3571 version: sink_plan.version,
3572 from_storage_metadata: (),
3573 to_storage_metadata: (),
3574 commit_interval: sink_plan.commit_interval,
3575 };
3576
3577 self.controller
3578 .storage
3579 .alter_export(
3580 global_id,
3581 ExportDescription {
3582 sink: storage_sink_desc,
3583 instance_id: in_cluster,
3584 },
3585 )
3586 .await
3587 .unwrap_or_terminate("cannot fail to alter source desc");
3588
3589 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3590 }
3591
3592 #[instrument]
3593 pub(super) async fn sequence_alter_connection(
3594 &mut self,
3595 ctx: ExecuteContext,
3596 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3597 ) {
3598 match action {
3599 AlterConnectionAction::RotateKeys => {
3600 self.sequence_rotate_keys(ctx, id).await;
3601 }
3602 AlterConnectionAction::AlterOptions {
3603 set_options,
3604 drop_options,
3605 validate,
3606 } => {
3607 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3608 .await
3609 }
3610 }
3611 }
3612
3613 #[instrument]
3614 async fn sequence_alter_connection_options(
3615 &mut self,
3616 mut ctx: ExecuteContext,
3617 id: CatalogItemId,
3618 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3619 drop_options: BTreeSet<ConnectionOptionName>,
3620 validate: bool,
3621 ) {
3622 let cur_entry = self.catalog().get_entry(&id);
3623 let cur_conn = cur_entry.connection().expect("known to be connection");
3624 let connection_gid = cur_conn.global_id();
3625
3626 let inner = || -> Result<Connection, AdapterError> {
3627 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3629 .expect("invalid create sql persisted to catalog")
3630 .into_element()
3631 .ast
3632 {
3633 Statement::CreateConnection(stmt) => stmt,
3634 _ => unreachable!("proved type is source"),
3635 };
3636
3637 let catalog = self.catalog().for_system_session();
3638
3639 let (mut create_conn_stmt, resolved_ids) =
3641 mz_sql::names::resolve(&catalog, create_conn_stmt)
3642 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3643
3644 create_conn_stmt
3646 .values
3647 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3648
3649 create_conn_stmt.values.extend(
3651 set_options
3652 .into_iter()
3653 .map(|(name, value)| ConnectionOption { name, value }),
3654 );
3655
3656 let mut catalog = self.catalog().for_system_session();
3659 catalog.mark_id_unresolvable_for_replanning(id);
3660
3661 let plan = match mz_sql::plan::plan(
3663 None,
3664 &catalog,
3665 Statement::CreateConnection(create_conn_stmt),
3666 &Params::empty(),
3667 &resolved_ids,
3668 )
3669 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3670 {
3671 Plan::CreateConnection(plan) => plan,
3672 _ => unreachable!("create source plan is only valid response"),
3673 };
3674
3675 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3677 .expect("invalid create sql persisted to catalog")
3678 .into_element()
3679 .ast
3680 {
3681 Statement::CreateConnection(stmt) => stmt,
3682 _ => unreachable!("proved type is source"),
3683 };
3684
3685 let catalog = self.catalog().for_system_session();
3686
3687 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3689 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3690
3691 Ok(Connection {
3692 create_sql: plan.connection.create_sql,
3693 global_id: cur_conn.global_id,
3694 details: plan.connection.details,
3695 resolved_ids: new_deps,
3696 })
3697 };
3698
3699 let conn = match inner() {
3700 Ok(conn) => conn,
3701 Err(e) => {
3702 return ctx.retire(Err(e));
3703 }
3704 };
3705
3706 if validate {
3707 let connection = conn
3708 .details
3709 .to_connection()
3710 .into_inline_connection(self.catalog().state());
3711
3712 let internal_cmd_tx = self.internal_cmd_tx.clone();
3713 let transient_revision = self.catalog().transient_revision();
3714 let conn_id = ctx.session().conn_id().clone();
3715 let otel_ctx = OpenTelemetryContext::obtain();
3716 let role_metadata = ctx.session().role_metadata().clone();
3717 let current_storage_parameters = self.controller.storage.config().clone();
3718
3719 task::spawn(
3720 || format!("validate_alter_connection:{conn_id}"),
3721 async move {
3722 let resolved_ids = conn.resolved_ids.clone();
3723 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3724 let result = match connection.validate(id, ¤t_storage_parameters).await {
3725 Ok(()) => Ok(conn),
3726 Err(err) => Err(err.into()),
3727 };
3728
3729 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3731 AlterConnectionValidationReady {
3732 ctx,
3733 result,
3734 connection_id: id,
3735 connection_gid,
3736 plan_validity: PlanValidity::new(
3737 transient_revision,
3738 dependency_ids.clone(),
3739 None,
3740 None,
3741 role_metadata,
3742 ),
3743 otel_ctx,
3744 resolved_ids,
3745 },
3746 ));
3747 if let Err(e) = result {
3748 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3749 }
3750 },
3751 );
3752 } else {
3753 let result = self
3754 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3755 .await;
3756 ctx.retire(result);
3757 }
3758 }
3759
3760 #[instrument]
3761 pub(crate) async fn sequence_alter_connection_stage_finish(
3762 &mut self,
3763 session: &Session,
3764 id: CatalogItemId,
3765 connection: Connection,
3766 ) -> Result<ExecuteResponse, AdapterError> {
3767 match self.catalog.get_entry(&id).item() {
3768 CatalogItem::Connection(curr_conn) => {
3769 curr_conn
3770 .details
3771 .to_connection()
3772 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3773 .map_err(StorageError::from)?;
3774 }
3775 _ => unreachable!("known to be a connection"),
3776 };
3777
3778 let ops = vec![catalog::Op::UpdateItem {
3779 id,
3780 name: self.catalog.get_entry(&id).name().clone(),
3781 to_item: CatalogItem::Connection(connection.clone()),
3782 }];
3783
3784 self.catalog_transact(Some(session), ops).await?;
3785
3786 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3793 }
3794
/// Sequences an `ALTER SOURCE` plan.
///
/// Two actions are handled:
///
/// * `AddSubsourceExports`: rewrites the source's persisted `CREATE SOURCE`
///   statement so that its `TEXT COLUMNS` / `EXCLUDE COLUMNS` options cover
///   both the existing subsources and the ones being added, re-plans it,
///   checks the resulting ingestion description with the storage controller,
///   and commits the updated source together with the new subsources in one
///   catalog transaction.
/// * `RefreshReferences`: stores the given references for the source.
///
/// Returns `ExecuteResponse::AlteredObject(ObjectType::Source)` on success.
///
/// # Panics
///
/// Panics if `item_id` is not a source, if the persisted `create_sql` does not
/// parse, or if `create_source_inner` reports any `IF NOT EXISTS` IDs.
#[instrument]
pub(super) async fn sequence_alter_source(
    &mut self,
    session: &Session,
    plan::AlterSourcePlan {
        item_id,
        ingestion_id,
        action,
    }: plan::AlterSourcePlan,
) -> Result<ExecuteResponse, AdapterError> {
    let cur_entry = self.catalog().get_entry(&item_id);
    let cur_source = cur_entry.source().expect("known to be source");

    // Parses a persisted `CREATE SOURCE` statement and resolves its names,
    // yielding the resolved statement and its resolved IDs. `err_cx` is the
    // context passed to `AdapterError::internal` if resolution fails.
    let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
        let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
            .expect("invalid create sql persisted to catalog")
            .into_element()
            .ast
        {
            Statement::CreateSource(stmt) => stmt,
            _ => unreachable!("proved type is source"),
        };

        let catalog = coord.catalog().for_system_session();

        mz_sql::names::resolve(&catalog, create_source_stmt)
            .map_err(|e| AdapterError::internal(err_cx, e))
    };

    match action {
        plan::AlterSourceAction::AddSubsourceExports {
            subsources,
            options,
        } => {
            const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

            // Column options supplied with this ALTER statement.
            let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                text_columns: mut new_text_columns,
                exclude_columns: mut new_exclude_columns,
                ..
            } = options.try_into()?;

            let (mut create_source_stmt, resolved_ids) =
                create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

            // Collect the `reference` component of `subsource_details()` for
            // every entry in this source's `used_by()` list that has one.
            // NOTE(review): presumably these are the upstream tables of the
            // source's existing subsources — confirm against the catalog API.
            let catalog = self.catalog();
            let curr_references: BTreeSet<_> = catalog
                .get_entry(&item_id)
                .used_by()
                .into_iter()
                .filter_map(|subsource| {
                    catalog
                        .get_entry(subsource)
                        .subsource_details()
                        .map(|(_id, reference, _details)| reference)
                })
                .collect();

            // Error returned for connection types not handled below.
            let purification_err =
                || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

            // Each arm follows the same recipe: extract the column options
            // currently in the statement, strip them from the statement, keep
            // only those whose table prefix is in `curr_references`, merge
            // them into the newly supplied ones, and write the sorted result
            // back (omitting the option entirely when the list is empty).
            match &mut create_source_stmt.connection {
                CreateSourceConnection::Postgres {
                    options: curr_options,
                    ..
                } => {
                    let mz_sql::plan::PgConfigOptionExtracted {
                        mut text_columns, ..
                    } = curr_options.clone().try_into()?;

                    // Remove the existing TEXT COLUMNS option; it is re-added below.
                    curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));

                    // Keep only columns whose 3-part table prefix is still referenced.
                    text_columns.retain(|column_qualified_reference| {
                        mz_ore::soft_assert_eq_or_log!(
                            column_qualified_reference.0.len(),
                            4,
                            "all TEXT COLUMNS values must be column-qualified references"
                        );
                        let mut table = column_qualified_reference.clone();
                        table.0.truncate(3);
                        curr_references.contains(&table)
                    });

                    // Merge the surviving columns into the newly supplied ones.
                    new_text_columns.extend(text_columns);

                    if !new_text_columns.is_empty() {
                        // Sort so the option's value has a deterministic order.
                        new_text_columns.sort();
                        let new_text_columns = new_text_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(PgConfigOption {
                            name: PgConfigOptionName::TextColumns,
                            value: Some(WithOptionValue::Sequence(new_text_columns)),
                        });
                    }
                }
                CreateSourceConnection::MySql {
                    options: curr_options,
                    ..
                } => {
                    let mz_sql::plan::MySqlConfigOptionExtracted {
                        mut text_columns,
                        mut exclude_columns,
                        ..
                    } = curr_options.clone().try_into()?;

                    // Remove both column options; they are re-added below.
                    curr_options.retain(|o| {
                        !matches!(
                            o.name,
                            MySqlConfigOptionName::TextColumns
                                | MySqlConfigOptionName::ExcludeColumns
                        )
                    });

                    // True when the column's 2-part table prefix is still referenced.
                    let column_referenced =
                        |column_qualified_reference: &UnresolvedItemName| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                3,
                                "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(2);
                            curr_references.contains(&table)
                        };
                    text_columns.retain(column_referenced);
                    exclude_columns.retain(column_referenced);

                    // Merge the surviving columns into the newly supplied ones.
                    new_text_columns.extend(text_columns);
                    new_exclude_columns.extend(exclude_columns);

                    if !new_text_columns.is_empty() {
                        // Sort so the option's value has a deterministic order.
                        new_text_columns.sort();
                        let new_text_columns = new_text_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(MySqlConfigOption {
                            name: MySqlConfigOptionName::TextColumns,
                            value: Some(WithOptionValue::Sequence(new_text_columns)),
                        });
                    }
                    if !new_exclude_columns.is_empty() {
                        // Sort so the option's value has a deterministic order.
                        new_exclude_columns.sort();
                        let new_exclude_columns = new_exclude_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(MySqlConfigOption {
                            name: MySqlConfigOptionName::ExcludeColumns,
                            value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                        });
                    }
                }
                CreateSourceConnection::SqlServer {
                    options: curr_options,
                    ..
                } => {
                    let mz_sql::plan::SqlServerConfigOptionExtracted {
                        mut text_columns,
                        mut exclude_columns,
                        ..
                    } = curr_options.clone().try_into()?;

                    // Remove both column options; they are re-added below.
                    curr_options.retain(|o| {
                        !matches!(
                            o.name,
                            SqlServerConfigOptionName::TextColumns
                                | SqlServerConfigOptionName::ExcludeColumns
                        )
                    });

                    // True when the column's 2-part table prefix is still referenced.
                    let column_referenced =
                        |column_qualified_reference: &UnresolvedItemName| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                3,
                                "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(2);
                            curr_references.contains(&table)
                        };
                    text_columns.retain(column_referenced);
                    exclude_columns.retain(column_referenced);

                    // Merge the surviving columns into the newly supplied ones.
                    new_text_columns.extend(text_columns);
                    new_exclude_columns.extend(exclude_columns);

                    if !new_text_columns.is_empty() {
                        // Sort so the option's value has a deterministic order.
                        new_text_columns.sort();
                        let new_text_columns = new_text_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(SqlServerConfigOption {
                            name: SqlServerConfigOptionName::TextColumns,
                            value: Some(WithOptionValue::Sequence(new_text_columns)),
                        });
                    }
                    if !new_exclude_columns.is_empty() {
                        // Sort so the option's value has a deterministic order.
                        new_exclude_columns.sort();
                        let new_exclude_columns = new_exclude_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(SqlServerConfigOption {
                            name: SqlServerConfigOptionName::ExcludeColumns,
                            value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                        });
                    }
                }
                // Any other connection type is reported as an internal error.
                _ => return Err(purification_err()),
            };

            // Re-plan the patched statement with the source's own ID marked
            // unresolvable for the duration of planning.
            let mut catalog = self.catalog().for_system_session();
            catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

            let plan = match mz_sql::plan::plan(
                None,
                &catalog,
                Statement::CreateSource(create_source_stmt),
                &Params::empty(),
                &resolved_ids,
            )
            .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
            {
                Plan::CreateSource(plan) => plan,
                _ => unreachable!("create source plan is only valid response"),
            };

            // Build the new catalog item, carrying over the current source's
            // global ID, compaction window, and retained-metrics flag.
            let source = Source::new(
                plan,
                cur_source.global_id,
                resolved_ids,
                cur_source.custom_logical_compaction_window,
                cur_source.is_retained_metrics_object,
            );

            // Extract the ingestion description with its connection inlined.
            let desc = match &source.data_source {
                DataSourceDesc::Ingestion { desc, .. }
                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                    desc.clone().into_inline_connection(self.catalog().state())
                }
                _ => unreachable!("already verified of type ingestion"),
            };

            // Ask the storage controller to check the altered description
            // before anything is committed to the catalog.
            self.controller
                .storage
                .check_alter_ingestion_source_desc(ingestion_id, &desc)
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

            // The source update goes first, followed by the ops that create
            // the new subsources.
            let mut ops = vec![catalog::Op::UpdateItem {
                id: item_id,
                name: self.catalog.get_entry(&item_id).name().clone(),
                to_item: CatalogItem::Source(source),
            }];

            let CreateSourceInner {
                ops: new_ops,
                sources: _,
                if_not_exists_ids,
            } = self.create_source_inner(session, subsources).await?;

            ops.extend(new_ops.into_iter());

            assert!(
                if_not_exists_ids.is_empty(),
                "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
            );

            self.catalog_transact(Some(session), ops).await?;
        }
        plan::AlterSourceAction::RefreshReferences { references } => {
            // Store the provided references for this source.
            self.catalog_transact(
                Some(session),
                vec![catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.into(),
                }],
            )
            .await?;
        }
    }

    Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
}
4123
4124 #[instrument]
4125 pub(super) async fn sequence_alter_system_set(
4126 &mut self,
4127 session: &Session,
4128 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4129 ) -> Result<ExecuteResponse, AdapterError> {
4130 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4131 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4133 self.validate_alter_system_network_policy(session, &value)?;
4134 }
4135
4136 let op = match value {
4137 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4138 name: name.clone(),
4139 value: OwnedVarInput::SqlSet(values),
4140 },
4141 plan::VariableValue::Default => {
4142 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4143 }
4144 };
4145 self.catalog_transact(Some(session), vec![op]).await?;
4146
4147 session.add_notice(AdapterNotice::VarDefaultUpdated {
4148 role: None,
4149 var_name: Some(name),
4150 });
4151 Ok(ExecuteResponse::AlteredSystemConfiguration)
4152 }
4153
4154 #[instrument]
4155 pub(super) async fn sequence_alter_system_reset(
4156 &mut self,
4157 session: &Session,
4158 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4159 ) -> Result<ExecuteResponse, AdapterError> {
4160 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4161 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4162 self.catalog_transact(Some(session), vec![op]).await?;
4163 session.add_notice(AdapterNotice::VarDefaultUpdated {
4164 role: None,
4165 var_name: Some(name),
4166 });
4167 Ok(ExecuteResponse::AlteredSystemConfiguration)
4168 }
4169
4170 #[instrument]
4171 pub(super) async fn sequence_alter_system_reset_all(
4172 &mut self,
4173 session: &Session,
4174 _: plan::AlterSystemResetAllPlan,
4175 ) -> Result<ExecuteResponse, AdapterError> {
4176 self.is_user_allowed_to_alter_system(session, None)?;
4177 let op = catalog::Op::ResetAllSystemConfiguration;
4178 self.catalog_transact(Some(session), vec![op]).await?;
4179 session.add_notice(AdapterNotice::VarDefaultUpdated {
4180 role: None,
4181 var_name: None,
4182 });
4183 Ok(ExecuteResponse::AlteredSystemConfiguration)
4184 }
4185
4186 fn is_user_allowed_to_alter_system(
4188 &self,
4189 session: &Session,
4190 var_name: Option<&str>,
4191 ) -> Result<(), AdapterError> {
4192 match (session.user().kind(), var_name) {
4193 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4195 (UserKind::Superuser, Some(name))
4197 if session.user().is_internal()
4198 || self.catalog().system_config().user_modifiable(name) =>
4199 {
4200 let var = self.catalog().system_config().get(name)?;
4203 var.visible(session.user(), self.catalog().system_config())?;
4204 Ok(())
4205 }
4206 (UserKind::Regular, Some(name))
4209 if self.catalog().system_config().user_modifiable(name) =>
4210 {
4211 Err(AdapterError::Unauthorized(
4212 rbac::UnauthorizedError::Superuser {
4213 action: format!("toggle the '{name}' system configuration parameter"),
4214 },
4215 ))
4216 }
4217 _ => Err(AdapterError::Unauthorized(
4218 rbac::UnauthorizedError::MzSystem {
4219 action: "alter system".into(),
4220 },
4221 )),
4222 }
4223 }
4224
4225 fn validate_alter_system_network_policy(
4226 &self,
4227 session: &Session,
4228 policy_value: &plan::VariableValue,
4229 ) -> Result<(), AdapterError> {
4230 let policy_name = match &policy_value {
4231 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4233 plan::VariableValue::Values(values) if values.len() == 1 => {
4234 values.iter().next().cloned()
4235 }
4236 plan::VariableValue::Values(values) => {
4237 tracing::warn!(?values, "can't set multiple network policies at once");
4238 None
4239 }
4240 };
4241 let maybe_network_policy = policy_name
4242 .as_ref()
4243 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4244 let Some(network_policy) = maybe_network_policy else {
4245 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4246 VarError::InvalidParameterValue {
4247 name: NETWORK_POLICY.name(),
4248 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4249 reason: "no network policy with such name exists".to_string(),
4250 },
4251 )));
4252 };
4253 self.validate_alter_network_policy(session, &network_policy.rules)
4254 }
4255
4256 fn validate_alter_network_policy(
4261 &self,
4262 session: &Session,
4263 policy_rules: &Vec<NetworkPolicyRule>,
4264 ) -> Result<(), AdapterError> {
4265 if session.user().is_internal() {
4268 return Ok(());
4269 }
4270 if let Some(ip) = session.meta().client_ip() {
4271 validate_ip_with_policy_rules(ip, policy_rules)
4272 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4273 } else {
4274 return Err(AdapterError::NetworkPolicyDenied(
4277 NetworkPolicyError::MissingIp,
4278 ));
4279 }
4280 Ok(())
4281 }
4282
4283 #[instrument]
4285 pub(super) fn sequence_execute(
4286 &self,
4287 session: &mut Session,
4288 plan: plan::ExecutePlan,
4289 ) -> Result<String, AdapterError> {
4290 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4292 let ps = session
4293 .get_prepared_statement_unverified(&plan.name)
4294 .expect("known to exist");
4295 let stmt = ps.stmt().cloned();
4296 let desc = ps.desc().clone();
4297 let state_revision = ps.state_revision;
4298 let logging = Arc::clone(ps.logging());
4299 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4300 }
4301
4302 #[instrument]
4303 pub(super) async fn sequence_grant_privileges(
4304 &mut self,
4305 session: &Session,
4306 plan::GrantPrivilegesPlan {
4307 update_privileges,
4308 grantees,
4309 }: plan::GrantPrivilegesPlan,
4310 ) -> Result<ExecuteResponse, AdapterError> {
4311 self.sequence_update_privileges(
4312 session,
4313 update_privileges,
4314 grantees,
4315 UpdatePrivilegeVariant::Grant,
4316 )
4317 .await
4318 }
4319
4320 #[instrument]
4321 pub(super) async fn sequence_revoke_privileges(
4322 &mut self,
4323 session: &Session,
4324 plan::RevokePrivilegesPlan {
4325 update_privileges,
4326 revokees,
4327 }: plan::RevokePrivilegesPlan,
4328 ) -> Result<ExecuteResponse, AdapterError> {
4329 self.sequence_update_privileges(
4330 session,
4331 update_privileges,
4332 revokees,
4333 UpdatePrivilegeVariant::Revoke,
4334 )
4335 .await
4336 }
4337
4338 #[instrument]
4339 async fn sequence_update_privileges(
4340 &mut self,
4341 session: &Session,
4342 update_privileges: Vec<UpdatePrivilege>,
4343 grantees: Vec<RoleId>,
4344 variant: UpdatePrivilegeVariant,
4345 ) -> Result<ExecuteResponse, AdapterError> {
4346 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4347 let mut warnings = Vec::new();
4348 let catalog = self.catalog().for_session(session);
4349
4350 for UpdatePrivilege {
4351 acl_mode,
4352 target_id,
4353 grantor,
4354 } in update_privileges
4355 {
4356 let actual_object_type = catalog.get_system_object_type(&target_id);
4357 if actual_object_type.is_relation() {
4360 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4361 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4362 if !non_applicable_privileges.is_empty() {
4363 let object_description =
4364 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4365 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4366 non_applicable_privileges,
4367 object_description,
4368 })
4369 }
4370 }
4371
4372 if let SystemObjectId::Object(object_id) = &target_id {
4373 self.catalog()
4374 .ensure_not_reserved_object(object_id, session.conn_id())?;
4375 }
4376
4377 let privileges = self
4378 .catalog()
4379 .get_privileges(&target_id, session.conn_id())
4380 .ok_or(AdapterError::Unsupported(
4383 "GRANTs/REVOKEs on an object type with no privileges",
4384 ))?;
4385
4386 for grantee in &grantees {
4387 self.catalog().ensure_not_system_role(grantee)?;
4388 self.catalog().ensure_not_predefined_role(grantee)?;
4389 let existing_privilege = privileges
4390 .get_acl_item(grantee, &grantor)
4391 .map(Cow::Borrowed)
4392 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4393
4394 match variant {
4395 UpdatePrivilegeVariant::Grant
4396 if !existing_privilege.acl_mode.contains(acl_mode) =>
4397 {
4398 ops.push(catalog::Op::UpdatePrivilege {
4399 target_id: target_id.clone(),
4400 privilege: MzAclItem {
4401 grantee: *grantee,
4402 grantor,
4403 acl_mode,
4404 },
4405 variant,
4406 });
4407 }
4408 UpdatePrivilegeVariant::Revoke
4409 if !existing_privilege
4410 .acl_mode
4411 .intersection(acl_mode)
4412 .is_empty() =>
4413 {
4414 ops.push(catalog::Op::UpdatePrivilege {
4415 target_id: target_id.clone(),
4416 privilege: MzAclItem {
4417 grantee: *grantee,
4418 grantor,
4419 acl_mode,
4420 },
4421 variant,
4422 });
4423 }
4424 _ => {}
4426 }
4427 }
4428 }
4429
4430 if ops.is_empty() {
4431 session.add_notices(warnings);
4432 return Ok(variant.into());
4433 }
4434
4435 let res = self
4436 .catalog_transact(Some(session), ops)
4437 .await
4438 .map(|_| match variant {
4439 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4440 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4441 });
4442 if res.is_ok() {
4443 session.add_notices(warnings);
4444 }
4445 res
4446 }
4447
4448 #[instrument]
4449 pub(super) async fn sequence_alter_default_privileges(
4450 &mut self,
4451 session: &Session,
4452 plan::AlterDefaultPrivilegesPlan {
4453 privilege_objects,
4454 privilege_acl_items,
4455 is_grant,
4456 }: plan::AlterDefaultPrivilegesPlan,
4457 ) -> Result<ExecuteResponse, AdapterError> {
4458 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4459 let variant = if is_grant {
4460 UpdatePrivilegeVariant::Grant
4461 } else {
4462 UpdatePrivilegeVariant::Revoke
4463 };
4464 for privilege_object in &privilege_objects {
4465 self.catalog()
4466 .ensure_not_system_role(&privilege_object.role_id)?;
4467 self.catalog()
4468 .ensure_not_predefined_role(&privilege_object.role_id)?;
4469 if let Some(database_id) = privilege_object.database_id {
4470 self.catalog()
4471 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4472 }
4473 if let Some(schema_id) = privilege_object.schema_id {
4474 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4475 let schema_spec: SchemaSpecifier = schema_id.into();
4476
4477 self.catalog().ensure_not_reserved_object(
4478 &(database_spec, schema_spec).into(),
4479 session.conn_id(),
4480 )?;
4481 }
4482 for privilege_acl_item in &privilege_acl_items {
4483 self.catalog()
4484 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4485 self.catalog()
4486 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4487 ops.push(catalog::Op::UpdateDefaultPrivilege {
4488 privilege_object: privilege_object.clone(),
4489 privilege_acl_item: privilege_acl_item.clone(),
4490 variant,
4491 })
4492 }
4493 }
4494
4495 self.catalog_transact(Some(session), ops).await?;
4496 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4497 }
4498
4499 #[instrument]
4500 pub(super) async fn sequence_grant_role(
4501 &mut self,
4502 session: &Session,
4503 plan::GrantRolePlan {
4504 role_ids,
4505 member_ids,
4506 grantor_id,
4507 }: plan::GrantRolePlan,
4508 ) -> Result<ExecuteResponse, AdapterError> {
4509 let catalog = self.catalog();
4510 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4511 for role_id in role_ids {
4512 for member_id in &member_ids {
4513 let member_membership: BTreeSet<_> =
4514 catalog.get_role(member_id).membership().keys().collect();
4515 if member_membership.contains(&role_id) {
4516 let role_name = catalog.get_role(&role_id).name().to_string();
4517 let member_name = catalog.get_role(member_id).name().to_string();
4518 catalog.ensure_not_reserved_role(member_id)?;
4520 catalog.ensure_grantable_role(&role_id)?;
4521 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4522 role_name,
4523 member_name,
4524 });
4525 } else {
4526 ops.push(catalog::Op::GrantRole {
4527 role_id,
4528 member_id: *member_id,
4529 grantor_id,
4530 });
4531 }
4532 }
4533 }
4534
4535 if ops.is_empty() {
4536 return Ok(ExecuteResponse::GrantedRole);
4537 }
4538
4539 self.catalog_transact(Some(session), ops)
4540 .await
4541 .map(|_| ExecuteResponse::GrantedRole)
4542 }
4543
4544 #[instrument]
4545 pub(super) async fn sequence_revoke_role(
4546 &mut self,
4547 session: &Session,
4548 plan::RevokeRolePlan {
4549 role_ids,
4550 member_ids,
4551 grantor_id,
4552 }: plan::RevokeRolePlan,
4553 ) -> Result<ExecuteResponse, AdapterError> {
4554 let catalog = self.catalog();
4555 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4556 for role_id in role_ids {
4557 for member_id in &member_ids {
4558 let member_membership: BTreeSet<_> =
4559 catalog.get_role(member_id).membership().keys().collect();
4560 if !member_membership.contains(&role_id) {
4561 let role_name = catalog.get_role(&role_id).name().to_string();
4562 let member_name = catalog.get_role(member_id).name().to_string();
4563 catalog.ensure_not_reserved_role(member_id)?;
4565 catalog.ensure_grantable_role(&role_id)?;
4566 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4567 role_name,
4568 member_name,
4569 });
4570 } else {
4571 ops.push(catalog::Op::RevokeRole {
4572 role_id,
4573 member_id: *member_id,
4574 grantor_id,
4575 });
4576 }
4577 }
4578 }
4579
4580 if ops.is_empty() {
4581 return Ok(ExecuteResponse::RevokedRole);
4582 }
4583
4584 self.catalog_transact(Some(session), ops)
4585 .await
4586 .map(|_| ExecuteResponse::RevokedRole)
4587 }
4588
4589 #[instrument]
4590 pub(super) async fn sequence_alter_owner(
4591 &mut self,
4592 session: &Session,
4593 plan::AlterOwnerPlan {
4594 id,
4595 object_type,
4596 new_owner,
4597 }: plan::AlterOwnerPlan,
4598 ) -> Result<ExecuteResponse, AdapterError> {
4599 let mut ops = vec![catalog::Op::UpdateOwner {
4600 id: id.clone(),
4601 new_owner,
4602 }];
4603
4604 match &id {
4605 ObjectId::Item(global_id) => {
4606 let entry = self.catalog().get_entry(global_id);
4607
4608 if entry.is_index() {
4610 let name = self
4611 .catalog()
4612 .resolve_full_name(entry.name(), Some(session.conn_id()))
4613 .to_string();
4614 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4615 return Ok(ExecuteResponse::AlteredObject(object_type));
4616 }
4617
4618 let dependent_index_ops = entry
4620 .used_by()
4621 .into_iter()
4622 .filter(|id| self.catalog().get_entry(id).is_index())
4623 .map(|id| catalog::Op::UpdateOwner {
4624 id: ObjectId::Item(*id),
4625 new_owner,
4626 });
4627 ops.extend(dependent_index_ops);
4628
4629 let dependent_subsources =
4631 entry
4632 .progress_id()
4633 .into_iter()
4634 .map(|item_id| catalog::Op::UpdateOwner {
4635 id: ObjectId::Item(item_id),
4636 new_owner,
4637 });
4638 ops.extend(dependent_subsources);
4639 }
4640 ObjectId::Cluster(cluster_id) => {
4641 let cluster = self.catalog().get_cluster(*cluster_id);
4642 let managed_cluster_replica_ops =
4644 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4645 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4646 new_owner,
4647 });
4648 ops.extend(managed_cluster_replica_ops);
4649 }
4650 _ => {}
4651 }
4652
4653 self.catalog_transact(Some(session), ops)
4654 .await
4655 .map(|_| ExecuteResponse::AlteredObject(object_type))
4656 }
4657
4658 #[instrument]
4659 pub(super) async fn sequence_reassign_owned(
4660 &mut self,
4661 session: &Session,
4662 plan::ReassignOwnedPlan {
4663 old_roles,
4664 new_role,
4665 reassign_ids,
4666 }: plan::ReassignOwnedPlan,
4667 ) -> Result<ExecuteResponse, AdapterError> {
4668 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4669 self.catalog().ensure_not_reserved_role(role_id)?;
4670 }
4671
4672 let ops = reassign_ids
4673 .into_iter()
4674 .map(|id| catalog::Op::UpdateOwner {
4675 id,
4676 new_owner: new_role,
4677 })
4678 .collect();
4679
4680 self.catalog_transact(Some(session), ops)
4681 .await
4682 .map(|_| ExecuteResponse::ReassignOwned)
4683 }
4684
4685 #[instrument]
4686 pub(crate) async fn handle_deferred_statement(&mut self) {
4687 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4691 return;
4692 };
4693 match ps {
4694 crate::coord::PlanStatement::Statement { stmt, params } => {
4695 self.handle_execute_inner(stmt, params, ctx).await;
4696 }
4697 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4698 self.sequence_plan(ctx, plan, resolved_ids).await;
4699 }
4700 }
4701 }
4702
4703 #[instrument]
4704 #[allow(clippy::unused_async)]
4706 pub(super) async fn sequence_alter_table(
4707 &mut self,
4708 ctx: &mut ExecuteContext,
4709 plan: plan::AlterTablePlan,
4710 ) -> Result<ExecuteResponse, AdapterError> {
4711 let plan::AlterTablePlan {
4712 relation_id,
4713 column_name,
4714 column_type,
4715 raw_sql_type,
4716 } = plan;
4717
4718 let id_ts = self.get_catalog_write_ts().await;
4720 let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4721 let ops = vec![catalog::Op::AlterAddColumn {
4722 id: relation_id,
4723 new_global_id,
4724 name: column_name,
4725 typ: column_type,
4726 sql: raw_sql_type,
4727 }];
4728
4729 self.catalog_transact_with_context(None, Some(ctx), ops)
4730 .await?;
4731
4732 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4733 }
4734
4735 #[instrument]
4737 pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4738 &mut self,
4739 ctx: ExecuteContext,
4740 plan: AlterMaterializedViewApplyReplacementPlan,
4741 ) {
4742 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4752
4753 let plan_validity = PlanValidity::new(
4754 self.catalog().transient_revision(),
4755 BTreeSet::from_iter([id, replacement_id]),
4756 None,
4757 None,
4758 ctx.session().role_metadata().clone(),
4759 );
4760
4761 let target = self.catalog.get_entry(&id);
4762 let target_gid = target.latest_global_id();
4763
4764 let replacement = self.catalog.get_entry(&replacement_id);
4765 let replacement_gid = replacement.latest_global_id();
4766
4767 let target_upper = self
4768 .controller
4769 .storage_collections
4770 .collection_frontiers(target_gid)
4771 .expect("target MV exists")
4772 .write_frontier;
4773 let replacement_upper = self
4774 .controller
4775 .compute
4776 .collection_frontiers(replacement_gid, replacement.cluster_id())
4777 .expect("replacement MV exists")
4778 .write_frontier;
4779
4780 info!(
4781 %id, %replacement_id, ?target_upper, ?replacement_upper,
4782 "preparing materialized view replacement application",
4783 );
4784
4785 let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4786 ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4795 name: target.name().item.clone(),
4796 }));
4797 return;
4798 };
4799
4800 let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4804
4805 self.install_storage_watch_set(
4809 ctx.session().conn_id().clone(),
4810 BTreeSet::from_iter([target_gid]),
4811 replacement_upper_ts,
4812 WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4813 ctx: Some(ctx),
4814 otel_ctx: OpenTelemetryContext::obtain(),
4815 plan,
4816 plan_validity,
4817 }),
4818 )
4819 .expect("target collection exists");
4820 }
4821
4822 #[instrument]
4824 pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4825 &mut self,
4826 mut ctx: AlterMaterializedViewReadyContext,
4827 ) {
4828 ctx.otel_ctx.attach_as_parent();
4829
4830 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4831
4832 if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4837 ctx.retire(Err(err));
4838 return;
4839 }
4840
4841 info!(
4842 %id, %replacement_id,
4843 "finishing materialized view replacement application",
4844 );
4845
4846 let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4847 match self
4848 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4849 .await
4850 {
4851 Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4852 ObjectType::MaterializedView,
4853 ))),
4854 Err(err) => ctx.retire(Err(err)),
4855 }
4856 }
4857
4858 pub(super) async fn statistics_oracle(
4859 &self,
4860 session: &Session,
4861 source_ids: &BTreeSet<GlobalId>,
4862 query_as_of: &Antichain<Timestamp>,
4863 is_oneshot: bool,
4864 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4865 super::statistics_oracle(
4866 session,
4867 source_ids,
4868 query_as_of,
4869 is_oneshot,
4870 self.catalog().system_config(),
4871 self.controller.storage_collections.as_ref(),
4872 )
4873 .await
4874 }
4875}
4876
4877impl Coordinator {
4878 async fn process_dataflow_metainfo(
4880 &mut self,
4881 df_meta: DataflowMetainfo,
4882 export_id: GlobalId,
4883 ctx: Option<&mut ExecuteContext>,
4884 notice_ids: Vec<GlobalId>,
4885 ) -> Option<BuiltinTableAppendNotify> {
4886 if let Some(ctx) = ctx {
4888 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4889 }
4890
4891 let df_meta = self
4893 .catalog()
4894 .render_notices(df_meta, notice_ids, Some(export_id));
4895
4896 if self.catalog().state().system_config().enable_mz_notices()
4899 && !df_meta.optimizer_notices.is_empty()
4900 {
4901 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4902 self.catalog().state().pack_optimizer_notices(
4903 &mut builtin_table_updates,
4904 df_meta.optimizer_notices.iter(),
4905 Diff::ONE,
4906 );
4907
4908 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4910
4911 Some(
4912 self.builtin_table_update()
4913 .execute(builtin_table_updates)
4914 .await
4915 .0,
4916 )
4917 } else {
4918 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4920
4921 None
4922 }
4923 }
4924}