1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::objects::{
25 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
26};
27use mz_expr::{
28 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
29};
30use mz_ore::cast::CastFrom;
31use mz_ore::collections::{CollectionExt, HashSet};
32use mz_ore::task::{self, JoinHandle, spawn};
33use mz_ore::tracing::OpenTelemetryContext;
34use mz_ore::{assert_none, instrument};
35use mz_repr::adt::jsonb::Jsonb;
36use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
37use mz_repr::explain::ExprHumanizer;
38use mz_repr::explain::json::json_string;
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
42 RowIterator, Timestamp,
43};
44use mz_sql::ast::{
45 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
46 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
47 SqlServerConfigOptionName,
48};
49use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
50use mz_sql::catalog::{
51 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
52 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
53 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
54};
55use mz_sql::names::{
56 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
57 SchemaSpecifier, SystemObjectId,
58};
59use mz_sql::plan::{
60 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
61 StatementContext,
62};
63use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
64use mz_storage_types::sinks::StorageSinkDesc;
65use mz_storage_types::sources::GenericSourceConnection;
66use mz_sql::plan::{
68 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
69 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
70 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
71};
72use mz_sql::session::metadata::SessionMetadata;
73use mz_sql::session::user::UserKind;
74use mz_sql::session::vars::{
75 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
76 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
77};
78use mz_sql::{plan, rbac};
79use mz_sql_parser::ast::display::AstDisplay;
80use mz_sql_parser::ast::{
81 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
82 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
83 WithOptionValue,
84};
85use mz_ssh_util::keys::SshKeyPairSet;
86use mz_storage_client::controller::ExportDescription;
87use mz_storage_types::AlterCompatible;
88use mz_storage_types::connections::inline::IntoInlineConnection;
89use mz_storage_types::controller::StorageError;
90use mz_transform::dataflow::DataflowMetainfo;
91use smallvec::SmallVec;
92use timely::progress::Antichain;
93use tokio::sync::{oneshot, watch};
94use tracing::{Instrument, Span, info, warn};
95
96use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
97use crate::command::{ExecuteResponse, Response};
98use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
99use crate::coord::sequencer::emit_optimizer_notices;
100use crate::coord::{
101 AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
102 CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
103 Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
104 PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
105 validate_ip_with_policy_rules,
106};
107use crate::error::AdapterError;
108use crate::notice::{AdapterNotice, DroppedInUseIndex};
109use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
110use crate::optimize::{self, Optimize};
111use crate::session::{
112 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
113 WriteLocks, WriteOp,
114};
115use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
116use crate::{PeekResponseUnary, ReadHolds};
117
118mod cluster;
119mod copy_from;
120mod create_continual_task;
121mod create_index;
122mod create_materialized_view;
123mod create_view;
124mod explain_timestamp;
125mod peek;
126mod secret;
127mod subscribe;
128
/// Unwraps a `Result`, or retires the given context with the error and returns.
///
/// On `Ok(value)` the macro evaluates to `value`. On `Err(err)` it returns from the enclosing
/// function with the value of `$ctx.retire(Err(err.into()))`, so the error is converted with
/// `Into` before it is handed to the context.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Err(err) => return $ctx.retire(Err(err.into())),
            Ok(value) => value,
        }
    };
}
139
140pub(super) use return_if_err;
141
/// Catalog operations for a drop, bundled with flags describing what was dropped.
///
/// NOTE(review): nothing in this part of the file constructs or reads this struct, so the field
/// descriptions below are inferred from the field names and types only. Confirm against the
/// code that builds it.
struct DropOps {
    /// Catalog operations to apply.
    ops: Vec<catalog::Op>,
    /// Presumably set when the session's active database is among the dropped objects.
    dropped_active_db: bool,
    /// Presumably set when the session's active cluster is among the dropped objects.
    dropped_active_cluster: bool,
    /// Presumably the dropped indexes that were still in use, kept for reporting.
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
148
/// Output of `Coordinator::create_source_inner`: everything needed to create a batch of sources.
struct CreateSourceInner {
    /// Catalog operations that create the sources and record their source references.
    ops: Vec<catalog::Op>,
    /// Every source to be created, paired with its catalog item ID.
    sources: Vec<(CatalogItemId, Source)>,
    /// Names of the sources whose plan had `if_not_exists` set, keyed by item ID.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
155
156impl Coordinator {
    /// Drives a staged operation, running stages until one hands off to a background task or
    /// produces a final response.
    ///
    /// The stage's validity is checked against the catalog once up front. After that, each loop
    /// iteration refreshes the cancellation bookkeeping for the session's connection, runs the
    /// current stage inside `parent_span`, and acts on the returned [`StageResult`]. Any error
    /// retires `ctx` with that error and ends the sequence.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Register a fresh cancellation channel for this connection. If one was
                    // already registered and its current value is `true`, a cancellation was
                    // requested in the meantime, so stop here.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // This stage cannot be canceled, so drop any channel left behind by an
                    // earlier stage.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                // Cancellation channels are keyed by connection ID, so a context without a
                // session cannot be canceled.
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                // The next stage is computed by a spawned task. When it finishes, wrap the
                // result in a message and send it on the internal command channel.
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        // The send result is deliberately ignored.
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                // The final response is computed by a spawned task; retire once it is ready.
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                // The final response is already available.
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                // The next stage is already available; keep looping with it.
                StageResult::Immediate(stage) => *stage,
            }
        }
    }
222
    /// Spawns a task that waits for `handle` and passes its successful output to `f`.
    ///
    /// When `cancel_enabled` is set, the task also races `handle` against the cancellation watch
    /// registered in `staged_cancellation` for the session's connection. If the cancellation
    /// side finishes first, `ctx` is retired with [`AdapterError::Canceled`] and `f` is not
    /// called. If `handle` resolves to an error, `ctx` is retired with that error instead.
    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        // Build the cancellation future. It is only polled when `cancel_enabled` is true (see
        // the `select!` guard below).
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // The outcome of `wait_for` is discarded, so this future finishes as soon as
                // `wait_for` returns, whether it observed `true` or returned an error.
                // NOTE(review): tokio documents an error once the sender is dropped; confirm
                // that treating a dropped sender like a cancellation is intended.
                let _ = rx.wait_for(|v| *v).await;
                ()
            })
        } else {
            // No session, or no channel registered for it: a future that never completes.
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = return_if_err!(res, ctx);
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }
259
260 async fn create_source_inner(
261 &self,
262 session: &Session,
263 plans: Vec<plan::CreateSourcePlanBundle>,
264 ) -> Result<CreateSourceInner, AdapterError> {
265 let mut ops = vec![];
266 let mut sources = vec![];
267
268 let if_not_exists_ids = plans
269 .iter()
270 .filter_map(
271 |plan::CreateSourcePlanBundle {
272 item_id,
273 global_id: _,
274 plan,
275 resolved_ids: _,
276 available_source_references: _,
277 }| {
278 if plan.if_not_exists {
279 Some((*item_id, plan.name.clone()))
280 } else {
281 None
282 }
283 },
284 )
285 .collect::<BTreeMap<_, _>>();
286
287 for plan::CreateSourcePlanBundle {
288 item_id,
289 global_id,
290 mut plan,
291 resolved_ids,
292 available_source_references,
293 } in plans
294 {
295 let name = plan.name.clone();
296
297 match plan.source.data_source {
298 plan::DataSourceDesc::Ingestion(ref desc)
299 | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
300 let cluster_id = plan
301 .in_cluster
302 .expect("ingestion plans must specify cluster");
303 match desc.connection {
304 GenericSourceConnection::Postgres(_)
305 | GenericSourceConnection::MySql(_)
306 | GenericSourceConnection::SqlServer(_)
307 | GenericSourceConnection::Kafka(_)
308 | GenericSourceConnection::LoadGenerator(_) => {
309 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
310 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
311 .get(self.catalog().system_config().dyncfgs());
312
313 if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
314 {
315 return Err(AdapterError::Unsupported(
316 "sources in clusters with >1 replicas",
317 ));
318 }
319 }
320 }
321 }
322 }
323 plan::DataSourceDesc::Webhook { .. } => {
324 let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
325 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
326 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
327 .get(self.catalog().system_config().dyncfgs());
328
329 if !enable_multi_replica_sources {
330 if cluster.replica_ids().len() > 1 {
331 return Err(AdapterError::Unsupported(
332 "webhook sources in clusters with >1 replicas",
333 ));
334 }
335 }
336 }
337 }
338 plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
339 }
340
341 if let mz_sql::plan::DataSourceDesc::Webhook {
343 validate_using: Some(validate),
344 ..
345 } = &mut plan.source.data_source
346 {
347 if let Err(reason) = validate.reduce_expression().await {
348 self.metrics
349 .webhook_validation_reduce_failures
350 .with_label_values(&[reason])
351 .inc();
352 return Err(AdapterError::Internal(format!(
353 "failed to reduce check expression, {reason}"
354 )));
355 }
356 }
357
358 let mut reference_ops = vec![];
361 if let Some(references) = &available_source_references {
362 reference_ops.push(catalog::Op::UpdateSourceReferences {
363 source_id: item_id,
364 references: references.clone().into(),
365 });
366 }
367
368 let source = Source::new(plan, global_id, resolved_ids, None, false);
369 ops.push(catalog::Op::CreateItem {
370 id: item_id,
371 name,
372 item: CatalogItem::Source(source.clone()),
373 owner_id: *session.current_role_id(),
374 });
375 sources.push((item_id, source));
376 ops.extend(reference_ops);
378 }
379
380 Ok(CreateSourceInner {
381 ops,
382 sources,
383 if_not_exists_ids,
384 })
385 }
386
387 pub(crate) fn plan_subsource(
395 &self,
396 session: &Session,
397 params: &mz_sql::plan::Params,
398 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
399 item_id: CatalogItemId,
400 global_id: GlobalId,
401 ) -> Result<CreateSourcePlanBundle, AdapterError> {
402 let catalog = self.catalog().for_session(session);
403 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
404
405 let plan = self.plan_statement(
406 session,
407 Statement::CreateSubsource(subsource_stmt),
408 params,
409 &resolved_ids,
410 )?;
411 let plan = match plan {
412 Plan::CreateSource(plan) => plan,
413 _ => unreachable!(),
414 };
415 Ok(CreateSourcePlanBundle {
416 item_id,
417 global_id,
418 plan,
419 resolved_ids,
420 available_source_references: None,
421 })
422 }
423
424 pub(crate) async fn plan_purified_alter_source_add_subsource(
426 &mut self,
427 session: &Session,
428 params: Params,
429 source_name: ResolvedItemName,
430 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
431 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
432 ) -> Result<(Plan, ResolvedIds), AdapterError> {
433 let mut subsource_plans = Vec::with_capacity(subsources.len());
434
435 let conn_catalog = self.catalog().for_system_session();
437 let pcx = plan::PlanContext::zero();
438 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
439
440 let entry = self.catalog().get_entry(source_name.item_id());
441 let source = entry.source().ok_or_else(|| {
442 AdapterError::internal(
443 "plan alter source",
444 format!("expected Source found {entry:?}"),
445 )
446 })?;
447
448 let item_id = entry.id();
449 let ingestion_id = source.global_id();
450 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
451
452 let id_ts = self.get_catalog_write_ts().await;
453 let ids = self
454 .catalog()
455 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
456 .await?;
457 for (subsource_stmt, (item_id, global_id)) in
458 subsource_stmts.into_iter().zip_eq(ids.into_iter())
459 {
460 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
461 subsource_plans.push(s);
462 }
463
464 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
465 subsources: subsource_plans,
466 options,
467 };
468
469 Ok((
470 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
471 item_id,
472 ingestion_id,
473 action,
474 }),
475 ResolvedIds::empty(),
476 ))
477 }
478
479 pub(crate) fn plan_purified_alter_source_refresh_references(
481 &self,
482 _session: &Session,
483 _params: Params,
484 source_name: ResolvedItemName,
485 available_source_references: plan::SourceReferences,
486 ) -> Result<(Plan, ResolvedIds), AdapterError> {
487 let entry = self.catalog().get_entry(source_name.item_id());
488 let source = entry.source().ok_or_else(|| {
489 AdapterError::internal(
490 "plan alter source",
491 format!("expected Source found {entry:?}"),
492 )
493 })?;
494 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
495 references: available_source_references,
496 };
497
498 Ok((
499 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
500 item_id: entry.id(),
501 ingestion_id: source.global_id(),
502 action,
503 }),
504 ResolvedIds::empty(),
505 ))
506 }
507
508 pub(crate) async fn plan_purified_create_source(
512 &mut self,
513 ctx: &ExecuteContext,
514 params: Params,
515 progress_stmt: Option<CreateSubsourceStatement<Aug>>,
516 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
517 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
518 available_source_references: plan::SourceReferences,
519 ) -> Result<(Plan, ResolvedIds), AdapterError> {
520 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
521
522 if let Some(progress_stmt) = progress_stmt {
524 assert_none!(progress_stmt.of_source);
529 let id_ts = self.get_catalog_write_ts().await;
530 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
531 let progress_plan =
532 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
533 let progress_full_name = self
534 .catalog()
535 .resolve_full_name(&progress_plan.plan.name, None);
536 let progress_subsource = ResolvedItemName::Item {
537 id: progress_plan.item_id,
538 qualifiers: progress_plan.plan.name.qualifiers.clone(),
539 full_name: progress_full_name,
540 print_id: true,
541 version: RelationVersionSelector::Latest,
542 };
543
544 create_source_plans.push(progress_plan);
545
546 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
547 }
548
549 let catalog = self.catalog().for_session(ctx.session());
550 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
551
552 let propagated_with_options: Vec<_> = source_stmt
553 .with_options
554 .iter()
555 .filter_map(|opt| match opt.name {
556 CreateSourceOptionName::TimestampInterval => None,
557 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
558 name: CreateSubsourceOptionName::RetainHistory,
559 value: opt.value.clone(),
560 }),
561 })
562 .collect();
563
564 let source_plan = match self.plan_statement(
566 ctx.session(),
567 Statement::CreateSource(source_stmt),
568 ¶ms,
569 &resolved_ids,
570 )? {
571 Plan::CreateSource(plan) => plan,
572 p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
573 };
574
575 let id_ts = self.get_catalog_write_ts().await;
576 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
577
578 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
579 let of_source = ResolvedItemName::Item {
580 id: item_id,
581 qualifiers: source_plan.name.qualifiers.clone(),
582 full_name: source_full_name,
583 print_id: true,
584 version: RelationVersionSelector::Latest,
585 };
586
587 let conn_catalog = self.catalog().for_system_session();
589 let pcx = plan::PlanContext::zero();
590 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
591
592 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
593
594 for subsource_stmt in subsource_stmts.iter_mut() {
595 subsource_stmt
596 .with_options
597 .extend(propagated_with_options.iter().cloned())
598 }
599
600 create_source_plans.push(CreateSourcePlanBundle {
601 item_id,
602 global_id,
603 plan: source_plan,
604 resolved_ids: resolved_ids.clone(),
605 available_source_references: Some(available_source_references),
606 });
607
608 let id_ts = self.get_catalog_write_ts().await;
610 let ids = self
611 .catalog()
612 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
613 .await?;
614 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
615 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
616 create_source_plans.push(plan);
617 }
618
619 Ok((
620 Plan::CreateSources(create_source_plans),
621 ResolvedIds::empty(),
622 ))
623 }
624
625 #[instrument]
626 pub(super) async fn sequence_create_source(
627 &mut self,
628 ctx: &mut ExecuteContext,
629 plans: Vec<plan::CreateSourcePlanBundle>,
630 ) -> Result<ExecuteResponse, AdapterError> {
631 let CreateSourceInner {
632 ops,
633 sources,
634 if_not_exists_ids,
635 } = self.create_source_inner(ctx.session(), plans).await?;
636
637 let transact_result = self
638 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
639 .await;
640
641 for (item_id, source) in &sources {
643 if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
644 if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
645 ctx.session()
646 .add_notice(AdapterNotice::WebhookSourceCreated { url });
647 }
648 }
649 }
650
651 match transact_result {
652 Ok(()) => Ok(ExecuteResponse::CreatedSource),
653 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
654 kind:
655 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
656 })) if if_not_exists_ids.contains_key(&id) => {
657 ctx.session()
658 .add_notice(AdapterNotice::ObjectAlreadyExists {
659 name: if_not_exists_ids[&id].item.clone(),
660 ty: "source",
661 });
662 Ok(ExecuteResponse::CreatedSource)
663 }
664 Err(err) => Err(err),
665 }
666 }
667
668 #[instrument]
669 pub(super) async fn sequence_create_connection(
670 &mut self,
671 mut ctx: ExecuteContext,
672 plan: plan::CreateConnectionPlan,
673 resolved_ids: ResolvedIds,
674 ) {
675 let id_ts = self.get_catalog_write_ts().await;
676 let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
677 Ok(item_id) => item_id,
678 Err(err) => return ctx.retire(Err(err.into())),
679 };
680
681 match &plan.connection.details {
682 ConnectionDetails::Ssh { key_1, key_2, .. } => {
683 let key_1 = match key_1.as_key_pair() {
684 Some(key_1) => key_1.clone(),
685 None => {
686 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
687 "the PUBLIC KEY 1 option cannot be explicitly specified"
688 ))));
689 }
690 };
691
692 let key_2 = match key_2.as_key_pair() {
693 Some(key_2) => key_2.clone(),
694 None => {
695 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
696 "the PUBLIC KEY 2 option cannot be explicitly specified"
697 ))));
698 }
699 };
700
701 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
702 let secret = key_set.to_bytes();
703 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
704 return ctx.retire(Err(err.into()));
705 }
706 }
707 _ => (),
708 };
709
710 if plan.validate {
711 let internal_cmd_tx = self.internal_cmd_tx.clone();
712 let transient_revision = self.catalog().transient_revision();
713 let conn_id = ctx.session().conn_id().clone();
714 let otel_ctx = OpenTelemetryContext::obtain();
715 let role_metadata = ctx.session().role_metadata().clone();
716
717 let connection = plan
718 .connection
719 .details
720 .to_connection()
721 .into_inline_connection(self.catalog().state());
722
723 let current_storage_parameters = self.controller.storage.config().clone();
724 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
725 let result = match connection
726 .validate(connection_id, ¤t_storage_parameters)
727 .await
728 {
729 Ok(()) => Ok(plan),
730 Err(err) => Err(err.into()),
731 };
732
733 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
735 CreateConnectionValidationReady {
736 ctx,
737 result,
738 connection_id,
739 connection_gid,
740 plan_validity: PlanValidity::new(
741 transient_revision,
742 resolved_ids.items().copied().collect(),
743 None,
744 None,
745 role_metadata,
746 ),
747 otel_ctx,
748 resolved_ids: resolved_ids.clone(),
749 },
750 ));
751 if let Err(e) = result {
752 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
753 }
754 });
755 } else {
756 let result = self
757 .sequence_create_connection_stage_finish(
758 &mut ctx,
759 connection_id,
760 connection_gid,
761 plan,
762 resolved_ids,
763 )
764 .await;
765 ctx.retire(result);
766 }
767 }
768
769 #[instrument]
770 pub(crate) async fn sequence_create_connection_stage_finish(
771 &mut self,
772 ctx: &mut ExecuteContext,
773 connection_id: CatalogItemId,
774 connection_gid: GlobalId,
775 plan: plan::CreateConnectionPlan,
776 resolved_ids: ResolvedIds,
777 ) -> Result<ExecuteResponse, AdapterError> {
778 let ops = vec![catalog::Op::CreateItem {
779 id: connection_id,
780 name: plan.name.clone(),
781 item: CatalogItem::Connection(Connection {
782 create_sql: plan.connection.create_sql,
783 global_id: connection_gid,
784 details: plan.connection.details.clone(),
785 resolved_ids,
786 }),
787 owner_id: *ctx.session().current_role_id(),
788 }];
789
790 let conn_id = ctx.session().conn_id().clone();
793 let transact_result = self
794 .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
795 .await;
796
797 match transact_result {
798 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
799 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
800 kind:
801 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
802 })) if plan.if_not_exists => {
803 ctx.session()
804 .add_notice(AdapterNotice::ObjectAlreadyExists {
805 name: plan.name.item,
806 ty: "connection",
807 });
808 Ok(ExecuteResponse::CreatedConnection)
809 }
810 Err(err) => Err(err),
811 }
812 }
813
814 #[instrument]
815 pub(super) async fn sequence_create_database(
816 &mut self,
817 session: &Session,
818 plan: plan::CreateDatabasePlan,
819 ) -> Result<ExecuteResponse, AdapterError> {
820 let ops = vec![catalog::Op::CreateDatabase {
821 name: plan.name.clone(),
822 owner_id: *session.current_role_id(),
823 }];
824 match self.catalog_transact(Some(session), ops).await {
825 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
826 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
827 kind:
828 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
829 })) if plan.if_not_exists => {
830 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
831 Ok(ExecuteResponse::CreatedDatabase)
832 }
833 Err(err) => Err(err),
834 }
835 }
836
837 #[instrument]
838 pub(super) async fn sequence_create_schema(
839 &mut self,
840 session: &Session,
841 plan: plan::CreateSchemaPlan,
842 ) -> Result<ExecuteResponse, AdapterError> {
843 let op = catalog::Op::CreateSchema {
844 database_id: plan.database_spec,
845 schema_name: plan.schema_name.clone(),
846 owner_id: *session.current_role_id(),
847 };
848 match self.catalog_transact(Some(session), vec![op]).await {
849 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
850 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
851 kind:
852 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
853 })) if plan.if_not_exists => {
854 session.add_notice(AdapterNotice::SchemaAlreadyExists {
855 name: plan.schema_name,
856 });
857 Ok(ExecuteResponse::CreatedSchema)
858 }
859 Err(err) => Err(err),
860 }
861 }
862
863 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
865 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
866 if attributes.superuser.is_some()
867 || attributes.password.is_some()
868 || attributes.login.is_some()
869 {
870 return Err(AdapterError::UnavailableFeature {
871 feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
872 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
873 });
874 }
875 }
876 Ok(())
877 }
878
879 #[instrument]
880 pub(super) async fn sequence_create_role(
881 &mut self,
882 conn_id: Option<&ConnectionId>,
883 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
884 ) -> Result<ExecuteResponse, AdapterError> {
885 self.validate_role_attributes(&attributes.clone())?;
886 let op = catalog::Op::CreateRole { name, attributes };
887 self.catalog_transact_with_context(conn_id, None, vec![op])
888 .await
889 .map(|_| ExecuteResponse::CreatedRole)
890 }
891
892 #[instrument]
893 pub(super) async fn sequence_create_network_policy(
894 &mut self,
895 session: &Session,
896 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
897 ) -> Result<ExecuteResponse, AdapterError> {
898 let op = catalog::Op::CreateNetworkPolicy {
899 rules,
900 name,
901 owner_id: *session.current_role_id(),
902 };
903 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
904 .await
905 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
906 }
907
908 #[instrument]
909 pub(super) async fn sequence_alter_network_policy(
910 &mut self,
911 session: &Session,
912 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
913 ) -> Result<ExecuteResponse, AdapterError> {
914 let current_network_policy_name =
916 self.catalog().system_config().default_network_policy_name();
917 if current_network_policy_name == name {
919 self.validate_alter_network_policy(session, &rules)?;
920 }
921
922 let op = catalog::Op::AlterNetworkPolicy {
923 id,
924 rules,
925 name,
926 owner_id: *session.current_role_id(),
927 };
928 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
929 .await
930 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
931 }
932
933 #[instrument]
934 pub(super) async fn sequence_create_table(
935 &mut self,
936 ctx: &mut ExecuteContext,
937 plan: plan::CreateTablePlan,
938 resolved_ids: ResolvedIds,
939 ) -> Result<ExecuteResponse, AdapterError> {
940 let plan::CreateTablePlan {
941 name,
942 table,
943 if_not_exists,
944 } = plan;
945
946 let conn_id = if table.temporary {
947 Some(ctx.session().conn_id())
948 } else {
949 None
950 };
951 let id_ts = self.get_catalog_write_ts().await;
952 let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
953 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
954
955 let data_source = match table.data_source {
956 plan::TableDataSource::TableWrites { defaults } => {
957 TableDataSource::TableWrites { defaults }
958 }
959 plan::TableDataSource::DataSource {
960 desc: data_source_plan,
961 timeline,
962 } => match data_source_plan {
963 plan::DataSourceDesc::IngestionExport {
964 ingestion_id,
965 external_reference,
966 details,
967 data_config,
968 } => TableDataSource::DataSource {
969 desc: DataSourceDesc::IngestionExport {
970 ingestion_id,
971 external_reference,
972 details,
973 data_config,
974 },
975 timeline,
976 },
977 plan::DataSourceDesc::Webhook {
978 validate_using,
979 body_format,
980 headers,
981 cluster_id,
982 } => TableDataSource::DataSource {
983 desc: DataSourceDesc::Webhook {
984 validate_using,
985 body_format,
986 headers,
987 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
988 },
989 timeline,
990 },
991 o => {
992 unreachable!("CREATE TABLE data source got {:?}", o)
993 }
994 },
995 };
996
997 let is_webhook = if let TableDataSource::DataSource {
998 desc: DataSourceDesc::Webhook { .. },
999 timeline: _,
1000 } = &data_source
1001 {
1002 true
1003 } else {
1004 false
1005 };
1006
1007 let table = Table {
1008 create_sql: Some(table.create_sql),
1009 desc: table.desc,
1010 collections,
1011 conn_id: conn_id.cloned(),
1012 resolved_ids,
1013 custom_logical_compaction_window: table.compaction_window,
1014 is_retained_metrics_object: false,
1015 data_source,
1016 };
1017 let ops = vec![catalog::Op::CreateItem {
1018 id: table_id,
1019 name: name.clone(),
1020 item: CatalogItem::Table(table.clone()),
1021 owner_id: *ctx.session().current_role_id(),
1022 }];
1023
1024 let catalog_result = self
1025 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1026 .await;
1027
1028 if is_webhook {
1029 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1032 ctx.session()
1033 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1034 }
1035 }
1036
1037 match catalog_result {
1038 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1039 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1040 kind:
1041 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1042 })) if if_not_exists => {
1043 ctx.session_mut()
1044 .add_notice(AdapterNotice::ObjectAlreadyExists {
1045 name: name.item,
1046 ty: "table",
1047 });
1048 Ok(ExecuteResponse::CreatedTable)
1049 }
1050 Err(err) => Err(err),
1051 }
1052 }
1053
1054 #[instrument]
1055 pub(super) async fn sequence_create_sink(
1056 &mut self,
1057 ctx: ExecuteContext,
1058 plan: plan::CreateSinkPlan,
1059 resolved_ids: ResolvedIds,
1060 ) {
1061 let plan::CreateSinkPlan {
1062 name,
1063 sink,
1064 with_snapshot,
1065 if_not_exists,
1066 in_cluster,
1067 } = plan;
1068
1069 let id_ts = self.get_catalog_write_ts().await;
1071 let (item_id, global_id) =
1072 return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);
1073
1074 let catalog_sink = Sink {
1075 create_sql: sink.create_sql,
1076 global_id,
1077 from: sink.from,
1078 connection: sink.connection,
1079 envelope: sink.envelope,
1080 version: sink.version,
1081 with_snapshot,
1082 resolved_ids,
1083 cluster_id: in_cluster,
1084 commit_interval: sink.commit_interval,
1085 };
1086
1087 let ops = vec![catalog::Op::CreateItem {
1088 id: item_id,
1089 name: name.clone(),
1090 item: CatalogItem::Sink(catalog_sink.clone()),
1091 owner_id: *ctx.session().current_role_id(),
1092 }];
1093
1094 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1095
1096 match result {
1097 Ok(()) => {}
1098 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1099 kind:
1100 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1101 })) if if_not_exists => {
1102 ctx.session()
1103 .add_notice(AdapterNotice::ObjectAlreadyExists {
1104 name: name.item,
1105 ty: "sink",
1106 });
1107 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1108 return;
1109 }
1110 Err(e) => {
1111 ctx.retire(Err(e));
1112 return;
1113 }
1114 };
1115
1116 self.create_storage_export(global_id, &catalog_sink)
1117 .await
1118 .unwrap_or_terminate("cannot fail to create exports");
1119
1120 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1121 .await;
1122
1123 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1124 }
1125
1126 pub(super) fn validate_system_column_references(
1149 &self,
1150 uses_ambiguous_columns: bool,
1151 depends_on: &BTreeSet<GlobalId>,
1152 ) -> Result<(), AdapterError> {
1153 if uses_ambiguous_columns
1154 && depends_on
1155 .iter()
1156 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1157 {
1158 Err(AdapterError::AmbiguousSystemColumnReference)
1159 } else {
1160 Ok(())
1161 }
1162 }
1163
1164 #[instrument]
1165 pub(super) async fn sequence_create_type(
1166 &mut self,
1167 session: &Session,
1168 plan: plan::CreateTypePlan,
1169 resolved_ids: ResolvedIds,
1170 ) -> Result<ExecuteResponse, AdapterError> {
1171 let id_ts = self.get_catalog_write_ts().await;
1172 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
1173 plan.typ
1175 .inner
1176 .desc(&self.catalog().for_session(session))
1177 .map_err(AdapterError::from)?;
1178 let typ = Type {
1179 create_sql: Some(plan.typ.create_sql),
1180 global_id,
1181 details: CatalogTypeDetails {
1182 array_id: None,
1183 typ: plan.typ.inner,
1184 pg_metadata: None,
1185 },
1186 resolved_ids,
1187 };
1188 let op = catalog::Op::CreateItem {
1189 id: item_id,
1190 name: plan.name,
1191 item: CatalogItem::Type(typ),
1192 owner_id: *session.current_role_id(),
1193 };
1194 match self.catalog_transact(Some(session), vec![op]).await {
1195 Ok(()) => Ok(ExecuteResponse::CreatedType),
1196 Err(err) => Err(err),
1197 }
1198 }
1199
1200 #[instrument]
1201 pub(super) async fn sequence_comment_on(
1202 &mut self,
1203 session: &Session,
1204 plan: plan::CommentPlan,
1205 ) -> Result<ExecuteResponse, AdapterError> {
1206 let op = catalog::Op::Comment {
1207 object_id: plan.object_id,
1208 sub_component: plan.sub_component,
1209 comment: plan.comment,
1210 };
1211 self.catalog_transact(Some(session), vec![op]).await?;
1212 Ok(ExecuteResponse::Comment)
1213 }
1214
1215 #[instrument]
1216 pub(super) async fn sequence_drop_objects(
1217 &mut self,
1218 ctx: &mut ExecuteContext,
1219 plan::DropObjectsPlan {
1220 drop_ids,
1221 object_type,
1222 referenced_ids,
1223 }: plan::DropObjectsPlan,
1224 ) -> Result<ExecuteResponse, AdapterError> {
1225 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1226 let mut objects = Vec::new();
1227 for obj_id in &drop_ids {
1228 if !referenced_ids_hashset.contains(obj_id) {
1229 let object_info = ErrorMessageObjectDescription::from_id(
1230 obj_id,
1231 &self.catalog().for_session(ctx.session()),
1232 )
1233 .to_string();
1234 objects.push(object_info);
1235 }
1236 }
1237
1238 if !objects.is_empty() {
1239 ctx.session()
1240 .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1241 }
1242
1243 let DropOps {
1244 ops,
1245 dropped_active_db,
1246 dropped_active_cluster,
1247 dropped_in_use_indexes,
1248 } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1249
1250 self.catalog_transact_with_context(None, Some(ctx), ops)
1251 .await?;
1252
1253 fail::fail_point!("after_sequencer_drop_replica");
1254
1255 if dropped_active_db {
1256 ctx.session()
1257 .add_notice(AdapterNotice::DroppedActiveDatabase {
1258 name: ctx.session().vars().database().to_string(),
1259 });
1260 }
1261 if dropped_active_cluster {
1262 ctx.session()
1263 .add_notice(AdapterNotice::DroppedActiveCluster {
1264 name: ctx.session().vars().cluster().to_string(),
1265 });
1266 }
1267 for dropped_in_use_index in dropped_in_use_indexes {
1268 ctx.session()
1269 .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1270 self.metrics
1271 .optimization_notices
1272 .with_label_values(&["DroppedInUseIndex"])
1273 .inc_by(1);
1274 }
1275 Ok(ExecuteResponse::DroppedObject(object_type))
1276 }
1277
1278 fn validate_dropped_role_ownership(
1279 &self,
1280 session: &Session,
1281 dropped_roles: &BTreeMap<RoleId, &str>,
1282 ) -> Result<(), AdapterError> {
1283 fn privilege_check(
1284 privileges: &PrivilegeMap,
1285 dropped_roles: &BTreeMap<RoleId, &str>,
1286 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1287 object_id: &SystemObjectId,
1288 catalog: &ConnCatalog,
1289 ) {
1290 for privilege in privileges.all_values() {
1291 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1292 let grantor_name = catalog.get_role(&privilege.grantor).name();
1293 let object_description =
1294 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1295 dependent_objects
1296 .entry(role_name.to_string())
1297 .or_default()
1298 .push(format!(
1299 "privileges on {object_description} granted by {grantor_name}",
1300 ));
1301 }
1302 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1303 let grantee_name = catalog.get_role(&privilege.grantee).name();
1304 let object_description =
1305 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1306 dependent_objects
1307 .entry(role_name.to_string())
1308 .or_default()
1309 .push(format!(
1310 "privileges granted on {object_description} to {grantee_name}"
1311 ));
1312 }
1313 }
1314 }
1315
1316 let catalog = self.catalog().for_session(session);
1317 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1318 for entry in self.catalog.entries() {
1319 let id = SystemObjectId::Object(entry.id().into());
1320 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1321 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1322 dependent_objects
1323 .entry(role_name.to_string())
1324 .or_default()
1325 .push(format!("owner of {object_description}"));
1326 }
1327 privilege_check(
1328 entry.privileges(),
1329 dropped_roles,
1330 &mut dependent_objects,
1331 &id,
1332 &catalog,
1333 );
1334 }
1335 for database in self.catalog.databases() {
1336 let database_id = SystemObjectId::Object(database.id().into());
1337 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1338 let object_description =
1339 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1340 dependent_objects
1341 .entry(role_name.to_string())
1342 .or_default()
1343 .push(format!("owner of {object_description}"));
1344 }
1345 privilege_check(
1346 &database.privileges,
1347 dropped_roles,
1348 &mut dependent_objects,
1349 &database_id,
1350 &catalog,
1351 );
1352 for schema in database.schemas_by_id.values() {
1353 let schema_id = SystemObjectId::Object(
1354 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1355 );
1356 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1357 let object_description =
1358 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1359 dependent_objects
1360 .entry(role_name.to_string())
1361 .or_default()
1362 .push(format!("owner of {object_description}"));
1363 }
1364 privilege_check(
1365 &schema.privileges,
1366 dropped_roles,
1367 &mut dependent_objects,
1368 &schema_id,
1369 &catalog,
1370 );
1371 }
1372 }
1373 for cluster in self.catalog.clusters() {
1374 let cluster_id = SystemObjectId::Object(cluster.id().into());
1375 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1376 let object_description =
1377 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1378 dependent_objects
1379 .entry(role_name.to_string())
1380 .or_default()
1381 .push(format!("owner of {object_description}"));
1382 }
1383 privilege_check(
1384 &cluster.privileges,
1385 dropped_roles,
1386 &mut dependent_objects,
1387 &cluster_id,
1388 &catalog,
1389 );
1390 for replica in cluster.replicas() {
1391 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1392 let replica_id =
1393 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1394 let object_description =
1395 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1396 dependent_objects
1397 .entry(role_name.to_string())
1398 .or_default()
1399 .push(format!("owner of {object_description}"));
1400 }
1401 }
1402 }
1403 privilege_check(
1404 self.catalog().system_privileges(),
1405 dropped_roles,
1406 &mut dependent_objects,
1407 &SystemObjectId::System,
1408 &catalog,
1409 );
1410 for (default_privilege_object, default_privilege_acl_items) in
1411 self.catalog.default_privileges()
1412 {
1413 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1414 dependent_objects
1415 .entry(role_name.to_string())
1416 .or_default()
1417 .push(format!(
1418 "default privileges on {}S created by {}",
1419 default_privilege_object.object_type, role_name
1420 ));
1421 }
1422 for default_privilege_acl_item in default_privilege_acl_items {
1423 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1424 dependent_objects
1425 .entry(role_name.to_string())
1426 .or_default()
1427 .push(format!(
1428 "default privileges on {}S granted to {}",
1429 default_privilege_object.object_type, role_name
1430 ));
1431 }
1432 }
1433 }
1434
1435 if !dependent_objects.is_empty() {
1436 Err(AdapterError::DependentObject(dependent_objects))
1437 } else {
1438 Ok(())
1439 }
1440 }
1441
1442 #[instrument]
1443 pub(super) async fn sequence_drop_owned(
1444 &mut self,
1445 session: &Session,
1446 plan: plan::DropOwnedPlan,
1447 ) -> Result<ExecuteResponse, AdapterError> {
1448 for role_id in &plan.role_ids {
1449 self.catalog().ensure_not_reserved_role(role_id)?;
1450 }
1451
1452 let mut privilege_revokes = plan.privilege_revokes;
1453
1454 let session_catalog = self.catalog().for_session(session);
1456 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1457 && !session.is_superuser()
1458 {
1459 let role_membership =
1461 session_catalog.collect_role_membership(session.current_role_id());
1462 let invalid_revokes: BTreeSet<_> = privilege_revokes
1463 .extract_if(.., |(_, privilege)| {
1464 !role_membership.contains(&privilege.grantor)
1465 })
1466 .map(|(object_id, _)| object_id)
1467 .collect();
1468 for invalid_revoke in invalid_revokes {
1469 let object_description =
1470 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1471 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1472 }
1473 }
1474
1475 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1476 catalog::Op::UpdatePrivilege {
1477 target_id: object_id,
1478 privilege,
1479 variant: UpdatePrivilegeVariant::Revoke,
1480 }
1481 });
1482 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1483 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1484 privilege_object,
1485 privilege_acl_item,
1486 variant: UpdatePrivilegeVariant::Revoke,
1487 },
1488 );
1489 let DropOps {
1490 ops: drop_ops,
1491 dropped_active_db,
1492 dropped_active_cluster,
1493 dropped_in_use_indexes,
1494 } = self.sequence_drop_common(session, plan.drop_ids)?;
1495
1496 let ops = privilege_revoke_ops
1497 .chain(default_privilege_revoke_ops)
1498 .chain(drop_ops.into_iter())
1499 .collect();
1500
1501 self.catalog_transact(Some(session), ops).await?;
1502
1503 if dropped_active_db {
1504 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1505 name: session.vars().database().to_string(),
1506 });
1507 }
1508 if dropped_active_cluster {
1509 session.add_notice(AdapterNotice::DroppedActiveCluster {
1510 name: session.vars().cluster().to_string(),
1511 });
1512 }
1513 for dropped_in_use_index in dropped_in_use_indexes {
1514 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1515 }
1516 Ok(ExecuteResponse::DroppedOwned)
1517 }
1518
1519 fn sequence_drop_common(
1520 &self,
1521 session: &Session,
1522 ids: Vec<ObjectId>,
1523 ) -> Result<DropOps, AdapterError> {
1524 let mut dropped_active_db = false;
1525 let mut dropped_active_cluster = false;
1526 let mut dropped_in_use_indexes = Vec::new();
1527 let mut dropped_roles = BTreeMap::new();
1528 let mut dropped_databases = BTreeSet::new();
1529 let mut dropped_schemas = BTreeSet::new();
1530 let mut role_revokes = BTreeSet::new();
1534 let mut default_privilege_revokes = BTreeSet::new();
1537
1538 let mut clusters_to_drop = BTreeSet::new();
1540
1541 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1542 for id in &ids {
1543 match id {
1544 ObjectId::Database(id) => {
1545 let name = self.catalog().get_database(id).name();
1546 if name == session.vars().database() {
1547 dropped_active_db = true;
1548 }
1549 dropped_databases.insert(id);
1550 }
1551 ObjectId::Schema((_, spec)) => {
1552 if let SchemaSpecifier::Id(id) = spec {
1553 dropped_schemas.insert(id);
1554 }
1555 }
1556 ObjectId::Cluster(id) => {
1557 clusters_to_drop.insert(*id);
1558 if let Some(active_id) = self
1559 .catalog()
1560 .active_cluster(session)
1561 .ok()
1562 .map(|cluster| cluster.id())
1563 {
1564 if id == &active_id {
1565 dropped_active_cluster = true;
1566 }
1567 }
1568 }
1569 ObjectId::Role(id) => {
1570 let role = self.catalog().get_role(id);
1571 let name = role.name();
1572 dropped_roles.insert(*id, name);
1573 for (group_id, grantor_id) in &role.membership.map {
1575 role_revokes.insert((*group_id, *id, *grantor_id));
1576 }
1577 }
1578 ObjectId::Item(id) => {
1579 if let Some(index) = self.catalog().get_entry(id).index() {
1580 let humanizer = self.catalog().for_session(session);
1581 let dependants = self
1582 .controller
1583 .compute
1584 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1585 .ok()
1586 .into_iter()
1587 .flatten()
1588 .filter(|dependant_id| {
1589 if dependant_id.is_transient() {
1596 return false;
1597 }
1598 let Some(dependent_id) = humanizer
1600 .try_get_item_by_global_id(dependant_id)
1601 .map(|item| item.id())
1602 else {
1603 return false;
1604 };
1605 !ids_set.contains(&ObjectId::Item(dependent_id))
1608 })
1609 .flat_map(|dependant_id| {
1610 humanizer.humanize_id(dependant_id)
1614 })
1615 .collect_vec();
1616 if !dependants.is_empty() {
1617 dropped_in_use_indexes.push(DroppedInUseIndex {
1618 index_name: humanizer
1619 .humanize_id(index.global_id())
1620 .unwrap_or_else(|| id.to_string()),
1621 dependant_objects: dependants,
1622 });
1623 }
1624 }
1625 }
1626 _ => {}
1627 }
1628 }
1629
1630 for id in &ids {
1631 match id {
1632 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1636 if !clusters_to_drop.contains(cluster_id) {
1637 let cluster = self.catalog.get_cluster(*cluster_id);
1638 if cluster.is_managed() {
1639 let replica =
1640 cluster.replica(*replica_id).expect("Catalog out of sync");
1641 if !replica.config.location.internal() {
1642 coord_bail!("cannot drop replica of managed cluster");
1643 }
1644 }
1645 }
1646 }
1647 _ => {}
1648 }
1649 }
1650
1651 for role_id in dropped_roles.keys() {
1652 self.catalog().ensure_not_reserved_role(role_id)?;
1653 }
1654 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1655 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1657 for role in self.catalog().user_roles() {
1658 for dropped_role_id in
1659 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1660 {
1661 role_revokes.insert((
1662 **dropped_role_id,
1663 role.id(),
1664 *role
1665 .membership
1666 .map
1667 .get(*dropped_role_id)
1668 .expect("included in keys above"),
1669 ));
1670 }
1671 }
1672
1673 for (default_privilege_object, default_privilege_acls) in
1674 self.catalog().default_privileges()
1675 {
1676 if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
1677 || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
1678 {
1679 for default_privilege_acl in default_privilege_acls {
1680 default_privilege_revokes.insert((
1681 default_privilege_object.clone(),
1682 default_privilege_acl.clone(),
1683 ));
1684 }
1685 }
1686 }
1687
1688 let ops = role_revokes
1689 .into_iter()
1690 .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1691 role_id,
1692 member_id,
1693 grantor_id,
1694 })
1695 .chain(default_privilege_revokes.into_iter().map(
1696 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1697 privilege_object,
1698 privilege_acl_item,
1699 variant: UpdatePrivilegeVariant::Revoke,
1700 },
1701 ))
1702 .chain(iter::once(catalog::Op::DropObjects(
1703 ids.into_iter()
1704 .map(DropObjectInfo::manual_drop_from_object_id)
1705 .collect(),
1706 )))
1707 .collect();
1708
1709 Ok(DropOps {
1710 ops,
1711 dropped_active_db,
1712 dropped_active_cluster,
1713 dropped_in_use_indexes,
1714 })
1715 }
1716
1717 pub(super) fn sequence_explain_schema(
1718 &self,
1719 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1720 ) -> Result<ExecuteResponse, AdapterError> {
1721 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1722 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1723 })?;
1724
1725 let json_string = json_string(&json_value);
1726 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1727 Ok(Self::send_immediate_rows(row))
1728 }
1729
1730 pub(super) fn sequence_show_all_variables(
1731 &self,
1732 session: &Session,
1733 ) -> Result<ExecuteResponse, AdapterError> {
1734 let mut rows = viewable_variables(self.catalog().state(), session)
1735 .map(|v| (v.name(), v.value(), v.description()))
1736 .collect::<Vec<_>>();
1737 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1738
1739 let rows: Vec<_> = rows
1741 .into_iter()
1742 .map(|(name, val, desc)| {
1743 Row::pack_slice(&[
1744 Datum::String(name),
1745 Datum::String(&val),
1746 Datum::String(desc),
1747 ])
1748 })
1749 .collect();
1750 Ok(Self::send_immediate_rows(rows))
1751 }
1752
1753 pub(super) fn sequence_show_variable(
1754 &self,
1755 session: &Session,
1756 plan: plan::ShowVariablePlan,
1757 ) -> Result<ExecuteResponse, AdapterError> {
1758 if &plan.name == SCHEMA_ALIAS {
1759 let schemas = self.catalog.resolve_search_path(session);
1760 let schema = schemas.first();
1761 return match schema {
1762 Some((database_spec, schema_spec)) => {
1763 let schema_name = &self
1764 .catalog
1765 .get_schema(database_spec, schema_spec, session.conn_id())
1766 .name()
1767 .schema;
1768 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1769 Ok(Self::send_immediate_rows(row))
1770 }
1771 None => {
1772 if session.vars().current_object_missing_warnings() {
1773 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1774 search_path: session
1775 .vars()
1776 .search_path()
1777 .into_iter()
1778 .map(|schema| schema.to_string())
1779 .collect(),
1780 });
1781 }
1782 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1783 }
1784 };
1785 }
1786
1787 let variable = session
1788 .vars()
1789 .get(self.catalog().system_config(), &plan.name)
1790 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1791
1792 variable.visible(session.user(), self.catalog().system_config())?;
1795
1796 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1797 if variable.name() == vars::DATABASE.name()
1798 && matches!(
1799 self.catalog().resolve_database(&variable.value()),
1800 Err(CatalogError::UnknownDatabase(_))
1801 )
1802 && session.vars().current_object_missing_warnings()
1803 {
1804 let name = variable.value();
1805 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1806 } else if variable.name() == vars::CLUSTER.name()
1807 && matches!(
1808 self.catalog().resolve_cluster(&variable.value()),
1809 Err(CatalogError::UnknownCluster(_))
1810 )
1811 && session.vars().current_object_missing_warnings()
1812 {
1813 let name = variable.value();
1814 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1815 }
1816 Ok(Self::send_immediate_rows(row))
1817 }
1818
1819 #[instrument]
1820 pub(super) async fn sequence_inspect_shard(
1821 &self,
1822 session: &Session,
1823 plan: plan::InspectShardPlan,
1824 ) -> Result<ExecuteResponse, AdapterError> {
1825 if !session.user().is_internal() {
1828 return Err(AdapterError::Unauthorized(
1829 rbac::UnauthorizedError::MzSystem {
1830 action: "inspect".into(),
1831 },
1832 ));
1833 }
1834 let state = self
1835 .controller
1836 .storage
1837 .inspect_persist_state(plan.id)
1838 .await?;
1839 let jsonb = Jsonb::from_serde_json(state)?;
1840 Ok(Self::send_immediate_rows(jsonb.into_row()))
1841 }
1842
1843 #[instrument]
1844 pub(super) fn sequence_set_variable(
1845 &self,
1846 session: &mut Session,
1847 plan: plan::SetVariablePlan,
1848 ) -> Result<ExecuteResponse, AdapterError> {
1849 let (name, local) = (plan.name, plan.local);
1850 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1851 self.validate_set_isolation_level(session)?;
1852 }
1853 if &name == vars::CLUSTER.name() {
1854 self.validate_set_cluster(session)?;
1855 }
1856
1857 let vars = session.vars_mut();
1858 let values = match plan.value {
1859 plan::VariableValue::Default => None,
1860 plan::VariableValue::Values(values) => Some(values),
1861 };
1862
1863 match values {
1864 Some(values) => {
1865 vars.set(
1866 self.catalog().system_config(),
1867 &name,
1868 VarInput::SqlSet(&values),
1869 local,
1870 )?;
1871
1872 let vars = session.vars();
1873
1874 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1877 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1878 } else if name == vars::CLUSTER.name()
1879 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1880 {
1881 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1882 }
1883
1884 if name.as_str() == vars::DATABASE.name()
1886 && matches!(
1887 self.catalog().resolve_database(vars.database()),
1888 Err(CatalogError::UnknownDatabase(_))
1889 )
1890 && session.vars().current_object_missing_warnings()
1891 {
1892 let name = vars.database().to_string();
1893 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1894 } else if name.as_str() == vars::CLUSTER.name()
1895 && matches!(
1896 self.catalog().resolve_cluster(vars.cluster()),
1897 Err(CatalogError::UnknownCluster(_))
1898 )
1899 && session.vars().current_object_missing_warnings()
1900 {
1901 let name = vars.cluster().to_string();
1902 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1903 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1904 let v = values.into_first().to_lowercase();
1905 if v == IsolationLevel::ReadUncommitted.as_str()
1906 || v == IsolationLevel::ReadCommitted.as_str()
1907 || v == IsolationLevel::RepeatableRead.as_str()
1908 {
1909 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1910 isolation_level: v,
1911 });
1912 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1913 session.add_notice(AdapterNotice::StrongSessionSerializable);
1914 }
1915 }
1916 }
1917 None => vars.reset(self.catalog().system_config(), &name, local)?,
1918 }
1919
1920 Ok(ExecuteResponse::SetVariable { name, reset: false })
1921 }
1922
1923 pub(super) fn sequence_reset_variable(
1924 &self,
1925 session: &mut Session,
1926 plan: plan::ResetVariablePlan,
1927 ) -> Result<ExecuteResponse, AdapterError> {
1928 let name = plan.name;
1929 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1930 self.validate_set_isolation_level(session)?;
1931 }
1932 if &name == vars::CLUSTER.name() {
1933 self.validate_set_cluster(session)?;
1934 }
1935 session
1936 .vars_mut()
1937 .reset(self.catalog().system_config(), &name, false)?;
1938 Ok(ExecuteResponse::SetVariable { name, reset: true })
1939 }
1940
1941 pub(super) fn sequence_set_transaction(
1942 &self,
1943 session: &mut Session,
1944 plan: plan::SetTransactionPlan,
1945 ) -> Result<ExecuteResponse, AdapterError> {
1946 for mode in plan.modes {
1948 match mode {
1949 TransactionMode::AccessMode(_) => {
1950 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1951 }
1952 TransactionMode::IsolationLevel(isolation_level) => {
1953 self.validate_set_isolation_level(session)?;
1954
1955 session.vars_mut().set(
1956 self.catalog().system_config(),
1957 TRANSACTION_ISOLATION_VAR_NAME,
1958 VarInput::Flat(&isolation_level.to_ast_string_stable()),
1959 plan.local,
1960 )?
1961 }
1962 }
1963 }
1964 Ok(ExecuteResponse::SetVariable {
1965 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1966 reset: false,
1967 })
1968 }
1969
1970 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1971 if session.transaction().contains_ops() {
1972 Err(AdapterError::InvalidSetIsolationLevel)
1973 } else {
1974 Ok(())
1975 }
1976 }
1977
1978 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1979 if session.transaction().contains_ops() {
1980 Err(AdapterError::InvalidSetCluster)
1981 } else {
1982 Ok(())
1983 }
1984 }
1985
1986 #[instrument]
1987 pub(super) async fn sequence_end_transaction(
1988 &mut self,
1989 mut ctx: ExecuteContext,
1990 mut action: EndTransactionAction,
1991 ) {
1992 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
1994 (&action, ctx.session().transaction())
1995 {
1996 action = EndTransactionAction::Rollback;
1997 }
1998 let response = match action {
1999 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2000 params: BTreeMap::new(),
2001 }),
2002 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2003 params: BTreeMap::new(),
2004 }),
2005 };
2006
2007 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2008
2009 let (response, action) = match result {
2010 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2011 (response, action)
2012 }
2013 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2014 let validated_locks = match write_lock_guards {
2018 None => None,
2019 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2020 Ok(locks) => Some(locks),
2021 Err(missing) => {
2022 tracing::error!(?missing, "programming error, missing write locks");
2023 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2024 }
2025 },
2026 };
2027
2028 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2029 for WriteOp { id, rows } in writes {
2030 let total_rows = collected_writes.entry(id).or_default();
2031 total_rows.push(rows);
2032 }
2033
2034 self.submit_write(PendingWriteTxn::User {
2035 span: Span::current(),
2036 writes: collected_writes,
2037 write_locks: validated_locks,
2038 pending_txn: PendingTxn {
2039 ctx,
2040 response,
2041 action,
2042 },
2043 });
2044 return;
2045 }
2046 Ok((
2047 Some(TransactionOps::Peeks {
2048 determination,
2049 requires_linearization: RequireLinearization::Required,
2050 ..
2051 }),
2052 _,
2053 )) if ctx.session().vars().transaction_isolation()
2054 == &IsolationLevel::StrictSerializable =>
2055 {
2056 let conn_id = ctx.session().conn_id().clone();
2057 let pending_read_txn = PendingReadTxn {
2058 txn: PendingRead::Read {
2059 txn: PendingTxn {
2060 ctx,
2061 response,
2062 action,
2063 },
2064 },
2065 timestamp_context: determination.timestamp_context,
2066 created: Instant::now(),
2067 num_requeues: 0,
2068 otel_ctx: OpenTelemetryContext::obtain(),
2069 };
2070 self.strict_serializable_reads_tx
2071 .send((conn_id, pending_read_txn))
2072 .expect("sending to strict_serializable_reads_tx cannot fail");
2073 return;
2074 }
2075 Ok((
2076 Some(TransactionOps::Peeks {
2077 determination,
2078 requires_linearization: RequireLinearization::Required,
2079 ..
2080 }),
2081 _,
2082 )) if ctx.session().vars().transaction_isolation()
2083 == &IsolationLevel::StrongSessionSerializable =>
2084 {
2085 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2086 ctx.session_mut()
2087 .ensure_timestamp_oracle(timeline.clone())
2088 .apply_write(*ts);
2089 }
2090 (response, action)
2091 }
2092 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2093 self.internal_cmd_tx
2094 .send(Message::ExecuteSingleStatementTransaction {
2095 ctx,
2096 otel_ctx: OpenTelemetryContext::obtain(),
2097 stmt,
2098 params,
2099 })
2100 .expect("must send");
2101 return;
2102 }
2103 Ok((_, _)) => (response, action),
2104 Err(err) => (Err(err), EndTransactionAction::Rollback),
2105 };
2106 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2107 let response = response.map(|mut r| {
2109 r.extend_params(changed);
2110 ExecuteResponse::from(r)
2111 });
2112
2113 ctx.retire(response);
2114 }
2115
2116 #[instrument]
2117 async fn sequence_end_transaction_inner(
2118 &mut self,
2119 ctx: &mut ExecuteContext,
2120 action: EndTransactionAction,
2121 ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2122 let txn = self.clear_transaction(ctx.session_mut()).await;
2123
2124 if let EndTransactionAction::Commit = action {
2125 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2126 match &mut ops {
2127 TransactionOps::Writes(writes) => {
2128 for WriteOp { id, .. } in &mut writes.iter() {
2129 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2131 AdapterError::Catalog(mz_catalog::memory::error::Error {
2132 kind: mz_catalog::memory::error::ErrorKind::Sql(
2133 CatalogError::UnknownItem(id.to_string()),
2134 ),
2135 })
2136 })?;
2137 }
2138
2139 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2141 }
2142 TransactionOps::DDL {
2143 ops,
2144 state: _,
2145 side_effects,
2146 revision,
2147 } => {
2148 if *revision != self.catalog().transient_revision() {
2150 return Err(AdapterError::DDLTransactionRace);
2151 }
2152 let ops = std::mem::take(ops);
2154 let side_effects = std::mem::take(side_effects);
2155 self.catalog_transact_with_side_effects(
2156 Some(ctx),
2157 ops,
2158 move |a, mut ctx| {
2159 Box::pin(async move {
2160 for side_effect in side_effects {
2161 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2162 }
2163 })
2164 },
2165 )
2166 .await?;
2167 }
2168 _ => (),
2169 }
2170 return Ok((Some(ops), write_lock_guards));
2171 }
2172 }
2173
2174 Ok((None, None))
2175 }
2176
2177 pub(super) async fn sequence_side_effecting_func(
2178 &mut self,
2179 ctx: ExecuteContext,
2180 plan: SideEffectingFunc,
2181 ) {
2182 match plan {
2183 SideEffectingFunc::PgCancelBackend { connection_id } => {
2184 if ctx.session().conn_id().unhandled() == connection_id {
2185 ctx.retire(Err(AdapterError::Canceled));
2189 return;
2190 }
2191
2192 let res = if let Some((id_handle, _conn_meta)) =
2193 self.active_conns.get_key_value(&connection_id)
2194 {
2195 self.handle_privileged_cancel(id_handle.clone()).await;
2197 Datum::True
2198 } else {
2199 Datum::False
2200 };
2201 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2202 }
2203 }
2204 }
2205
2206 pub(crate) async fn execute_side_effecting_func(
2215 &mut self,
2216 plan: SideEffectingFunc,
2217 conn_id: ConnectionId,
2218 current_role: RoleId,
2219 ) -> Result<ExecuteResponse, AdapterError> {
2220 match plan {
2221 SideEffectingFunc::PgCancelBackend { connection_id } => {
2222 if conn_id.unhandled() == connection_id {
2223 return Err(AdapterError::Canceled);
2227 }
2228
2229 if let Some((_id_handle, conn_meta)) =
2232 self.active_conns.get_key_value(&connection_id)
2233 {
2234 let target_role = *conn_meta.authenticated_role_id();
2235 let role_membership = self
2236 .catalog()
2237 .state()
2238 .collect_role_membership(¤t_role);
2239 if !role_membership.contains(&target_role) {
2240 let target_role_name = self
2241 .catalog()
2242 .try_get_role(&target_role)
2243 .map(|role| role.name().to_string())
2244 .unwrap_or_else(|| target_role.to_string());
2245 return Err(AdapterError::Unauthorized(
2246 rbac::UnauthorizedError::RoleMembership {
2247 role_names: vec![target_role_name],
2248 },
2249 ));
2250 }
2251
2252 let id_handle = self
2254 .active_conns
2255 .get_key_value(&connection_id)
2256 .map(|(id, _)| id.clone())
2257 .expect("checked above");
2258 self.handle_privileged_cancel(id_handle).await;
2259 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2260 } else {
2261 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2263 }
2264 }
2265 }
2266 }
2267
2268 pub(crate) async fn determine_real_time_recent_timestamp(
2272 &self,
2273 source_ids: impl Iterator<Item = GlobalId>,
2274 real_time_recency_timeout: Duration,
2275 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2276 {
2277 let item_ids = source_ids
2278 .map(|gid| {
2279 self.catalog
2280 .try_resolve_item_id(&gid)
2281 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2282 })
2283 .collect::<Result<Vec<_>, _>>()?;
2284
2285 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2291 if to_visit.is_empty() {
2294 return Ok(None);
2295 }
2296
2297 let mut timestamp_objects = BTreeSet::new();
2298
2299 while let Some(id) = to_visit.pop_front() {
2300 timestamp_objects.insert(id);
2301 to_visit.extend(
2302 self.catalog()
2303 .get_entry(&id)
2304 .uses()
2305 .into_iter()
2306 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2307 );
2308 }
2309 let timestamp_objects = timestamp_objects
2310 .into_iter()
2311 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2312 .collect();
2313
2314 let r = self
2315 .controller
2316 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2317 .await?;
2318
2319 Ok(Some(r))
2320 }
2321
2322 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2325 &self,
2326 session: &Session,
2327 source_ids: impl Iterator<Item = GlobalId>,
2328 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2329 {
2330 let vars = session.vars();
2331
2332 if vars.real_time_recency()
2333 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2334 && !session.contains_read_timestamp()
2335 {
2336 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2337 .await
2338 } else {
2339 Ok(None)
2340 }
2341 }
2342
2343 #[instrument]
2344 pub(super) async fn sequence_explain_plan(
2345 &mut self,
2346 ctx: ExecuteContext,
2347 plan: plan::ExplainPlanPlan,
2348 target_cluster: TargetCluster,
2349 ) {
2350 match &plan.explainee {
2351 plan::Explainee::Statement(stmt) => match stmt {
2352 plan::ExplaineeStatement::CreateView { .. } => {
2353 self.explain_create_view(ctx, plan).await;
2354 }
2355 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2356 self.explain_create_materialized_view(ctx, plan).await;
2357 }
2358 plan::ExplaineeStatement::CreateIndex { .. } => {
2359 self.explain_create_index(ctx, plan).await;
2360 }
2361 plan::ExplaineeStatement::Select { .. } => {
2362 self.explain_peek(ctx, plan, target_cluster).await;
2363 }
2364 },
2365 plan::Explainee::View(_) => {
2366 let result = self.explain_view(&ctx, plan);
2367 ctx.retire(result);
2368 }
2369 plan::Explainee::MaterializedView(_) => {
2370 let result = self.explain_materialized_view(&ctx, plan);
2371 ctx.retire(result);
2372 }
2373 plan::Explainee::Index(_) => {
2374 let result = self.explain_index(&ctx, plan);
2375 ctx.retire(result);
2376 }
2377 plan::Explainee::ReplanView(_) => {
2378 self.explain_replan_view(ctx, plan).await;
2379 }
2380 plan::Explainee::ReplanMaterializedView(_) => {
2381 self.explain_replan_materialized_view(ctx, plan).await;
2382 }
2383 plan::Explainee::ReplanIndex(_) => {
2384 self.explain_replan_index(ctx, plan).await;
2385 }
2386 };
2387 }
2388
2389 pub(super) async fn sequence_explain_pushdown(
2390 &mut self,
2391 ctx: ExecuteContext,
2392 plan: plan::ExplainPushdownPlan,
2393 target_cluster: TargetCluster,
2394 ) {
2395 match plan.explainee {
2396 Explainee::Statement(ExplaineeStatement::Select {
2397 broken: false,
2398 plan,
2399 desc: _,
2400 }) => {
2401 let stage = return_if_err!(
2402 self.peek_validate(
2403 ctx.session(),
2404 plan,
2405 target_cluster,
2406 None,
2407 ExplainContext::Pushdown,
2408 Some(ctx.session().vars().max_query_result_size()),
2409 ),
2410 ctx
2411 );
2412 self.sequence_staged(ctx, Span::current(), stage).await;
2413 }
2414 Explainee::MaterializedView(item_id) => {
2415 self.explain_pushdown_materialized_view(ctx, item_id).await;
2416 }
2417 _ => {
2418 ctx.retire(Err(AdapterError::Unsupported(
2419 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2420 )));
2421 }
2422 };
2423 }
2424
2425 async fn execute_explain_pushdown_with_read_holds(
2427 &self,
2428 ctx: ExecuteContext,
2429 as_of: Antichain<Timestamp>,
2430 mz_now: ResultSpec<'static>,
2431 read_holds: Option<ReadHolds<Timestamp>>,
2432 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2433 ) {
2434 let fut = self
2435 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2436 .await;
2437 task::spawn(|| "render explain pushdown", async move {
2438 let _read_holds = read_holds;
2440 let res = fut.await;
2441 ctx.retire(res);
2442 });
2443 }
2444
2445 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2447 &self,
2448 session: &Session,
2449 as_of: Antichain<Timestamp>,
2450 mz_now: ResultSpec<'static>,
2451 imports: I,
2452 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2453 super::explain_pushdown_future_inner(
2455 session,
2456 &self.catalog,
2457 &self.controller.storage_collections,
2458 as_of,
2459 mz_now,
2460 imports,
2461 )
2462 .await
2463 }
2464
2465 #[instrument]
2466 pub(super) async fn sequence_insert(
2467 &mut self,
2468 mut ctx: ExecuteContext,
2469 plan: plan::InsertPlan,
2470 ) {
2471 if !ctx.session_mut().transaction().allows_writes() {
2479 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2480 return;
2481 }
2482
2483 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2497 let expr = return_if_err!(
2500 plan.values
2501 .clone()
2502 .lower(self.catalog().system_config(), None),
2503 ctx
2504 );
2505 OptimizedMirRelationExpr(expr)
2506 } else {
2507 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2509
2510 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2512
2513 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2515 };
2516
2517 match optimized_mir.into_inner() {
2518 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2519 let catalog = self.owned_catalog();
2520 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2521 let result =
2522 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2523 ctx.retire(result);
2524 });
2525 }
2526 _ => {
2528 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2529 Some(table) => {
2530 let desc = table.relation_desc_latest().expect("table has a desc");
2532 desc.arity()
2533 }
2534 None => {
2535 ctx.retire(Err(AdapterError::Catalog(
2536 mz_catalog::memory::error::Error {
2537 kind: mz_catalog::memory::error::ErrorKind::Sql(
2538 CatalogError::UnknownItem(plan.id.to_string()),
2539 ),
2540 },
2541 )));
2542 return;
2543 }
2544 };
2545
2546 let finishing = RowSetFinishing {
2547 order_by: vec![],
2548 limit: None,
2549 offset: 0,
2550 project: (0..desc_arity).collect(),
2551 };
2552
2553 let read_then_write_plan = plan::ReadThenWritePlan {
2554 id: plan.id,
2555 selection: plan.values,
2556 finishing,
2557 assignments: BTreeMap::new(),
2558 kind: MutationKind::Insert,
2559 returning: plan.returning,
2560 };
2561
2562 self.sequence_read_then_write(ctx, read_then_write_plan)
2563 .await;
2564 }
2565 }
2566 }
2567
2568 #[instrument]
2573 pub(super) async fn sequence_read_then_write(
2574 &mut self,
2575 mut ctx: ExecuteContext,
2576 plan: plan::ReadThenWritePlan,
2577 ) {
2578 let mut source_ids: BTreeSet<_> = plan
2579 .selection
2580 .depends_on()
2581 .into_iter()
2582 .map(|gid| self.catalog().resolve_item_id(&gid))
2583 .collect();
2584 source_ids.insert(plan.id);
2585
2586 if ctx.session().transaction().write_locks().is_none() {
2588 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2590
2591 for id in &source_ids {
2593 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2594 write_locks.insert_lock(*id, lock);
2595 }
2596 }
2597
2598 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2600 Ok(locks) => locks,
2601 Err(missing) => {
2602 let role_metadata = ctx.session().role_metadata().clone();
2604 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2605 let plan = DeferredPlan {
2606 ctx,
2607 plan: Plan::ReadThenWrite(plan),
2608 validity: PlanValidity::new(
2609 self.catalog.transient_revision(),
2610 source_ids.clone(),
2611 None,
2612 None,
2613 role_metadata,
2614 ),
2615 requires_locks: source_ids,
2616 };
2617 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2618 }
2619 };
2620
2621 ctx.session_mut()
2622 .try_grant_write_locks(write_locks)
2623 .expect("session has already been granted write locks");
2624 }
2625
2626 let plan::ReadThenWritePlan {
2627 id,
2628 kind,
2629 selection,
2630 mut assignments,
2631 finishing,
2632 mut returning,
2633 } = plan;
2634
2635 let desc = match self.catalog().try_get_entry(&id) {
2637 Some(table) => {
2638 table
2640 .relation_desc_latest()
2641 .expect("table has a desc")
2642 .into_owned()
2643 }
2644 None => {
2645 ctx.retire(Err(AdapterError::Catalog(
2646 mz_catalog::memory::error::Error {
2647 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2648 id.to_string(),
2649 )),
2650 },
2651 )));
2652 return;
2653 }
2654 };
2655
2656 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2658 || assignments.values().any(|e| e.contains_temporal())
2659 || returning.iter().any(|e| e.contains_temporal());
2660 if contains_temporal {
2661 ctx.retire(Err(AdapterError::Unsupported(
2662 "calls to mz_now in write statements",
2663 )));
2664 return;
2665 }
2666
2667 fn validate_read_dependencies(
2675 catalog: &Catalog,
2676 id: &CatalogItemId,
2677 ) -> Result<(), AdapterError> {
2678 use CatalogItemType::*;
2679 use mz_catalog::memory::objects;
2680 let mut ids_to_check = Vec::new();
2681 let valid = match catalog.try_get_entry(id) {
2682 Some(entry) => {
2683 if let CatalogItem::View(objects::View { optimized_expr, .. })
2684 | CatalogItem::MaterializedView(objects::MaterializedView {
2685 optimized_expr,
2686 ..
2687 }) = entry.item()
2688 {
2689 if optimized_expr.contains_temporal() {
2690 return Err(AdapterError::Unsupported(
2691 "calls to mz_now in write statements",
2692 ));
2693 }
2694 }
2695 match entry.item().typ() {
2696 typ @ (Func | View | MaterializedView | ContinualTask) => {
2697 ids_to_check.extend(entry.uses());
2698 let valid_id = id.is_user() || matches!(typ, Func);
2699 valid_id
2700 }
2701 Source | Secret | Connection => false,
2702 Sink | Index => unreachable!(),
2704 Table => {
2705 if !id.is_user() {
2706 false
2708 } else {
2709 entry.source_export_details().is_none()
2711 }
2712 }
2713 Type => true,
2714 }
2715 }
2716 None => false,
2717 };
2718 if !valid {
2719 return Err(AdapterError::InvalidTableMutationSelection);
2720 }
2721 for id in ids_to_check {
2722 validate_read_dependencies(catalog, &id)?;
2723 }
2724 Ok(())
2725 }
2726
2727 for gid in selection.depends_on() {
2728 let item_id = self.catalog().resolve_item_id(&gid);
2729 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2730 ctx.retire(Err(err));
2731 return;
2732 }
2733 }
2734
2735 let (peek_tx, peek_rx) = oneshot::channel();
2736 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2737 let (tx, _, session, extra) = ctx.into_parts();
2738 let peek_ctx = ExecuteContext::from_parts(
2750 peek_client_tx,
2751 self.internal_cmd_tx.clone(),
2752 session,
2753 Default::default(),
2754 );
2755
2756 self.sequence_peek(
2757 peek_ctx,
2758 plan::SelectPlan {
2759 select: None,
2760 source: selection,
2761 when: QueryWhen::FreshestTableWrite,
2762 finishing,
2763 copy_to: None,
2764 },
2765 TargetCluster::Active,
2766 None,
2767 )
2768 .await;
2769
2770 let internal_cmd_tx = self.internal_cmd_tx.clone();
2771 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2772 let catalog = self.owned_catalog();
2773 let max_result_size = self.catalog().system_config().max_result_size();
2774
2775 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2776 let (peek_response, session) = match peek_rx.await {
2777 Ok(Response {
2778 result: Ok(resp),
2779 session,
2780 otel_ctx,
2781 }) => {
2782 otel_ctx.attach_as_parent();
2783 (resp, session)
2784 }
2785 Ok(Response {
2786 result: Err(e),
2787 session,
2788 otel_ctx,
2789 }) => {
2790 let ctx =
2791 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2792 otel_ctx.attach_as_parent();
2793 ctx.retire(Err(e));
2794 return;
2795 }
2796 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2798 };
2799 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2800 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2801
2802 if timeout_dur == Duration::ZERO {
2804 timeout_dur = Duration::MAX;
2805 }
2806
2807 let style = ExprPrepOneShot {
2808 logical_time: EvalTime::NotAvailable, session: ctx.session(),
2810 catalog_state: catalog.state(),
2811 };
2812 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2813 return_if_err!(style.prep_scalar_expr(expr), ctx);
2814 }
2815
2816 let make_diffs =
2817 move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2818 let arena = RowArena::new();
2819 let mut diffs = Vec::new();
2820 let mut datum_vec = mz_repr::DatumVec::new();
2821
2822 while let Some(row) = rows.next() {
2823 if !assignments.is_empty() {
2824 assert!(
2825 matches!(kind, MutationKind::Update),
2826 "only updates support assignments"
2827 );
2828 let mut datums = datum_vec.borrow_with(row);
2829 let mut updates = vec![];
2830 for (idx, expr) in &assignments {
2831 let updated = match expr.eval(&datums, &arena) {
2832 Ok(updated) => updated,
2833 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2834 };
2835 updates.push((*idx, updated));
2836 }
2837 for (idx, new_value) in updates {
2838 datums[idx] = new_value;
2839 }
2840 let updated = Row::pack_slice(&datums);
2841 diffs.push((updated, Diff::ONE));
2842 }
2843 match kind {
2844 MutationKind::Update | MutationKind::Delete => {
2848 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2849 }
2850 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2851 }
2852 }
2853
2854 let mut byte_size: u64 = 0;
2857 for (row, diff) in &diffs {
2858 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2859 if diff.is_positive() {
2860 for (idx, datum) in row.iter().enumerate() {
2861 desc.constraints_met(idx, &datum)?;
2862 }
2863 }
2864 }
2865 Ok((diffs, byte_size))
2866 };
2867
2868 let diffs = match peek_response {
2869 ExecuteResponse::SendingRowsStreaming {
2870 rows: mut rows_stream,
2871 ..
2872 } => {
2873 let mut byte_size: u64 = 0;
2874 let mut diffs = Vec::new();
2875 let result = loop {
2876 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2877 Ok(Some(res)) => match res {
2878 PeekResponseUnary::Rows(new_rows) => {
2879 match make_diffs(new_rows) {
2880 Ok((mut new_diffs, new_byte_size)) => {
2881 byte_size = byte_size.saturating_add(new_byte_size);
2882 if byte_size > max_result_size {
2883 break Err(AdapterError::ResultSize(format!(
2884 "result exceeds max size of {max_result_size}"
2885 )));
2886 }
2887 diffs.append(&mut new_diffs)
2888 }
2889 Err(e) => break Err(e),
2890 };
2891 }
2892 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2893 PeekResponseUnary::Error(e) => {
2894 break Err(AdapterError::Unstructured(anyhow!(e)));
2895 }
2896 },
2897 Ok(None) => break Ok(diffs),
2898 Err(_) => {
2899 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2904 conn_id: ctx.session().conn_id().clone(),
2905 });
2906 if let Err(e) = result {
2907 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2908 }
2909 break Err(AdapterError::StatementTimeout);
2910 }
2911 }
2912 };
2913
2914 result
2915 }
2916 ExecuteResponse::SendingRowsImmediate { rows } => {
2917 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2918 }
2919 resp => Err(AdapterError::Unstructured(anyhow!(
2920 "unexpected peek response: {resp:?}"
2921 ))),
2922 };
2923
2924 let mut returning_rows = Vec::new();
2925 let mut diff_err: Option<AdapterError> = None;
2926 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2927 let arena = RowArena::new();
2928 for (row, diff) in diffs {
2929 if !diff.is_positive() {
2930 continue;
2931 }
2932 let mut returning_row = Row::with_capacity(returning.len());
2933 let mut packer = returning_row.packer();
2934 for expr in &returning {
2935 let datums: Vec<_> = row.iter().collect();
2936 match expr.eval(&datums, &arena) {
2937 Ok(datum) => {
2938 packer.push(datum);
2939 }
2940 Err(err) => {
2941 diff_err = Some(err.into());
2942 break;
2943 }
2944 }
2945 }
2946 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2947 let diff = match NonZeroUsize::try_from(diff) {
2948 Ok(diff) => diff,
2949 Err(err) => {
2950 diff_err = Some(err.into());
2951 break;
2952 }
2953 };
2954 returning_rows.push((returning_row, diff));
2955 if diff_err.is_some() {
2956 break;
2957 }
2958 }
2959 }
2960 let diffs = if let Some(err) = diff_err {
2961 Err(err)
2962 } else {
2963 diffs
2964 };
2965
2966 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
2969 if let Some(timestamp_context) = timestamp_context {
2978 let (tx, rx) = tokio::sync::oneshot::channel();
2979 let conn_id = ctx.session().conn_id().clone();
2980 let pending_read_txn = PendingReadTxn {
2981 txn: PendingRead::ReadThenWrite { ctx, tx },
2982 timestamp_context,
2983 created: Instant::now(),
2984 num_requeues: 0,
2985 otel_ctx: OpenTelemetryContext::obtain(),
2986 };
2987 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
2988 if let Err(e) = result {
2990 warn!(
2991 "strict_serializable_reads_tx dropped before we could send: {:?}",
2992 e
2993 );
2994 return;
2995 }
2996 let result = rx.await;
2997 ctx = match result {
2999 Ok(Some(ctx)) => ctx,
3000 Ok(None) => {
3001 return;
3004 }
3005 Err(e) => {
3006 warn!(
3007 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3008 e
3009 );
3010 return;
3011 }
3012 };
3013 }
3014
3015 match diffs {
3016 Ok(diffs) => {
3017 let result = Self::send_diffs(
3018 ctx.session_mut(),
3019 plan::SendDiffsPlan {
3020 id,
3021 updates: diffs,
3022 kind,
3023 returning: returning_rows,
3024 max_result_size,
3025 },
3026 );
3027 ctx.retire(result);
3028 }
3029 Err(e) => {
3030 ctx.retire(Err(e));
3031 }
3032 }
3033 });
3034 }
3035
3036 #[instrument]
3037 pub(super) async fn sequence_alter_item_rename(
3038 &mut self,
3039 ctx: &mut ExecuteContext,
3040 plan: plan::AlterItemRenamePlan,
3041 ) -> Result<ExecuteResponse, AdapterError> {
3042 let op = catalog::Op::RenameItem {
3043 id: plan.id,
3044 current_full_name: plan.current_full_name,
3045 to_name: plan.to_name,
3046 };
3047 match self
3048 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3049 .await
3050 {
3051 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3052 Err(err) => Err(err),
3053 }
3054 }
3055
3056 #[instrument]
3057 pub(super) async fn sequence_alter_retain_history(
3058 &mut self,
3059 ctx: &mut ExecuteContext,
3060 plan: plan::AlterRetainHistoryPlan,
3061 ) -> Result<ExecuteResponse, AdapterError> {
3062 let ops = vec![catalog::Op::AlterRetainHistory {
3063 id: plan.id,
3064 value: plan.value,
3065 window: plan.window,
3066 }];
3067 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3068 Box::pin(async move {
3069 let catalog_item = coord.catalog().get_entry(&plan.id).item();
3070 let cluster = match catalog_item {
3071 CatalogItem::Table(_)
3072 | CatalogItem::MaterializedView(_)
3073 | CatalogItem::Source(_)
3074 | CatalogItem::ContinualTask(_) => None,
3075 CatalogItem::Index(index) => Some(index.cluster_id),
3076 CatalogItem::Log(_)
3077 | CatalogItem::View(_)
3078 | CatalogItem::Sink(_)
3079 | CatalogItem::Type(_)
3080 | CatalogItem::Func(_)
3081 | CatalogItem::Secret(_)
3082 | CatalogItem::Connection(_) => unreachable!(),
3083 };
3084 match cluster {
3085 Some(cluster) => {
3086 coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3087 }
3088 None => {
3089 coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3090 }
3091 }
3092 })
3093 })
3094 .await?;
3095 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3096 }
3097
3098 #[instrument]
3099 pub(super) async fn sequence_alter_schema_rename(
3100 &mut self,
3101 ctx: &mut ExecuteContext,
3102 plan: plan::AlterSchemaRenamePlan,
3103 ) -> Result<ExecuteResponse, AdapterError> {
3104 let (database_spec, schema_spec) = plan.cur_schema_spec;
3105 let op = catalog::Op::RenameSchema {
3106 database_spec,
3107 schema_spec,
3108 new_name: plan.new_schema_name,
3109 check_reserved_names: true,
3110 };
3111 match self
3112 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3113 .await
3114 {
3115 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3116 Err(err) => Err(err),
3117 }
3118 }
3119
3120 #[instrument]
3121 pub(super) async fn sequence_alter_schema_swap(
3122 &mut self,
3123 ctx: &mut ExecuteContext,
3124 plan: plan::AlterSchemaSwapPlan,
3125 ) -> Result<ExecuteResponse, AdapterError> {
3126 let plan::AlterSchemaSwapPlan {
3127 schema_a_spec: (schema_a_db, schema_a),
3128 schema_a_name,
3129 schema_b_spec: (schema_b_db, schema_b),
3130 schema_b_name,
3131 name_temp,
3132 } = plan;
3133
3134 let op_a = catalog::Op::RenameSchema {
3135 database_spec: schema_a_db,
3136 schema_spec: schema_a,
3137 new_name: name_temp,
3138 check_reserved_names: false,
3139 };
3140 let op_b = catalog::Op::RenameSchema {
3141 database_spec: schema_b_db,
3142 schema_spec: schema_b,
3143 new_name: schema_a_name,
3144 check_reserved_names: false,
3145 };
3146 let op_c = catalog::Op::RenameSchema {
3147 database_spec: schema_a_db,
3148 schema_spec: schema_a,
3149 new_name: schema_b_name,
3150 check_reserved_names: false,
3151 };
3152
3153 match self
3154 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3155 Box::pin(async {})
3156 })
3157 .await
3158 {
3159 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3160 Err(err) => Err(err),
3161 }
3162 }
3163
3164 #[instrument]
3165 pub(super) async fn sequence_alter_role(
3166 &mut self,
3167 session: &Session,
3168 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3169 ) -> Result<ExecuteResponse, AdapterError> {
3170 let catalog = self.catalog().for_session(session);
3171 let role = catalog.get_role(&id);
3172
3173 let mut notices = vec![];
3175
3176 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3178 let mut vars = role.vars().clone();
3179
3180 let mut nopassword = false;
3183
3184 match option {
3186 PlannedAlterRoleOption::Attributes(attrs) => {
3187 self.validate_role_attributes(&attrs.clone().into())?;
3188
3189 if let Some(inherit) = attrs.inherit {
3190 attributes.inherit = inherit;
3191 }
3192
3193 if let Some(password) = attrs.password {
3194 attributes.password = Some(password);
3195 attributes.scram_iterations =
3196 Some(self.catalog().system_config().scram_iterations())
3197 }
3198
3199 if let Some(superuser) = attrs.superuser {
3200 attributes.superuser = Some(superuser);
3201 }
3202
3203 if let Some(login) = attrs.login {
3204 attributes.login = Some(login);
3205 }
3206
3207 if attrs.nopassword.unwrap_or(false) {
3208 nopassword = true;
3209 }
3210
3211 if let Some(notice) = self.should_emit_rbac_notice(session) {
3212 notices.push(notice);
3213 }
3214 }
3215 PlannedAlterRoleOption::Variable(variable) => {
3216 let session_var = session.vars().inspect(variable.name())?;
3218 session_var.visible(session.user(), catalog.system_vars())?;
3220
3221 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3224 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3225 } else if let PlannedRoleVariable::Set {
3226 name,
3227 value: VariableValue::Values(vals),
3228 } = &variable
3229 {
3230 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3231 notices.push(AdapterNotice::IntrospectionClusterUsage);
3232 }
3233 }
3234
3235 let var_name = match variable {
3236 PlannedRoleVariable::Set { name, value } => {
3237 match &value {
3239 VariableValue::Default => {
3240 vars.remove(&name);
3241 }
3242 VariableValue::Values(vals) => {
3243 let var = match &vals[..] {
3244 [val] => OwnedVarInput::Flat(val.clone()),
3245 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3246 };
3247 session_var.check(var.borrow())?;
3249
3250 vars.insert(name.clone(), var);
3251 }
3252 };
3253 name
3254 }
3255 PlannedRoleVariable::Reset { name } => {
3256 vars.remove(&name);
3258 name
3259 }
3260 };
3261
3262 notices.push(AdapterNotice::VarDefaultUpdated {
3264 role: Some(name.clone()),
3265 var_name: Some(var_name),
3266 });
3267 }
3268 }
3269
3270 let op = catalog::Op::AlterRole {
3271 id,
3272 name,
3273 attributes,
3274 nopassword,
3275 vars: RoleVars { map: vars },
3276 };
3277 let response = self
3278 .catalog_transact(Some(session), vec![op])
3279 .await
3280 .map(|_| ExecuteResponse::AlteredRole)?;
3281
3282 session.add_notices(notices);
3284
3285 Ok(response)
3286 }
3287
3288 #[instrument]
3289 pub(super) async fn sequence_alter_sink_prepare(
3290 &mut self,
3291 ctx: ExecuteContext,
3292 plan: plan::AlterSinkPlan,
3293 ) {
3294 let id_bundle = crate::CollectionIdBundle {
3296 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3297 compute_ids: BTreeMap::new(),
3298 };
3299 let read_hold = self.acquire_read_holds(&id_bundle);
3300
3301 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3302 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3303 return;
3304 };
3305
3306 let otel_ctx = OpenTelemetryContext::obtain();
3307 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3308
3309 let plan_validity = PlanValidity::new(
3310 self.catalog().transient_revision(),
3311 BTreeSet::from_iter([plan.item_id, from_item_id]),
3312 Some(plan.in_cluster),
3313 None,
3314 ctx.session().role_metadata().clone(),
3315 );
3316
3317 info!(
3318 "preparing alter sink for {}: frontiers={:?} export={:?}",
3319 plan.global_id,
3320 self.controller
3321 .storage_collections
3322 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3323 self.controller.storage.export(plan.global_id)
3324 );
3325
3326 self.install_storage_watch_set(
3332 ctx.session().conn_id().clone(),
3333 BTreeSet::from_iter([plan.global_id]),
3334 read_ts,
3335 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3336 ctx: Some(ctx),
3337 otel_ctx,
3338 plan,
3339 plan_validity,
3340 read_hold,
3341 }),
3342 ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3343 }
3344
/// Second phase of altering a sink.
///
/// Re-validates the plan, rewrites the sink's `create_sql` to record the new
/// version and the new `FROM` item, commits the updated sink to the catalog,
/// and finally alters the running export in the storage controller. The
/// context is retired with `AlteredObject(ObjectType::Sink)` on success, or
/// with the error on validity failure, concurrent alteration, or catalog
/// transaction failure.
///
/// # Panics
///
/// Panics if the catalog item is not a sink, if the stored `create_sql`
/// fails to parse or resolve, if the storage export is missing, or if the
/// read hold's `as_of` is not strictly less than the sink's write frontier.
#[instrument]
pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
    ctx.otel_ctx.attach_as_parent();

    let plan::AlterSinkPlan {
        item_id,
        global_id,
        sink: sink_plan,
        with_snapshot,
        in_cluster,
    } = ctx.plan.clone();

    // Re-check the plan against the current catalog before acting on it.
    match ctx.plan_validity.check(self.catalog()) {
        Ok(()) => {}
        Err(err) => {
            ctx.retire(Err(err));
            return;
        }
    }

    let entry = self.catalog().get_entry(&item_id);
    let CatalogItem::Sink(old_sink) = entry.item() else {
        panic!("invalid item kind for `AlterSinkPlan`");
    };

    // The plan must be exactly one version ahead of the sink in the catalog.
    if sink_plan.version != old_sink.version + 1 {
        ctx.retire(Err(AdapterError::ChangedPlan(
            "sink was altered concurrently".into(),
        )));
        return;
    }

    info!(
        "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
        self.controller
            .storage_collections
            .collections_frontiers(vec![global_id, sink_plan.from]),
        self.controller.storage.export(global_id),
    );

    // Sanity check: the time the new input is read from must be strictly
    // before the sink's current write frontier.
    let write_frontier = &self
        .controller
        .storage
        .export(global_id)
        .expect("sink known to exist")
        .write_frontier;
    let as_of = ctx.read_hold.least_valid_read();
    assert!(
        write_frontier.iter().all(|t| as_of.less_than(t)),
        "{:?} should be strictly less than {:?}",
        &*as_of,
        &**write_frontier
    );

    // Rebuild the statement from the stored SQL of the existing sink.
    let create_sql = &old_sink.create_sql;
    let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
    let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
        unreachable!("invalid statement kind for sink");
    };

    // Replace any existing VERSION option with the new version.
    stmt.with_options
        .retain(|o| o.name != CreateSinkOptionName::Version);
    stmt.with_options.push(CreateSinkOption {
        name: CreateSinkOptionName::Version,
        value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
            sink_plan.version.to_string(),
        ))),
    });

    let conn_catalog = self.catalog().for_system_session();
    let (mut stmt, resolved_ids) =
        mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");

    // Point the statement's FROM at the new input collection.
    let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
    let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
    stmt.from = ResolvedItemName::Item {
        id: from_entry.id(),
        qualifiers: from_entry.name.qualifiers.clone(),
        full_name,
        print_id: true,
        version: from_entry.version,
    };

    let new_sink = Sink {
        create_sql: stmt.to_ast_string_stable(),
        global_id,
        from: sink_plan.from,
        connection: sink_plan.connection.clone(),
        envelope: sink_plan.envelope,
        version: sink_plan.version,
        with_snapshot,
        resolved_ids: resolved_ids.clone(),
        cluster_id: in_cluster,
        commit_interval: sink_plan.commit_interval,
    };

    // Same id and name; only the item definition changes.
    let ops = vec![catalog::Op::UpdateItem {
        id: item_id,
        name: entry.name().clone(),
        to_item: CatalogItem::Sink(new_sink),
    }];

    match self
        .catalog_transact(Some(ctx.ctx().session_mut()), ops)
        .await
    {
        Ok(()) => {}
        Err(err) => {
            ctx.retire(Err(err));
            return;
        }
    }

    // With the catalog updated, describe the altered sink to storage.
    let storage_sink_desc = StorageSinkDesc {
        from: sink_plan.from,
        from_desc: from_entry
            .relation_desc()
            .expect("sinks can only be built on items with descs")
            .into_owned(),
        connection: sink_plan
            .connection
            .clone()
            .into_inline_connection(self.catalog().state()),
        envelope: sink_plan.envelope,
        as_of,
        with_snapshot,
        version: sink_plan.version,
        from_storage_metadata: (),
        to_storage_metadata: (),
        commit_interval: sink_plan.commit_interval,
    };

    // A failure here goes through `unwrap_or_terminate`.
    self.controller
        .storage
        .alter_export(
            global_id,
            ExportDescription {
                sink: storage_sink_desc,
                instance_id: in_cluster,
            },
        )
        .await
        .unwrap_or_terminate("cannot fail to alter source desc");

    ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
}
3508
3509 #[instrument]
3510 pub(super) async fn sequence_alter_connection(
3511 &mut self,
3512 ctx: ExecuteContext,
3513 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3514 ) {
3515 match action {
3516 AlterConnectionAction::RotateKeys => {
3517 self.sequence_rotate_keys(ctx, id).await;
3518 }
3519 AlterConnectionAction::AlterOptions {
3520 set_options,
3521 drop_options,
3522 validate,
3523 } => {
3524 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3525 .await
3526 }
3527 }
3528 }
3529
3530 #[instrument]
3531 async fn sequence_alter_connection_options(
3532 &mut self,
3533 mut ctx: ExecuteContext,
3534 id: CatalogItemId,
3535 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3536 drop_options: BTreeSet<ConnectionOptionName>,
3537 validate: bool,
3538 ) {
3539 let cur_entry = self.catalog().get_entry(&id);
3540 let cur_conn = cur_entry.connection().expect("known to be connection");
3541 let connection_gid = cur_conn.global_id();
3542
3543 let inner = || -> Result<Connection, AdapterError> {
3544 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3546 .expect("invalid create sql persisted to catalog")
3547 .into_element()
3548 .ast
3549 {
3550 Statement::CreateConnection(stmt) => stmt,
3551 _ => unreachable!("proved type is source"),
3552 };
3553
3554 let catalog = self.catalog().for_system_session();
3555
3556 let (mut create_conn_stmt, resolved_ids) =
3558 mz_sql::names::resolve(&catalog, create_conn_stmt)
3559 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3560
3561 create_conn_stmt
3563 .values
3564 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3565
3566 create_conn_stmt.values.extend(
3568 set_options
3569 .into_iter()
3570 .map(|(name, value)| ConnectionOption { name, value }),
3571 );
3572
3573 let mut catalog = self.catalog().for_system_session();
3576 catalog.mark_id_unresolvable_for_replanning(id);
3577
3578 let plan = match mz_sql::plan::plan(
3580 None,
3581 &catalog,
3582 Statement::CreateConnection(create_conn_stmt),
3583 &Params::empty(),
3584 &resolved_ids,
3585 )
3586 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3587 {
3588 Plan::CreateConnection(plan) => plan,
3589 _ => unreachable!("create source plan is only valid response"),
3590 };
3591
3592 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3594 .expect("invalid create sql persisted to catalog")
3595 .into_element()
3596 .ast
3597 {
3598 Statement::CreateConnection(stmt) => stmt,
3599 _ => unreachable!("proved type is source"),
3600 };
3601
3602 let catalog = self.catalog().for_system_session();
3603
3604 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3606 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3607
3608 Ok(Connection {
3609 create_sql: plan.connection.create_sql,
3610 global_id: cur_conn.global_id,
3611 details: plan.connection.details,
3612 resolved_ids: new_deps,
3613 })
3614 };
3615
3616 let conn = match inner() {
3617 Ok(conn) => conn,
3618 Err(e) => {
3619 return ctx.retire(Err(e));
3620 }
3621 };
3622
3623 if validate {
3624 let connection = conn
3625 .details
3626 .to_connection()
3627 .into_inline_connection(self.catalog().state());
3628
3629 let internal_cmd_tx = self.internal_cmd_tx.clone();
3630 let transient_revision = self.catalog().transient_revision();
3631 let conn_id = ctx.session().conn_id().clone();
3632 let otel_ctx = OpenTelemetryContext::obtain();
3633 let role_metadata = ctx.session().role_metadata().clone();
3634 let current_storage_parameters = self.controller.storage.config().clone();
3635
3636 task::spawn(
3637 || format!("validate_alter_connection:{conn_id}"),
3638 async move {
3639 let resolved_ids = conn.resolved_ids.clone();
3640 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3641 let result = match connection.validate(id, ¤t_storage_parameters).await {
3642 Ok(()) => Ok(conn),
3643 Err(err) => Err(err.into()),
3644 };
3645
3646 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3648 AlterConnectionValidationReady {
3649 ctx,
3650 result,
3651 connection_id: id,
3652 connection_gid,
3653 plan_validity: PlanValidity::new(
3654 transient_revision,
3655 dependency_ids.clone(),
3656 None,
3657 None,
3658 role_metadata,
3659 ),
3660 otel_ctx,
3661 resolved_ids,
3662 },
3663 ));
3664 if let Err(e) = result {
3665 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3666 }
3667 },
3668 );
3669 } else {
3670 let result = self
3671 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3672 .await;
3673 ctx.retire(result);
3674 }
3675 }
3676
3677 #[instrument]
3678 pub(crate) async fn sequence_alter_connection_stage_finish(
3679 &mut self,
3680 session: &Session,
3681 id: CatalogItemId,
3682 connection: Connection,
3683 ) -> Result<ExecuteResponse, AdapterError> {
3684 match self.catalog.get_entry(&id).item() {
3685 CatalogItem::Connection(curr_conn) => {
3686 curr_conn
3687 .details
3688 .to_connection()
3689 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3690 .map_err(StorageError::from)?;
3691 }
3692 _ => unreachable!("known to be a connection"),
3693 };
3694
3695 let ops = vec![catalog::Op::UpdateItem {
3696 id,
3697 name: self.catalog.get_entry(&id).name().clone(),
3698 to_item: CatalogItem::Connection(connection.clone()),
3699 }];
3700
3701 self.catalog_transact(Some(session), ops).await?;
3702
3703 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3710 }
3711
3712 #[instrument]
3713 pub(super) async fn sequence_alter_source(
3714 &mut self,
3715 session: &Session,
3716 plan::AlterSourcePlan {
3717 item_id,
3718 ingestion_id,
3719 action,
3720 }: plan::AlterSourcePlan,
3721 ) -> Result<ExecuteResponse, AdapterError> {
3722 let cur_entry = self.catalog().get_entry(&item_id);
3723 let cur_source = cur_entry.source().expect("known to be source");
3724
3725 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3726 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3728 .expect("invalid create sql persisted to catalog")
3729 .into_element()
3730 .ast
3731 {
3732 Statement::CreateSource(stmt) => stmt,
3733 _ => unreachable!("proved type is source"),
3734 };
3735
3736 let catalog = coord.catalog().for_system_session();
3737
3738 mz_sql::names::resolve(&catalog, create_source_stmt)
3740 .map_err(|e| AdapterError::internal(err_cx, e))
3741 };
3742
3743 match action {
3744 plan::AlterSourceAction::AddSubsourceExports {
3745 subsources,
3746 options,
3747 } => {
3748 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3749
3750 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3751 text_columns: mut new_text_columns,
3752 exclude_columns: mut new_exclude_columns,
3753 ..
3754 } = options.try_into()?;
3755
3756 let (mut create_source_stmt, resolved_ids) =
3758 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3759
3760 let catalog = self.catalog();
3762 let curr_references: BTreeSet<_> = catalog
3763 .get_entry(&item_id)
3764 .used_by()
3765 .into_iter()
3766 .filter_map(|subsource| {
3767 catalog
3768 .get_entry(subsource)
3769 .subsource_details()
3770 .map(|(_id, reference, _details)| reference)
3771 })
3772 .collect();
3773
3774 let purification_err =
3777 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3778
3779 match &mut create_source_stmt.connection {
3783 CreateSourceConnection::Postgres {
3784 options: curr_options,
3785 ..
3786 } => {
3787 let mz_sql::plan::PgConfigOptionExtracted {
3788 mut text_columns, ..
3789 } = curr_options.clone().try_into()?;
3790
3791 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3794
3795 text_columns.retain(|column_qualified_reference| {
3797 mz_ore::soft_assert_eq_or_log!(
3798 column_qualified_reference.0.len(),
3799 4,
3800 "all TEXT COLUMNS values must be column-qualified references"
3801 );
3802 let mut table = column_qualified_reference.clone();
3803 table.0.truncate(3);
3804 curr_references.contains(&table)
3805 });
3806
3807 new_text_columns.extend(text_columns);
3809
3810 if !new_text_columns.is_empty() {
3812 new_text_columns.sort();
3813 let new_text_columns = new_text_columns
3814 .into_iter()
3815 .map(WithOptionValue::UnresolvedItemName)
3816 .collect();
3817
3818 curr_options.push(PgConfigOption {
3819 name: PgConfigOptionName::TextColumns,
3820 value: Some(WithOptionValue::Sequence(new_text_columns)),
3821 });
3822 }
3823 }
3824 CreateSourceConnection::MySql {
3825 options: curr_options,
3826 ..
3827 } => {
3828 let mz_sql::plan::MySqlConfigOptionExtracted {
3829 mut text_columns,
3830 mut exclude_columns,
3831 ..
3832 } = curr_options.clone().try_into()?;
3833
3834 curr_options.retain(|o| {
3837 !matches!(
3838 o.name,
3839 MySqlConfigOptionName::TextColumns
3840 | MySqlConfigOptionName::ExcludeColumns
3841 )
3842 });
3843
3844 let column_referenced =
3846 |column_qualified_reference: &UnresolvedItemName| {
3847 mz_ore::soft_assert_eq_or_log!(
3848 column_qualified_reference.0.len(),
3849 3,
3850 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3851 );
3852 let mut table = column_qualified_reference.clone();
3853 table.0.truncate(2);
3854 curr_references.contains(&table)
3855 };
3856 text_columns.retain(column_referenced);
3857 exclude_columns.retain(column_referenced);
3858
3859 new_text_columns.extend(text_columns);
3861 new_exclude_columns.extend(exclude_columns);
3862
3863 if !new_text_columns.is_empty() {
3865 new_text_columns.sort();
3866 let new_text_columns = new_text_columns
3867 .into_iter()
3868 .map(WithOptionValue::UnresolvedItemName)
3869 .collect();
3870
3871 curr_options.push(MySqlConfigOption {
3872 name: MySqlConfigOptionName::TextColumns,
3873 value: Some(WithOptionValue::Sequence(new_text_columns)),
3874 });
3875 }
3876 if !new_exclude_columns.is_empty() {
3878 new_exclude_columns.sort();
3879 let new_exclude_columns = new_exclude_columns
3880 .into_iter()
3881 .map(WithOptionValue::UnresolvedItemName)
3882 .collect();
3883
3884 curr_options.push(MySqlConfigOption {
3885 name: MySqlConfigOptionName::ExcludeColumns,
3886 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3887 });
3888 }
3889 }
3890 CreateSourceConnection::SqlServer {
3891 options: curr_options,
3892 ..
3893 } => {
3894 let mz_sql::plan::SqlServerConfigOptionExtracted {
3895 mut text_columns,
3896 mut exclude_columns,
3897 ..
3898 } = curr_options.clone().try_into()?;
3899
3900 curr_options.retain(|o| {
3903 !matches!(
3904 o.name,
3905 SqlServerConfigOptionName::TextColumns
3906 | SqlServerConfigOptionName::ExcludeColumns
3907 )
3908 });
3909
3910 let column_referenced =
3912 |column_qualified_reference: &UnresolvedItemName| {
3913 mz_ore::soft_assert_eq_or_log!(
3914 column_qualified_reference.0.len(),
3915 3,
3916 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3917 );
3918 let mut table = column_qualified_reference.clone();
3919 table.0.truncate(2);
3920 curr_references.contains(&table)
3921 };
3922 text_columns.retain(column_referenced);
3923 exclude_columns.retain(column_referenced);
3924
3925 new_text_columns.extend(text_columns);
3927 new_exclude_columns.extend(exclude_columns);
3928
3929 if !new_text_columns.is_empty() {
3931 new_text_columns.sort();
3932 let new_text_columns = new_text_columns
3933 .into_iter()
3934 .map(WithOptionValue::UnresolvedItemName)
3935 .collect();
3936
3937 curr_options.push(SqlServerConfigOption {
3938 name: SqlServerConfigOptionName::TextColumns,
3939 value: Some(WithOptionValue::Sequence(new_text_columns)),
3940 });
3941 }
3942 if !new_exclude_columns.is_empty() {
3944 new_exclude_columns.sort();
3945 let new_exclude_columns = new_exclude_columns
3946 .into_iter()
3947 .map(WithOptionValue::UnresolvedItemName)
3948 .collect();
3949
3950 curr_options.push(SqlServerConfigOption {
3951 name: SqlServerConfigOptionName::ExcludeColumns,
3952 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3953 });
3954 }
3955 }
3956 _ => return Err(purification_err()),
3957 };
3958
3959 let mut catalog = self.catalog().for_system_session();
3960 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
3961
3962 let plan = match mz_sql::plan::plan(
3964 None,
3965 &catalog,
3966 Statement::CreateSource(create_source_stmt),
3967 &Params::empty(),
3968 &resolved_ids,
3969 )
3970 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
3971 {
3972 Plan::CreateSource(plan) => plan,
3973 _ => unreachable!("create source plan is only valid response"),
3974 };
3975
3976 let source = Source::new(
3980 plan,
3981 cur_source.global_id,
3982 resolved_ids,
3983 cur_source.custom_logical_compaction_window,
3984 cur_source.is_retained_metrics_object,
3985 );
3986
3987 let desc = match &source.data_source {
3989 DataSourceDesc::Ingestion { desc, .. }
3990 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
3991 desc.clone().into_inline_connection(self.catalog().state())
3992 }
3993 _ => unreachable!("already verified of type ingestion"),
3994 };
3995
3996 self.controller
3997 .storage
3998 .check_alter_ingestion_source_desc(ingestion_id, &desc)
3999 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4000
4001 let mut ops = vec![catalog::Op::UpdateItem {
4004 id: item_id,
4005 name: self.catalog.get_entry(&item_id).name().clone(),
4008 to_item: CatalogItem::Source(source),
4009 }];
4010
4011 let CreateSourceInner {
4012 ops: new_ops,
4013 sources: _,
4014 if_not_exists_ids,
4015 } = self.create_source_inner(session, subsources).await?;
4016
4017 ops.extend(new_ops.into_iter());
4018
4019 assert!(
4020 if_not_exists_ids.is_empty(),
4021 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4022 );
4023
4024 self.catalog_transact(Some(session), ops).await?;
4025 }
4026 plan::AlterSourceAction::RefreshReferences { references } => {
4027 self.catalog_transact(
4028 Some(session),
4029 vec![catalog::Op::UpdateSourceReferences {
4030 source_id: item_id,
4031 references: references.into(),
4032 }],
4033 )
4034 .await?;
4035 }
4036 }
4037
4038 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4039 }
4040
4041 #[instrument]
4042 pub(super) async fn sequence_alter_system_set(
4043 &mut self,
4044 session: &Session,
4045 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4046 ) -> Result<ExecuteResponse, AdapterError> {
4047 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4048 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4050 self.validate_alter_system_network_policy(session, &value)?;
4051 }
4052
4053 let op = match value {
4054 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4055 name: name.clone(),
4056 value: OwnedVarInput::SqlSet(values),
4057 },
4058 plan::VariableValue::Default => {
4059 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4060 }
4061 };
4062 self.catalog_transact(Some(session), vec![op]).await?;
4063
4064 session.add_notice(AdapterNotice::VarDefaultUpdated {
4065 role: None,
4066 var_name: Some(name),
4067 });
4068 Ok(ExecuteResponse::AlteredSystemConfiguration)
4069 }
4070
4071 #[instrument]
4072 pub(super) async fn sequence_alter_system_reset(
4073 &mut self,
4074 session: &Session,
4075 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4076 ) -> Result<ExecuteResponse, AdapterError> {
4077 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4078 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4079 self.catalog_transact(Some(session), vec![op]).await?;
4080 session.add_notice(AdapterNotice::VarDefaultUpdated {
4081 role: None,
4082 var_name: Some(name),
4083 });
4084 Ok(ExecuteResponse::AlteredSystemConfiguration)
4085 }
4086
4087 #[instrument]
4088 pub(super) async fn sequence_alter_system_reset_all(
4089 &mut self,
4090 session: &Session,
4091 _: plan::AlterSystemResetAllPlan,
4092 ) -> Result<ExecuteResponse, AdapterError> {
4093 self.is_user_allowed_to_alter_system(session, None)?;
4094 let op = catalog::Op::ResetAllSystemConfiguration;
4095 self.catalog_transact(Some(session), vec![op]).await?;
4096 session.add_notice(AdapterNotice::VarDefaultUpdated {
4097 role: None,
4098 var_name: None,
4099 });
4100 Ok(ExecuteResponse::AlteredSystemConfiguration)
4101 }
4102
4103 fn is_user_allowed_to_alter_system(
4105 &self,
4106 session: &Session,
4107 var_name: Option<&str>,
4108 ) -> Result<(), AdapterError> {
4109 match (session.user().kind(), var_name) {
4110 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4112 (UserKind::Superuser, Some(name))
4114 if session.user().is_internal()
4115 || self.catalog().system_config().user_modifiable(name) =>
4116 {
4117 let var = self.catalog().system_config().get(name)?;
4120 var.visible(session.user(), self.catalog().system_config())?;
4121 Ok(())
4122 }
4123 (UserKind::Regular, Some(name))
4126 if self.catalog().system_config().user_modifiable(name) =>
4127 {
4128 Err(AdapterError::Unauthorized(
4129 rbac::UnauthorizedError::Superuser {
4130 action: format!("toggle the '{name}' system configuration parameter"),
4131 },
4132 ))
4133 }
4134 _ => Err(AdapterError::Unauthorized(
4135 rbac::UnauthorizedError::MzSystem {
4136 action: "alter system".into(),
4137 },
4138 )),
4139 }
4140 }
4141
4142 fn validate_alter_system_network_policy(
4143 &self,
4144 session: &Session,
4145 policy_value: &plan::VariableValue,
4146 ) -> Result<(), AdapterError> {
4147 let policy_name = match &policy_value {
4148 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4150 plan::VariableValue::Values(values) if values.len() == 1 => {
4151 values.iter().next().cloned()
4152 }
4153 plan::VariableValue::Values(values) => {
4154 tracing::warn!(?values, "can't set multiple network policies at once");
4155 None
4156 }
4157 };
4158 let maybe_network_policy = policy_name
4159 .as_ref()
4160 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4161 let Some(network_policy) = maybe_network_policy else {
4162 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4163 VarError::InvalidParameterValue {
4164 name: NETWORK_POLICY.name(),
4165 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4166 reason: "no network policy with such name exists".to_string(),
4167 },
4168 )));
4169 };
4170 self.validate_alter_network_policy(session, &network_policy.rules)
4171 }
4172
4173 fn validate_alter_network_policy(
4178 &self,
4179 session: &Session,
4180 policy_rules: &Vec<NetworkPolicyRule>,
4181 ) -> Result<(), AdapterError> {
4182 if session.user().is_internal() {
4185 return Ok(());
4186 }
4187 if let Some(ip) = session.meta().client_ip() {
4188 validate_ip_with_policy_rules(ip, policy_rules)
4189 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4190 } else {
4191 return Err(AdapterError::NetworkPolicyDenied(
4194 NetworkPolicyError::MissingIp,
4195 ));
4196 }
4197 Ok(())
4198 }
4199
4200 #[instrument]
4202 pub(super) fn sequence_execute(
4203 &self,
4204 session: &mut Session,
4205 plan: plan::ExecutePlan,
4206 ) -> Result<String, AdapterError> {
4207 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4209 let ps = session
4210 .get_prepared_statement_unverified(&plan.name)
4211 .expect("known to exist");
4212 let stmt = ps.stmt().cloned();
4213 let desc = ps.desc().clone();
4214 let state_revision = ps.state_revision;
4215 let logging = Arc::clone(ps.logging());
4216 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4217 }
4218
4219 #[instrument]
4220 pub(super) async fn sequence_grant_privileges(
4221 &mut self,
4222 session: &Session,
4223 plan::GrantPrivilegesPlan {
4224 update_privileges,
4225 grantees,
4226 }: plan::GrantPrivilegesPlan,
4227 ) -> Result<ExecuteResponse, AdapterError> {
4228 self.sequence_update_privileges(
4229 session,
4230 update_privileges,
4231 grantees,
4232 UpdatePrivilegeVariant::Grant,
4233 )
4234 .await
4235 }
4236
4237 #[instrument]
4238 pub(super) async fn sequence_revoke_privileges(
4239 &mut self,
4240 session: &Session,
4241 plan::RevokePrivilegesPlan {
4242 update_privileges,
4243 revokees,
4244 }: plan::RevokePrivilegesPlan,
4245 ) -> Result<ExecuteResponse, AdapterError> {
4246 self.sequence_update_privileges(
4247 session,
4248 update_privileges,
4249 revokees,
4250 UpdatePrivilegeVariant::Revoke,
4251 )
4252 .await
4253 }
4254
4255 #[instrument]
4256 async fn sequence_update_privileges(
4257 &mut self,
4258 session: &Session,
4259 update_privileges: Vec<UpdatePrivilege>,
4260 grantees: Vec<RoleId>,
4261 variant: UpdatePrivilegeVariant,
4262 ) -> Result<ExecuteResponse, AdapterError> {
4263 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4264 let mut warnings = Vec::new();
4265 let catalog = self.catalog().for_session(session);
4266
4267 for UpdatePrivilege {
4268 acl_mode,
4269 target_id,
4270 grantor,
4271 } in update_privileges
4272 {
4273 let actual_object_type = catalog.get_system_object_type(&target_id);
4274 if actual_object_type.is_relation() {
4277 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4278 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4279 if !non_applicable_privileges.is_empty() {
4280 let object_description =
4281 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4282 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4283 non_applicable_privileges,
4284 object_description,
4285 })
4286 }
4287 }
4288
4289 if let SystemObjectId::Object(object_id) = &target_id {
4290 self.catalog()
4291 .ensure_not_reserved_object(object_id, session.conn_id())?;
4292 }
4293
4294 let privileges = self
4295 .catalog()
4296 .get_privileges(&target_id, session.conn_id())
4297 .ok_or(AdapterError::Unsupported(
4300 "GRANTs/REVOKEs on an object type with no privileges",
4301 ))?;
4302
4303 for grantee in &grantees {
4304 self.catalog().ensure_not_system_role(grantee)?;
4305 self.catalog().ensure_not_predefined_role(grantee)?;
4306 let existing_privilege = privileges
4307 .get_acl_item(grantee, &grantor)
4308 .map(Cow::Borrowed)
4309 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4310
4311 match variant {
4312 UpdatePrivilegeVariant::Grant
4313 if !existing_privilege.acl_mode.contains(acl_mode) =>
4314 {
4315 ops.push(catalog::Op::UpdatePrivilege {
4316 target_id: target_id.clone(),
4317 privilege: MzAclItem {
4318 grantee: *grantee,
4319 grantor,
4320 acl_mode,
4321 },
4322 variant,
4323 });
4324 }
4325 UpdatePrivilegeVariant::Revoke
4326 if !existing_privilege
4327 .acl_mode
4328 .intersection(acl_mode)
4329 .is_empty() =>
4330 {
4331 ops.push(catalog::Op::UpdatePrivilege {
4332 target_id: target_id.clone(),
4333 privilege: MzAclItem {
4334 grantee: *grantee,
4335 grantor,
4336 acl_mode,
4337 },
4338 variant,
4339 });
4340 }
4341 _ => {}
4343 }
4344 }
4345 }
4346
4347 if ops.is_empty() {
4348 session.add_notices(warnings);
4349 return Ok(variant.into());
4350 }
4351
4352 let res = self
4353 .catalog_transact(Some(session), ops)
4354 .await
4355 .map(|_| match variant {
4356 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4357 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4358 });
4359 if res.is_ok() {
4360 session.add_notices(warnings);
4361 }
4362 res
4363 }
4364
4365 #[instrument]
4366 pub(super) async fn sequence_alter_default_privileges(
4367 &mut self,
4368 session: &Session,
4369 plan::AlterDefaultPrivilegesPlan {
4370 privilege_objects,
4371 privilege_acl_items,
4372 is_grant,
4373 }: plan::AlterDefaultPrivilegesPlan,
4374 ) -> Result<ExecuteResponse, AdapterError> {
4375 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4376 let variant = if is_grant {
4377 UpdatePrivilegeVariant::Grant
4378 } else {
4379 UpdatePrivilegeVariant::Revoke
4380 };
4381 for privilege_object in &privilege_objects {
4382 self.catalog()
4383 .ensure_not_system_role(&privilege_object.role_id)?;
4384 self.catalog()
4385 .ensure_not_predefined_role(&privilege_object.role_id)?;
4386 if let Some(database_id) = privilege_object.database_id {
4387 self.catalog()
4388 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4389 }
4390 if let Some(schema_id) = privilege_object.schema_id {
4391 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4392 let schema_spec: SchemaSpecifier = schema_id.into();
4393
4394 self.catalog().ensure_not_reserved_object(
4395 &(database_spec, schema_spec).into(),
4396 session.conn_id(),
4397 )?;
4398 }
4399 for privilege_acl_item in &privilege_acl_items {
4400 self.catalog()
4401 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4402 self.catalog()
4403 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4404 ops.push(catalog::Op::UpdateDefaultPrivilege {
4405 privilege_object: privilege_object.clone(),
4406 privilege_acl_item: privilege_acl_item.clone(),
4407 variant,
4408 })
4409 }
4410 }
4411
4412 self.catalog_transact(Some(session), ops).await?;
4413 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4414 }
4415
4416 #[instrument]
4417 pub(super) async fn sequence_grant_role(
4418 &mut self,
4419 session: &Session,
4420 plan::GrantRolePlan {
4421 role_ids,
4422 member_ids,
4423 grantor_id,
4424 }: plan::GrantRolePlan,
4425 ) -> Result<ExecuteResponse, AdapterError> {
4426 let catalog = self.catalog();
4427 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4428 for role_id in role_ids {
4429 for member_id in &member_ids {
4430 let member_membership: BTreeSet<_> =
4431 catalog.get_role(member_id).membership().keys().collect();
4432 if member_membership.contains(&role_id) {
4433 let role_name = catalog.get_role(&role_id).name().to_string();
4434 let member_name = catalog.get_role(member_id).name().to_string();
4435 catalog.ensure_not_reserved_role(member_id)?;
4437 catalog.ensure_grantable_role(&role_id)?;
4438 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4439 role_name,
4440 member_name,
4441 });
4442 } else {
4443 ops.push(catalog::Op::GrantRole {
4444 role_id,
4445 member_id: *member_id,
4446 grantor_id,
4447 });
4448 }
4449 }
4450 }
4451
4452 if ops.is_empty() {
4453 return Ok(ExecuteResponse::GrantedRole);
4454 }
4455
4456 self.catalog_transact(Some(session), ops)
4457 .await
4458 .map(|_| ExecuteResponse::GrantedRole)
4459 }
4460
4461 #[instrument]
4462 pub(super) async fn sequence_revoke_role(
4463 &mut self,
4464 session: &Session,
4465 plan::RevokeRolePlan {
4466 role_ids,
4467 member_ids,
4468 grantor_id,
4469 }: plan::RevokeRolePlan,
4470 ) -> Result<ExecuteResponse, AdapterError> {
4471 let catalog = self.catalog();
4472 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4473 for role_id in role_ids {
4474 for member_id in &member_ids {
4475 let member_membership: BTreeSet<_> =
4476 catalog.get_role(member_id).membership().keys().collect();
4477 if !member_membership.contains(&role_id) {
4478 let role_name = catalog.get_role(&role_id).name().to_string();
4479 let member_name = catalog.get_role(member_id).name().to_string();
4480 catalog.ensure_not_reserved_role(member_id)?;
4482 catalog.ensure_grantable_role(&role_id)?;
4483 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4484 role_name,
4485 member_name,
4486 });
4487 } else {
4488 ops.push(catalog::Op::RevokeRole {
4489 role_id,
4490 member_id: *member_id,
4491 grantor_id,
4492 });
4493 }
4494 }
4495 }
4496
4497 if ops.is_empty() {
4498 return Ok(ExecuteResponse::RevokedRole);
4499 }
4500
4501 self.catalog_transact(Some(session), ops)
4502 .await
4503 .map(|_| ExecuteResponse::RevokedRole)
4504 }
4505
4506 #[instrument]
4507 pub(super) async fn sequence_alter_owner(
4508 &mut self,
4509 session: &Session,
4510 plan::AlterOwnerPlan {
4511 id,
4512 object_type,
4513 new_owner,
4514 }: plan::AlterOwnerPlan,
4515 ) -> Result<ExecuteResponse, AdapterError> {
4516 let mut ops = vec![catalog::Op::UpdateOwner {
4517 id: id.clone(),
4518 new_owner,
4519 }];
4520
4521 match &id {
4522 ObjectId::Item(global_id) => {
4523 let entry = self.catalog().get_entry(global_id);
4524
4525 if entry.is_index() {
4527 let name = self
4528 .catalog()
4529 .resolve_full_name(entry.name(), Some(session.conn_id()))
4530 .to_string();
4531 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4532 return Ok(ExecuteResponse::AlteredObject(object_type));
4533 }
4534
4535 let dependent_index_ops = entry
4537 .used_by()
4538 .into_iter()
4539 .filter(|id| self.catalog().get_entry(id).is_index())
4540 .map(|id| catalog::Op::UpdateOwner {
4541 id: ObjectId::Item(*id),
4542 new_owner,
4543 });
4544 ops.extend(dependent_index_ops);
4545
4546 let dependent_subsources =
4548 entry
4549 .progress_id()
4550 .into_iter()
4551 .map(|item_id| catalog::Op::UpdateOwner {
4552 id: ObjectId::Item(item_id),
4553 new_owner,
4554 });
4555 ops.extend(dependent_subsources);
4556 }
4557 ObjectId::Cluster(cluster_id) => {
4558 let cluster = self.catalog().get_cluster(*cluster_id);
4559 let managed_cluster_replica_ops =
4561 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4562 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4563 new_owner,
4564 });
4565 ops.extend(managed_cluster_replica_ops);
4566 }
4567 _ => {}
4568 }
4569
4570 self.catalog_transact(Some(session), ops)
4571 .await
4572 .map(|_| ExecuteResponse::AlteredObject(object_type))
4573 }
4574
4575 #[instrument]
4576 pub(super) async fn sequence_reassign_owned(
4577 &mut self,
4578 session: &Session,
4579 plan::ReassignOwnedPlan {
4580 old_roles,
4581 new_role,
4582 reassign_ids,
4583 }: plan::ReassignOwnedPlan,
4584 ) -> Result<ExecuteResponse, AdapterError> {
4585 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4586 self.catalog().ensure_not_reserved_role(role_id)?;
4587 }
4588
4589 let ops = reassign_ids
4590 .into_iter()
4591 .map(|id| catalog::Op::UpdateOwner {
4592 id,
4593 new_owner: new_role,
4594 })
4595 .collect();
4596
4597 self.catalog_transact(Some(session), ops)
4598 .await
4599 .map(|_| ExecuteResponse::ReassignOwned)
4600 }
4601
4602 #[instrument]
4603 pub(crate) async fn handle_deferred_statement(&mut self) {
4604 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4608 return;
4609 };
4610 match ps {
4611 crate::coord::PlanStatement::Statement { stmt, params } => {
4612 self.handle_execute_inner(stmt, params, ctx).await;
4613 }
4614 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4615 self.sequence_plan(ctx, plan, resolved_ids).await;
4616 }
4617 }
4618 }
4619
4620 #[instrument]
4621 #[allow(clippy::unused_async)]
4623 pub(super) async fn sequence_alter_table(
4624 &mut self,
4625 ctx: &mut ExecuteContext,
4626 plan: plan::AlterTablePlan,
4627 ) -> Result<ExecuteResponse, AdapterError> {
4628 let plan::AlterTablePlan {
4629 relation_id,
4630 column_name,
4631 column_type,
4632 raw_sql_type,
4633 } = plan;
4634
4635 let id_ts = self.get_catalog_write_ts().await;
4637 let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4638 let ops = vec![catalog::Op::AlterAddColumn {
4639 id: relation_id,
4640 new_global_id,
4641 name: column_name,
4642 typ: column_type,
4643 sql: raw_sql_type,
4644 }];
4645
4646 self.catalog_transact_with_context(None, Some(ctx), ops)
4647 .await?;
4648
4649 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4650 }
4651
4652 #[instrument]
4653 pub(super) async fn sequence_alter_materialized_view_apply_replacement(
4654 &mut self,
4655 ctx: &ExecuteContext,
4656 plan: AlterMaterializedViewApplyReplacementPlan,
4657 ) -> Result<ExecuteResponse, AdapterError> {
4658 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
4659
4660 let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4664 self.catalog_transact(Some(ctx.session()), ops).await?;
4665
4666 Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
4667 }
4668
4669 pub(super) async fn statistics_oracle(
4670 &self,
4671 session: &Session,
4672 source_ids: &BTreeSet<GlobalId>,
4673 query_as_of: &Antichain<Timestamp>,
4674 is_oneshot: bool,
4675 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4676 super::statistics_oracle(
4677 session,
4678 source_ids,
4679 query_as_of,
4680 is_oneshot,
4681 self.catalog().system_config(),
4682 self.controller.storage_collections.as_ref(),
4683 )
4684 .await
4685 }
4686}
4687
4688impl Coordinator {
4689 async fn process_dataflow_metainfo(
4691 &mut self,
4692 df_meta: DataflowMetainfo,
4693 export_id: GlobalId,
4694 ctx: Option<&mut ExecuteContext>,
4695 notice_ids: Vec<GlobalId>,
4696 ) -> Option<BuiltinTableAppendNotify> {
4697 if let Some(ctx) = ctx {
4699 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4700 }
4701
4702 let df_meta = self
4704 .catalog()
4705 .render_notices(df_meta, notice_ids, Some(export_id));
4706
4707 if self.catalog().state().system_config().enable_mz_notices()
4710 && !df_meta.optimizer_notices.is_empty()
4711 {
4712 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4713 self.catalog().state().pack_optimizer_notices(
4714 &mut builtin_table_updates,
4715 df_meta.optimizer_notices.iter(),
4716 Diff::ONE,
4717 );
4718
4719 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4721
4722 Some(
4723 self.builtin_table_update()
4724 .execute(builtin_table_updates)
4725 .await
4726 .0,
4727 )
4728 } else {
4729 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4731
4732 None
4733 }
4734 }
4735}