1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::task::{self, JoinHandle, spawn};
34use mz_ore::tracing::OpenTelemetryContext;
35use mz_ore::{assert_none, instrument, soft_assert_or_log};
36use mz_repr::adt::jsonb::Jsonb;
37use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
38use mz_repr::explain::ExprHumanizer;
39use mz_repr::explain::json::json_string;
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
43 RowIterator, Timestamp,
44};
45use mz_sql::ast::{
46 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
47 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
48 SqlServerConfigOptionName,
49};
50use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
51use mz_sql::catalog::{
52 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
53 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
54 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
55};
56use mz_sql::names::{
57 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
58 SchemaSpecifier, SystemObjectId,
59};
60use mz_sql::plan::{
61 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
62 StatementContext,
63};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::ExportDescription;
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_transform::dataflow::DataflowMetainfo;
92use smallvec::SmallVec;
93use timely::progress::Antichain;
94use tokio::sync::{oneshot, watch};
95use tracing::{Instrument, Span, info, warn};
96
97use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
98use crate::command::{ExecuteResponse, Response};
99use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
100use crate::coord::sequencer::emit_optimizer_notices;
101use crate::coord::{
102 AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
103 Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
104 ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
105 PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
106 WatchSetResponse, validate_ip_with_policy_rules,
107};
108use crate::error::AdapterError;
109use crate::notice::{AdapterNotice, DroppedInUseIndex};
110use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
111use crate::optimize::{self, Optimize};
112use crate::session::{
113 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
114 WriteLocks, WriteOp,
115};
116use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
117use crate::{PeekResponseUnary, ReadHolds};
118
/// A boxed, `'static` future that resolves to either a [`Timestamp`] or a
/// [`StorageError`] over that timestamp type.
///
/// NOTE(review): the `Rtr` prefix presumably stands for "real-time recency"; the users of this
/// alias are outside this view — confirm.
type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>;
121
122mod cluster;
123mod copy_from;
124mod create_continual_task;
125mod create_index;
126mod create_materialized_view;
127mod create_view;
128mod explain_timestamp;
129mod peek;
130mod secret;
131mod subscribe;
132
/// Evaluates `$expr`, which must yield a `Result`, and unwraps its `Ok` value.
///
/// On `Err`, the error is converted with `.into()`, passed to `$ctx.retire(..)` as an `Err`, and
/// the enclosing function (or async block) returns the value of that `retire` call.
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}
143
144pub(super) use return_if_err;
145
/// A batch of catalog operations together with bookkeeping flags about what is being dropped.
///
/// NOTE(review): the code that builds and consumes this struct is outside this view; the field
/// notes below are inferred from names and types only — confirm against the users.
struct DropOps {
    /// Catalog operations to apply.
    ops: Vec<catalog::Op>,
    /// Presumably set when the session's active database is among the dropped objects.
    dropped_active_db: bool,
    /// Presumably set when the session's active cluster is among the dropped objects.
    dropped_active_cluster: bool,
    /// Indexes reported as dropped while still in use (see [`DroppedInUseIndex`]).
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}
152
/// The result of `Coordinator::create_source_inner`: everything needed to transact a batch of
/// source creations.
struct CreateSourceInner {
    /// Catalog operations that create the sources (and update their source references).
    ops: Vec<catalog::Op>,
    /// The in-memory [`Source`] objects being created, keyed by their catalog item ID.
    sources: Vec<(CatalogItemId, Source)>,
    /// Item IDs (and names) of the plans that were declared with `if_not_exists`.
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}
159
160impl Coordinator {
    /// Drives a [`Staged`] state machine until it produces a response, fails, or hands off to a
    /// spawned task.
    ///
    /// The stage's plan validity is checked once against the current catalog before the loop
    /// starts. Each loop iteration then:
    ///
    /// 1. Refreshes the per-connection cancellation channel in `staged_cancellation`.
    /// 2. Runs `stage.stage(self, &mut ctx)` inside `parent_span`.
    /// 3. Dispatches on the resulting [`StageResult`]:
    ///    * `Handle`: spawn a task that awaits the handle and sends the next stage back through
    ///      `internal_cmd_tx` as a message.
    ///    * `HandleRetire`: spawn a task that awaits the handle and retires `ctx` with the result.
    ///    * `Response`: retire `ctx` immediately with the response.
    ///    * `Immediate`: continue the loop with the returned stage.
    ///
    /// Any error (validity check or stage execution) retires `ctx` with that error and returns.
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
                    // Install a fresh cancellation channel for this connection. `insert` hands
                    // back the previous channel (if any); if that one was already flipped to
                    // `true`, a cancel arrived before this stage started, so honor it now.
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
                    // This stage is not cancelable: drop any channel registered for the
                    // connection.
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                // Without a session there is no connection ID to key a channel on.
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        // A send error is deliberately ignored here.
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }
226
    /// Spawns a task that awaits `handle` and then either calls `f` with the context and the
    /// successful value, or retires `ctx` with the error.
    ///
    /// If `cancel_enabled` is true and a cancellation channel is registered in
    /// `staged_cancellation` for the context's connection, the task races `handle` against that
    /// channel and retires `ctx` with [`AdapterError::Canceled`] if the cancel side wins.
    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // Resolves once the channel holds `true`; the result (including the error tokio
                // reports when the sender is gone) is ignored, so either outcome completes this
                // future.
                let _ = rx.wait_for(|v| *v).await;
                ()
            })
        } else {
            // No session or no registered channel: a future that never resolves.
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = return_if_err!(res, ctx);
                    f(ctx, next);
                }
                // The `if` precondition disables this branch entirely when cancellation is off.
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }
263
264 async fn create_source_inner(
265 &self,
266 session: &Session,
267 plans: Vec<plan::CreateSourcePlanBundle>,
268 ) -> Result<CreateSourceInner, AdapterError> {
269 let mut ops = vec![];
270 let mut sources = vec![];
271
272 let if_not_exists_ids = plans
273 .iter()
274 .filter_map(
275 |plan::CreateSourcePlanBundle {
276 item_id,
277 global_id: _,
278 plan,
279 resolved_ids: _,
280 available_source_references: _,
281 }| {
282 if plan.if_not_exists {
283 Some((*item_id, plan.name.clone()))
284 } else {
285 None
286 }
287 },
288 )
289 .collect::<BTreeMap<_, _>>();
290
291 for plan::CreateSourcePlanBundle {
292 item_id,
293 global_id,
294 mut plan,
295 resolved_ids,
296 available_source_references,
297 } in plans
298 {
299 let name = plan.name.clone();
300
301 match plan.source.data_source {
302 plan::DataSourceDesc::Ingestion(ref desc)
303 | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
304 let cluster_id = plan
305 .in_cluster
306 .expect("ingestion plans must specify cluster");
307 match desc.connection {
308 GenericSourceConnection::Postgres(_)
309 | GenericSourceConnection::MySql(_)
310 | GenericSourceConnection::SqlServer(_)
311 | GenericSourceConnection::Kafka(_)
312 | GenericSourceConnection::LoadGenerator(_) => {
313 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
314 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
315 .get(self.catalog().system_config().dyncfgs());
316
317 if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
318 {
319 return Err(AdapterError::Unsupported(
320 "sources in clusters with >1 replicas",
321 ));
322 }
323 }
324 }
325 }
326 }
327 plan::DataSourceDesc::Webhook { .. } => {
328 let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
329 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
330 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
331 .get(self.catalog().system_config().dyncfgs());
332
333 if !enable_multi_replica_sources {
334 if cluster.replica_ids().len() > 1 {
335 return Err(AdapterError::Unsupported(
336 "webhook sources in clusters with >1 replicas",
337 ));
338 }
339 }
340 }
341 }
342 plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
343 }
344
345 if let mz_sql::plan::DataSourceDesc::Webhook {
347 validate_using: Some(validate),
348 ..
349 } = &mut plan.source.data_source
350 {
351 if let Err(reason) = validate.reduce_expression().await {
352 self.metrics
353 .webhook_validation_reduce_failures
354 .with_label_values(&[reason])
355 .inc();
356 return Err(AdapterError::Internal(format!(
357 "failed to reduce check expression, {reason}"
358 )));
359 }
360 }
361
362 let mut reference_ops = vec![];
365 if let Some(references) = &available_source_references {
366 reference_ops.push(catalog::Op::UpdateSourceReferences {
367 source_id: item_id,
368 references: references.clone().into(),
369 });
370 }
371
372 let source = Source::new(plan, global_id, resolved_ids, None, false);
373 ops.push(catalog::Op::CreateItem {
374 id: item_id,
375 name,
376 item: CatalogItem::Source(source.clone()),
377 owner_id: *session.current_role_id(),
378 });
379 sources.push((item_id, source));
380 ops.extend(reference_ops);
382 }
383
384 Ok(CreateSourceInner {
385 ops,
386 sources,
387 if_not_exists_ids,
388 })
389 }
390
391 pub(crate) fn plan_subsource(
399 &self,
400 session: &Session,
401 params: &mz_sql::plan::Params,
402 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
403 item_id: CatalogItemId,
404 global_id: GlobalId,
405 ) -> Result<CreateSourcePlanBundle, AdapterError> {
406 let catalog = self.catalog().for_session(session);
407 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
408
409 let plan = self.plan_statement(
410 session,
411 Statement::CreateSubsource(subsource_stmt),
412 params,
413 &resolved_ids,
414 )?;
415 let plan = match plan {
416 Plan::CreateSource(plan) => plan,
417 _ => unreachable!(),
418 };
419 Ok(CreateSourcePlanBundle {
420 item_id,
421 global_id,
422 plan,
423 resolved_ids,
424 available_source_references: None,
425 })
426 }
427
428 pub(crate) async fn plan_purified_alter_source_add_subsource(
430 &mut self,
431 session: &Session,
432 params: Params,
433 source_name: ResolvedItemName,
434 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
435 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
436 ) -> Result<(Plan, ResolvedIds), AdapterError> {
437 let mut subsource_plans = Vec::with_capacity(subsources.len());
438
439 let conn_catalog = self.catalog().for_system_session();
441 let pcx = plan::PlanContext::zero();
442 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
443
444 let entry = self.catalog().get_entry(source_name.item_id());
445 let source = entry.source().ok_or_else(|| {
446 AdapterError::internal(
447 "plan alter source",
448 format!("expected Source found {entry:?}"),
449 )
450 })?;
451
452 let item_id = entry.id();
453 let ingestion_id = source.global_id();
454 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
455
456 let ids = self
457 .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
458 .await?;
459 for (subsource_stmt, (item_id, global_id)) in
460 subsource_stmts.into_iter().zip_eq(ids.into_iter())
461 {
462 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
463 subsource_plans.push(s);
464 }
465
466 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
467 subsources: subsource_plans,
468 options,
469 };
470
471 Ok((
472 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
473 item_id,
474 ingestion_id,
475 action,
476 }),
477 ResolvedIds::empty(),
478 ))
479 }
480
481 pub(crate) fn plan_purified_alter_source_refresh_references(
483 &self,
484 _session: &Session,
485 _params: Params,
486 source_name: ResolvedItemName,
487 available_source_references: plan::SourceReferences,
488 ) -> Result<(Plan, ResolvedIds), AdapterError> {
489 let entry = self.catalog().get_entry(source_name.item_id());
490 let source = entry.source().ok_or_else(|| {
491 AdapterError::internal(
492 "plan alter source",
493 format!("expected Source found {entry:?}"),
494 )
495 })?;
496 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
497 references: available_source_references,
498 };
499
500 Ok((
501 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
502 item_id: entry.id(),
503 ingestion_id: source.global_id(),
504 action,
505 }),
506 ResolvedIds::empty(),
507 ))
508 }
509
510 pub(crate) async fn plan_purified_create_source(
514 &mut self,
515 ctx: &ExecuteContext,
516 params: Params,
517 progress_stmt: Option<CreateSubsourceStatement<Aug>>,
518 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
519 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
520 available_source_references: plan::SourceReferences,
521 ) -> Result<(Plan, ResolvedIds), AdapterError> {
522 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
523
524 if let Some(progress_stmt) = progress_stmt {
526 assert_none!(progress_stmt.of_source);
531 let (item_id, global_id) = self.allocate_user_id().await?;
532 let progress_plan =
533 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
534 let progress_full_name = self
535 .catalog()
536 .resolve_full_name(&progress_plan.plan.name, None);
537 let progress_subsource = ResolvedItemName::Item {
538 id: progress_plan.item_id,
539 qualifiers: progress_plan.plan.name.qualifiers.clone(),
540 full_name: progress_full_name,
541 print_id: true,
542 version: RelationVersionSelector::Latest,
543 };
544
545 create_source_plans.push(progress_plan);
546
547 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
548 }
549
550 let catalog = self.catalog().for_session(ctx.session());
551 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
552
553 let propagated_with_options: Vec<_> = source_stmt
554 .with_options
555 .iter()
556 .filter_map(|opt| match opt.name {
557 CreateSourceOptionName::TimestampInterval => None,
558 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
559 name: CreateSubsourceOptionName::RetainHistory,
560 value: opt.value.clone(),
561 }),
562 })
563 .collect();
564
565 let source_plan = match self.plan_statement(
567 ctx.session(),
568 Statement::CreateSource(source_stmt),
569 ¶ms,
570 &resolved_ids,
571 )? {
572 Plan::CreateSource(plan) => plan,
573 p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
574 };
575
576 let (item_id, global_id) = self.allocate_user_id().await?;
577
578 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
579 let of_source = ResolvedItemName::Item {
580 id: item_id,
581 qualifiers: source_plan.name.qualifiers.clone(),
582 full_name: source_full_name,
583 print_id: true,
584 version: RelationVersionSelector::Latest,
585 };
586
587 let conn_catalog = self.catalog().for_system_session();
589 let pcx = plan::PlanContext::zero();
590 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
591
592 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
593
594 for subsource_stmt in subsource_stmts.iter_mut() {
595 subsource_stmt
596 .with_options
597 .extend(propagated_with_options.iter().cloned())
598 }
599
600 create_source_plans.push(CreateSourcePlanBundle {
601 item_id,
602 global_id,
603 plan: source_plan,
604 resolved_ids: resolved_ids.clone(),
605 available_source_references: Some(available_source_references),
606 });
607
608 let ids = self
610 .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
611 .await?;
612 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
613 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
614 create_source_plans.push(plan);
615 }
616
617 Ok((
618 Plan::CreateSources(create_source_plans),
619 ResolvedIds::empty(),
620 ))
621 }
622
623 #[instrument]
624 pub(super) async fn sequence_create_source(
625 &mut self,
626 ctx: &mut ExecuteContext,
627 plans: Vec<plan::CreateSourcePlanBundle>,
628 ) -> Result<ExecuteResponse, AdapterError> {
629 let CreateSourceInner {
630 ops,
631 sources,
632 if_not_exists_ids,
633 } = self.create_source_inner(ctx.session(), plans).await?;
634
635 let transact_result = self
636 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
637 .await;
638
639 for (item_id, source) in &sources {
641 if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
642 if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
643 ctx.session()
644 .add_notice(AdapterNotice::WebhookSourceCreated { url });
645 }
646 }
647 }
648
649 match transact_result {
650 Ok(()) => Ok(ExecuteResponse::CreatedSource),
651 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
652 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
653 })) if if_not_exists_ids.contains_key(&id) => {
654 ctx.session()
655 .add_notice(AdapterNotice::ObjectAlreadyExists {
656 name: if_not_exists_ids[&id].item.clone(),
657 ty: "source",
658 });
659 Ok(ExecuteResponse::CreatedSource)
660 }
661 Err(err) => Err(err),
662 }
663 }
664
665 #[instrument]
666 pub(super) async fn sequence_create_connection(
667 &mut self,
668 mut ctx: ExecuteContext,
669 plan: plan::CreateConnectionPlan,
670 resolved_ids: ResolvedIds,
671 ) {
672 let (connection_id, connection_gid) = match self.allocate_user_id().await {
673 Ok(item_id) => item_id,
674 Err(err) => return ctx.retire(Err(err)),
675 };
676
677 match &plan.connection.details {
678 ConnectionDetails::Ssh { key_1, key_2, .. } => {
679 let key_1 = match key_1.as_key_pair() {
680 Some(key_1) => key_1.clone(),
681 None => {
682 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
683 "the PUBLIC KEY 1 option cannot be explicitly specified"
684 ))));
685 }
686 };
687
688 let key_2 = match key_2.as_key_pair() {
689 Some(key_2) => key_2.clone(),
690 None => {
691 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
692 "the PUBLIC KEY 2 option cannot be explicitly specified"
693 ))));
694 }
695 };
696
697 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
698 let secret = key_set.to_bytes();
699 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
700 return ctx.retire(Err(err.into()));
701 }
702 self.caching_secrets_reader.invalidate(connection_id);
703 }
704 _ => (),
705 };
706
707 if plan.validate {
708 let internal_cmd_tx = self.internal_cmd_tx.clone();
709 let transient_revision = self.catalog().transient_revision();
710 let conn_id = ctx.session().conn_id().clone();
711 let otel_ctx = OpenTelemetryContext::obtain();
712 let role_metadata = ctx.session().role_metadata().clone();
713
714 let connection = plan
715 .connection
716 .details
717 .to_connection()
718 .into_inline_connection(self.catalog().state());
719
720 let current_storage_parameters = self.controller.storage.config().clone();
721 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
722 let result = match connection
723 .validate(connection_id, ¤t_storage_parameters)
724 .await
725 {
726 Ok(()) => Ok(plan),
727 Err(err) => Err(err.into()),
728 };
729
730 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
732 CreateConnectionValidationReady {
733 ctx,
734 result,
735 connection_id,
736 connection_gid,
737 plan_validity: PlanValidity::new(
738 transient_revision,
739 resolved_ids.items().copied().collect(),
740 None,
741 None,
742 role_metadata,
743 ),
744 otel_ctx,
745 resolved_ids: resolved_ids.clone(),
746 },
747 ));
748 if let Err(e) = result {
749 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
750 }
751 });
752 } else {
753 let result = self
754 .sequence_create_connection_stage_finish(
755 &mut ctx,
756 connection_id,
757 connection_gid,
758 plan,
759 resolved_ids,
760 )
761 .await;
762 ctx.retire(result);
763 }
764 }
765
766 #[instrument]
767 pub(crate) async fn sequence_create_connection_stage_finish(
768 &mut self,
769 ctx: &mut ExecuteContext,
770 connection_id: CatalogItemId,
771 connection_gid: GlobalId,
772 plan: plan::CreateConnectionPlan,
773 resolved_ids: ResolvedIds,
774 ) -> Result<ExecuteResponse, AdapterError> {
775 let ops = vec![catalog::Op::CreateItem {
776 id: connection_id,
777 name: plan.name.clone(),
778 item: CatalogItem::Connection(Connection {
779 create_sql: plan.connection.create_sql,
780 global_id: connection_gid,
781 details: plan.connection.details.clone(),
782 resolved_ids,
783 }),
784 owner_id: *ctx.session().current_role_id(),
785 }];
786
787 let conn_id = ctx.session().conn_id().clone();
790 let transact_result = self
791 .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
792 .await;
793
794 match transact_result {
795 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
796 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
797 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
798 })) if plan.if_not_exists => {
799 ctx.session()
800 .add_notice(AdapterNotice::ObjectAlreadyExists {
801 name: plan.name.item,
802 ty: "connection",
803 });
804 Ok(ExecuteResponse::CreatedConnection)
805 }
806 Err(err) => Err(err),
807 }
808 }
809
810 #[instrument]
811 pub(super) async fn sequence_create_database(
812 &mut self,
813 session: &Session,
814 plan: plan::CreateDatabasePlan,
815 ) -> Result<ExecuteResponse, AdapterError> {
816 let ops = vec![catalog::Op::CreateDatabase {
817 name: plan.name.clone(),
818 owner_id: *session.current_role_id(),
819 }];
820 match self.catalog_transact(Some(session), ops).await {
821 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
822 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
823 kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
824 })) if plan.if_not_exists => {
825 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
826 Ok(ExecuteResponse::CreatedDatabase)
827 }
828 Err(err) => Err(err),
829 }
830 }
831
832 #[instrument]
833 pub(super) async fn sequence_create_schema(
834 &mut self,
835 session: &Session,
836 plan: plan::CreateSchemaPlan,
837 ) -> Result<ExecuteResponse, AdapterError> {
838 let op = catalog::Op::CreateSchema {
839 database_id: plan.database_spec,
840 schema_name: plan.schema_name.clone(),
841 owner_id: *session.current_role_id(),
842 };
843 match self.catalog_transact(Some(session), vec![op]).await {
844 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
845 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
846 kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
847 })) if plan.if_not_exists => {
848 session.add_notice(AdapterNotice::SchemaAlreadyExists {
849 name: plan.schema_name,
850 });
851 Ok(ExecuteResponse::CreatedSchema)
852 }
853 Err(err) => Err(err),
854 }
855 }
856
857 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
859 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
860 if attributes.superuser.is_some() || attributes.password.is_some() {
861 return Err(AdapterError::UnavailableFeature {
862 feature: "SUPERUSER and PASSWORD attributes".to_string(),
863 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
864 });
865 }
866 }
867 Ok(())
868 }
869
870 #[instrument]
871 pub(super) async fn sequence_create_role(
872 &mut self,
873 conn_id: Option<&ConnectionId>,
874 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
875 ) -> Result<ExecuteResponse, AdapterError> {
876 self.validate_role_attributes(&attributes.clone())?;
877 let op = catalog::Op::CreateRole { name, attributes };
878 self.catalog_transact_with_context(conn_id, None, vec![op])
879 .await
880 .map(|_| ExecuteResponse::CreatedRole)
881 }
882
883 #[instrument]
884 pub(super) async fn sequence_create_network_policy(
885 &mut self,
886 session: &Session,
887 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
888 ) -> Result<ExecuteResponse, AdapterError> {
889 let op = catalog::Op::CreateNetworkPolicy {
890 rules,
891 name,
892 owner_id: *session.current_role_id(),
893 };
894 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
895 .await
896 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
897 }
898
899 #[instrument]
900 pub(super) async fn sequence_alter_network_policy(
901 &mut self,
902 session: &Session,
903 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
904 ) -> Result<ExecuteResponse, AdapterError> {
905 let current_network_policy_name =
907 self.catalog().system_config().default_network_policy_name();
908 if current_network_policy_name == name {
910 self.validate_alter_network_policy(session, &rules)?;
911 }
912
913 let op = catalog::Op::AlterNetworkPolicy {
914 id,
915 rules,
916 name,
917 owner_id: *session.current_role_id(),
918 };
919 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
920 .await
921 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
922 }
923
924 #[instrument]
925 pub(super) async fn sequence_create_table(
926 &mut self,
927 ctx: &mut ExecuteContext,
928 plan: plan::CreateTablePlan,
929 resolved_ids: ResolvedIds,
930 ) -> Result<ExecuteResponse, AdapterError> {
931 let plan::CreateTablePlan {
932 name,
933 table,
934 if_not_exists,
935 } = plan;
936
937 let conn_id = if table.temporary {
938 Some(ctx.session().conn_id())
939 } else {
940 None
941 };
942 let (table_id, global_id) = self.allocate_user_id().await?;
943 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
944
945 let data_source = match table.data_source {
946 plan::TableDataSource::TableWrites { defaults } => {
947 TableDataSource::TableWrites { defaults }
948 }
949 plan::TableDataSource::DataSource {
950 desc: data_source_plan,
951 timeline,
952 } => match data_source_plan {
953 plan::DataSourceDesc::IngestionExport {
954 ingestion_id,
955 external_reference,
956 details,
957 data_config,
958 } => TableDataSource::DataSource {
959 desc: DataSourceDesc::IngestionExport {
960 ingestion_id,
961 external_reference,
962 details,
963 data_config,
964 },
965 timeline,
966 },
967 plan::DataSourceDesc::Webhook {
968 validate_using,
969 body_format,
970 headers,
971 cluster_id,
972 } => TableDataSource::DataSource {
973 desc: DataSourceDesc::Webhook {
974 validate_using,
975 body_format,
976 headers,
977 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
978 },
979 timeline,
980 },
981 o => {
982 unreachable!("CREATE TABLE data source got {:?}", o)
983 }
984 },
985 };
986
987 let is_webhook = if let TableDataSource::DataSource {
988 desc: DataSourceDesc::Webhook { .. },
989 timeline: _,
990 } = &data_source
991 {
992 true
993 } else {
994 false
995 };
996
997 let table = Table {
998 create_sql: Some(table.create_sql),
999 desc: table.desc,
1000 collections,
1001 conn_id: conn_id.cloned(),
1002 resolved_ids,
1003 custom_logical_compaction_window: table.compaction_window,
1004 is_retained_metrics_object: false,
1005 data_source,
1006 };
1007 let ops = vec![catalog::Op::CreateItem {
1008 id: table_id,
1009 name: name.clone(),
1010 item: CatalogItem::Table(table.clone()),
1011 owner_id: *ctx.session().current_role_id(),
1012 }];
1013
1014 let catalog_result = self
1015 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1016 .await;
1017
1018 if is_webhook {
1019 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1022 ctx.session()
1023 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1024 }
1025 }
1026
1027 match catalog_result {
1028 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1029 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1030 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1031 })) if if_not_exists => {
1032 ctx.session_mut()
1033 .add_notice(AdapterNotice::ObjectAlreadyExists {
1034 name: name.item,
1035 ty: "table",
1036 });
1037 Ok(ExecuteResponse::CreatedTable)
1038 }
1039 Err(err) => Err(err),
1040 }
1041 }
1042
1043 #[instrument]
1044 pub(super) async fn sequence_create_sink(
1045 &mut self,
1046 ctx: ExecuteContext,
1047 plan: plan::CreateSinkPlan,
1048 resolved_ids: ResolvedIds,
1049 ) {
1050 let plan::CreateSinkPlan {
1051 name,
1052 sink,
1053 with_snapshot,
1054 if_not_exists,
1055 in_cluster,
1056 } = plan;
1057
1058 let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
1060
1061 let catalog_sink = Sink {
1062 create_sql: sink.create_sql,
1063 global_id,
1064 from: sink.from,
1065 connection: sink.connection,
1066 envelope: sink.envelope,
1067 version: sink.version,
1068 with_snapshot,
1069 resolved_ids,
1070 cluster_id: in_cluster,
1071 commit_interval: sink.commit_interval,
1072 };
1073
1074 let ops = vec![catalog::Op::CreateItem {
1075 id: item_id,
1076 name: name.clone(),
1077 item: CatalogItem::Sink(catalog_sink.clone()),
1078 owner_id: *ctx.session().current_role_id(),
1079 }];
1080
1081 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1082
1083 match result {
1084 Ok(()) => {}
1085 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1086 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1087 })) if if_not_exists => {
1088 ctx.session()
1089 .add_notice(AdapterNotice::ObjectAlreadyExists {
1090 name: name.item,
1091 ty: "sink",
1092 });
1093 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1094 return;
1095 }
1096 Err(e) => {
1097 ctx.retire(Err(e));
1098 return;
1099 }
1100 };
1101
1102 self.create_storage_export(global_id, &catalog_sink)
1103 .await
1104 .unwrap_or_terminate("cannot fail to create exports");
1105
1106 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1107 .await;
1108
1109 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1110 }
1111
1112 pub(super) fn validate_system_column_references(
1135 &self,
1136 uses_ambiguous_columns: bool,
1137 depends_on: &BTreeSet<GlobalId>,
1138 ) -> Result<(), AdapterError> {
1139 if uses_ambiguous_columns
1140 && depends_on
1141 .iter()
1142 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1143 {
1144 Err(AdapterError::AmbiguousSystemColumnReference)
1145 } else {
1146 Ok(())
1147 }
1148 }
1149
1150 #[instrument]
1151 pub(super) async fn sequence_create_type(
1152 &mut self,
1153 session: &Session,
1154 plan: plan::CreateTypePlan,
1155 resolved_ids: ResolvedIds,
1156 ) -> Result<ExecuteResponse, AdapterError> {
1157 let (item_id, global_id) = self.allocate_user_id().await?;
1158 plan.typ
1160 .inner
1161 .desc(&self.catalog().for_session(session))
1162 .map_err(AdapterError::from)?;
1163 let typ = Type {
1164 create_sql: Some(plan.typ.create_sql),
1165 global_id,
1166 details: CatalogTypeDetails {
1167 array_id: None,
1168 typ: plan.typ.inner,
1169 pg_metadata: None,
1170 },
1171 resolved_ids,
1172 };
1173 let op = catalog::Op::CreateItem {
1174 id: item_id,
1175 name: plan.name,
1176 item: CatalogItem::Type(typ),
1177 owner_id: *session.current_role_id(),
1178 };
1179 match self.catalog_transact(Some(session), vec![op]).await {
1180 Ok(()) => Ok(ExecuteResponse::CreatedType),
1181 Err(err) => Err(err),
1182 }
1183 }
1184
1185 #[instrument]
1186 pub(super) async fn sequence_comment_on(
1187 &mut self,
1188 session: &Session,
1189 plan: plan::CommentPlan,
1190 ) -> Result<ExecuteResponse, AdapterError> {
1191 let op = catalog::Op::Comment {
1192 object_id: plan.object_id,
1193 sub_component: plan.sub_component,
1194 comment: plan.comment,
1195 };
1196 self.catalog_transact(Some(session), vec![op]).await?;
1197 Ok(ExecuteResponse::Comment)
1198 }
1199
1200 #[instrument]
1201 pub(super) async fn sequence_drop_objects(
1202 &mut self,
1203 ctx: &mut ExecuteContext,
1204 plan::DropObjectsPlan {
1205 drop_ids,
1206 object_type,
1207 referenced_ids,
1208 }: plan::DropObjectsPlan,
1209 ) -> Result<ExecuteResponse, AdapterError> {
1210 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1211 let mut objects = Vec::new();
1212 for obj_id in &drop_ids {
1213 if !referenced_ids_hashset.contains(obj_id) {
1214 let object_info = ErrorMessageObjectDescription::from_id(
1215 obj_id,
1216 &self.catalog().for_session(ctx.session()),
1217 )
1218 .to_string();
1219 objects.push(object_info);
1220 }
1221 }
1222
1223 if !objects.is_empty() {
1224 ctx.session()
1225 .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1226 }
1227
1228 let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
1230 .iter()
1231 .filter_map(|id| match id {
1232 ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
1233 _ => None,
1234 })
1235 .flatten()
1236 .collect();
1237
1238 let DropOps {
1239 ops,
1240 dropped_active_db,
1241 dropped_active_cluster,
1242 dropped_in_use_indexes,
1243 } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1244
1245 self.catalog_transact_with_context(None, Some(ctx), ops)
1246 .await?;
1247
1248 if !expr_cache_invalidate_ids.is_empty() {
1250 let _fut = self.catalog().update_expression_cache(
1251 Default::default(),
1252 Default::default(),
1253 expr_cache_invalidate_ids,
1254 );
1255 }
1256
1257 fail::fail_point!("after_sequencer_drop_replica");
1258
1259 if dropped_active_db {
1260 ctx.session()
1261 .add_notice(AdapterNotice::DroppedActiveDatabase {
1262 name: ctx.session().vars().database().to_string(),
1263 });
1264 }
1265 if dropped_active_cluster {
1266 ctx.session()
1267 .add_notice(AdapterNotice::DroppedActiveCluster {
1268 name: ctx.session().vars().cluster().to_string(),
1269 });
1270 }
1271 for dropped_in_use_index in dropped_in_use_indexes {
1272 ctx.session()
1273 .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1274 self.metrics
1275 .optimization_notices
1276 .with_label_values(&["DroppedInUseIndex"])
1277 .inc_by(1);
1278 }
1279 Ok(ExecuteResponse::DroppedObject(object_type))
1280 }
1281
1282 fn validate_dropped_role_ownership(
1283 &self,
1284 session: &Session,
1285 dropped_roles: &BTreeMap<RoleId, &str>,
1286 ) -> Result<(), AdapterError> {
1287 fn privilege_check(
1288 privileges: &PrivilegeMap,
1289 dropped_roles: &BTreeMap<RoleId, &str>,
1290 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1291 object_id: &SystemObjectId,
1292 catalog: &ConnCatalog,
1293 ) {
1294 for privilege in privileges.all_values() {
1295 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1296 let grantor_name = catalog.get_role(&privilege.grantor).name();
1297 let object_description =
1298 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1299 dependent_objects
1300 .entry(role_name.to_string())
1301 .or_default()
1302 .push(format!(
1303 "privileges on {object_description} granted by {grantor_name}",
1304 ));
1305 }
1306 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1307 let grantee_name = catalog.get_role(&privilege.grantee).name();
1308 let object_description =
1309 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1310 dependent_objects
1311 .entry(role_name.to_string())
1312 .or_default()
1313 .push(format!(
1314 "privileges granted on {object_description} to {grantee_name}"
1315 ));
1316 }
1317 }
1318 }
1319
1320 let catalog = self.catalog().for_session(session);
1321 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1322 for entry in self.catalog.entries() {
1323 let id = SystemObjectId::Object(entry.id().into());
1324 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1325 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1326 dependent_objects
1327 .entry(role_name.to_string())
1328 .or_default()
1329 .push(format!("owner of {object_description}"));
1330 }
1331 privilege_check(
1332 entry.privileges(),
1333 dropped_roles,
1334 &mut dependent_objects,
1335 &id,
1336 &catalog,
1337 );
1338 }
1339 for database in self.catalog.databases() {
1340 let database_id = SystemObjectId::Object(database.id().into());
1341 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1342 let object_description =
1343 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1344 dependent_objects
1345 .entry(role_name.to_string())
1346 .or_default()
1347 .push(format!("owner of {object_description}"));
1348 }
1349 privilege_check(
1350 &database.privileges,
1351 dropped_roles,
1352 &mut dependent_objects,
1353 &database_id,
1354 &catalog,
1355 );
1356 for schema in database.schemas_by_id.values() {
1357 let schema_id = SystemObjectId::Object(
1358 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1359 );
1360 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1361 let object_description =
1362 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1363 dependent_objects
1364 .entry(role_name.to_string())
1365 .or_default()
1366 .push(format!("owner of {object_description}"));
1367 }
1368 privilege_check(
1369 &schema.privileges,
1370 dropped_roles,
1371 &mut dependent_objects,
1372 &schema_id,
1373 &catalog,
1374 );
1375 }
1376 }
1377 for cluster in self.catalog.clusters() {
1378 let cluster_id = SystemObjectId::Object(cluster.id().into());
1379 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1380 let object_description =
1381 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1382 dependent_objects
1383 .entry(role_name.to_string())
1384 .or_default()
1385 .push(format!("owner of {object_description}"));
1386 }
1387 privilege_check(
1388 &cluster.privileges,
1389 dropped_roles,
1390 &mut dependent_objects,
1391 &cluster_id,
1392 &catalog,
1393 );
1394 for replica in cluster.replicas() {
1395 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1396 let replica_id =
1397 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1398 let object_description =
1399 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1400 dependent_objects
1401 .entry(role_name.to_string())
1402 .or_default()
1403 .push(format!("owner of {object_description}"));
1404 }
1405 }
1406 }
1407 privilege_check(
1408 self.catalog().system_privileges(),
1409 dropped_roles,
1410 &mut dependent_objects,
1411 &SystemObjectId::System,
1412 &catalog,
1413 );
1414 for (default_privilege_object, default_privilege_acl_items) in
1415 self.catalog.default_privileges()
1416 {
1417 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1418 dependent_objects
1419 .entry(role_name.to_string())
1420 .or_default()
1421 .push(format!(
1422 "default privileges on {}S created by {}",
1423 default_privilege_object.object_type, role_name
1424 ));
1425 }
1426 for default_privilege_acl_item in default_privilege_acl_items {
1427 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1428 dependent_objects
1429 .entry(role_name.to_string())
1430 .or_default()
1431 .push(format!(
1432 "default privileges on {}S granted to {}",
1433 default_privilege_object.object_type, role_name
1434 ));
1435 }
1436 }
1437 }
1438
1439 if !dependent_objects.is_empty() {
1440 Err(AdapterError::DependentObject(dependent_objects))
1441 } else {
1442 Ok(())
1443 }
1444 }
1445
1446 #[instrument]
1447 pub(super) async fn sequence_drop_owned(
1448 &mut self,
1449 session: &Session,
1450 plan: plan::DropOwnedPlan,
1451 ) -> Result<ExecuteResponse, AdapterError> {
1452 for role_id in &plan.role_ids {
1453 self.catalog().ensure_not_reserved_role(role_id)?;
1454 }
1455
1456 let mut privilege_revokes = plan.privilege_revokes;
1457
1458 let session_catalog = self.catalog().for_session(session);
1460 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1461 && !session.is_superuser()
1462 {
1463 let role_membership =
1465 session_catalog.collect_role_membership(session.current_role_id());
1466 let invalid_revokes: BTreeSet<_> = privilege_revokes
1467 .extract_if(.., |(_, privilege)| {
1468 !role_membership.contains(&privilege.grantor)
1469 })
1470 .map(|(object_id, _)| object_id)
1471 .collect();
1472 for invalid_revoke in invalid_revokes {
1473 let object_description =
1474 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1475 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1476 }
1477 }
1478
1479 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1480 catalog::Op::UpdatePrivilege {
1481 target_id: object_id,
1482 privilege,
1483 variant: UpdatePrivilegeVariant::Revoke,
1484 }
1485 });
1486 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1487 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1488 privilege_object,
1489 privilege_acl_item,
1490 variant: UpdatePrivilegeVariant::Revoke,
1491 },
1492 );
1493 let DropOps {
1494 ops: drop_ops,
1495 dropped_active_db,
1496 dropped_active_cluster,
1497 dropped_in_use_indexes,
1498 } = self.sequence_drop_common(session, plan.drop_ids)?;
1499
1500 let ops = privilege_revoke_ops
1501 .chain(default_privilege_revoke_ops)
1502 .chain(drop_ops.into_iter())
1503 .collect();
1504
1505 self.catalog_transact(Some(session), ops).await?;
1506
1507 if dropped_active_db {
1508 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1509 name: session.vars().database().to_string(),
1510 });
1511 }
1512 if dropped_active_cluster {
1513 session.add_notice(AdapterNotice::DroppedActiveCluster {
1514 name: session.vars().cluster().to_string(),
1515 });
1516 }
1517 for dropped_in_use_index in dropped_in_use_indexes {
1518 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1519 }
1520 Ok(ExecuteResponse::DroppedOwned)
1521 }
1522
/// Builds the catalog operations shared by the drop statements in this file.
///
/// Given the full list of object IDs to drop, this:
/// - notes whether the session's active database or active cluster is among them,
/// - collects indexes that are still used by objects that are *not* being dropped,
/// - rejects dropping reserved roles and roles that still own objects or
///   participate in privileges,
/// - rejects dropping a non-internal replica of a managed cluster unless the
///   cluster itself is also being dropped,
/// - derives the role-membership revokes and default-privilege revokes implied
///   by the dropped roles, databases, and schemas.
///
/// The returned `DropOps::ops` holds the role revokes first, then the
/// default-privilege revokes, then a single `Op::DropObjects` for all of `ids`.
fn sequence_drop_common(
    &self,
    session: &Session,
    ids: Vec<ObjectId>,
) -> Result<DropOps, AdapterError> {
    let mut dropped_active_db = false;
    let mut dropped_active_cluster = false;
    let mut dropped_in_use_indexes = Vec::new();
    let mut dropped_roles = BTreeMap::new();
    let mut dropped_databases = BTreeSet::new();
    let mut dropped_schemas = BTreeSet::new();
    // Tuples of (role, member, grantor) describing memberships to revoke.
    let mut role_revokes = BTreeSet::new();
    // Pairs of (default privilege object, ACL item) to revoke.
    let mut default_privilege_revokes = BTreeSet::new();

    // Clusters being dropped; consulted when validating replica drops below.
    let mut clusters_to_drop = BTreeSet::new();

    let ids_set = ids.iter().collect::<BTreeSet<_>>();
    for id in &ids {
        match id {
            ObjectId::Database(id) => {
                let name = self.catalog().get_database(id).name();
                if name == session.vars().database() {
                    dropped_active_db = true;
                }
                dropped_databases.insert(id);
            }
            ObjectId::Schema((_, spec)) => {
                // Only schemas identified by `SchemaSpecifier::Id` are tracked.
                if let SchemaSpecifier::Id(id) = spec {
                    dropped_schemas.insert(id);
                }
            }
            ObjectId::Cluster(id) => {
                clusters_to_drop.insert(*id);
                // If the active cluster cannot be resolved, nothing is flagged.
                if let Some(active_id) = self
                    .catalog()
                    .active_cluster(session)
                    .ok()
                    .map(|cluster| cluster.id())
                {
                    if id == &active_id {
                        dropped_active_cluster = true;
                    }
                }
            }
            ObjectId::Role(id) => {
                let role = self.catalog().get_role(id);
                let name = role.name();
                dropped_roles.insert(*id, name);
                // Revoke every membership the dropped role itself holds.
                for (group_id, grantor_id) in &role.membership.map {
                    role_revokes.insert((*group_id, *id, *grantor_id));
                }
            }
            ObjectId::Item(id) => {
                // Only indexes are inspected for remaining dependants.
                if let Some(index) = self.catalog().get_entry(id).index() {
                    let humanizer = self.catalog().for_session(session);
                    let dependants = self
                        .controller
                        .compute
                        .collection_reverse_dependencies(index.cluster_id, index.global_id())
                        .ok()
                        .into_iter()
                        .flatten()
                        .filter(|dependant_id| {
                            // Transient dependants are never reported.
                            if dependant_id.is_transient() {
                                return false;
                            }
                            // Dependants with no catalog item are skipped.
                            let Some(dependent_id) = humanizer
                                .try_get_item_by_global_id(dependant_id)
                                .map(|item| item.id())
                            else {
                                return false;
                            };
                            // Dependants that are dropped in this same statement
                            // do not count as "in use".
                            !ids_set.contains(&ObjectId::Item(dependent_id))
                        })
                        .flat_map(|dependant_id| {
                            // Dependants that cannot be humanized are silently omitted.
                            humanizer.humanize_id(dependant_id)
                        })
                        .collect_vec();
                    if !dependants.is_empty() {
                        dropped_in_use_indexes.push(DroppedInUseIndex {
                            // Fall back to the raw item ID if the index has no readable name.
                            index_name: humanizer
                                .humanize_id(index.global_id())
                                .unwrap_or_else(|| id.to_string()),
                            dependant_objects: dependants,
                        });
                    }
                }
            }
            _ => {}
        }
    }

    // Second pass: replica drops are validated only after `clusters_to_drop`
    // has been fully populated by the first pass.
    for id in &ids {
        match id {
            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                // A replica may be dropped freely when its whole cluster is going away.
                if !clusters_to_drop.contains(cluster_id) {
                    let cluster = self.catalog.get_cluster(*cluster_id);
                    if cluster.is_managed() {
                        let replica =
                            cluster.replica(*replica_id).expect("Catalog out of sync");
                        // Only replicas with an internal location may be dropped
                        // from a managed cluster.
                        if !replica.config.location.internal() {
                            coord_bail!("cannot drop replica of managed cluster");
                        }
                    }
                }
            }
            _ => {}
        }
    }

    for role_id in dropped_roles.keys() {
        self.catalog().ensure_not_reserved_role(role_id)?;
    }
    self.validate_dropped_role_ownership(session, &dropped_roles)?;
    let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
    // Revoke memberships that other user roles hold *in* a dropped role.
    for role in self.catalog().user_roles() {
        for dropped_role_id in
            dropped_role_ids.intersection(&role.membership.map.keys().collect())
        {
            role_revokes.insert((
                **dropped_role_id,
                role.id(),
                *role
                    .membership
                    .map
                    .get(*dropped_role_id)
                    .expect("included in keys above"),
            ));
        }
    }

    // Default privileges scoped to a dropped database or schema are revoked too.
    for (default_privilege_object, default_privilege_acls) in
        self.catalog().default_privileges()
    {
        if matches!(
            &default_privilege_object.database_id,
            Some(database_id) if dropped_databases.contains(database_id),
        ) || matches!(
            &default_privilege_object.schema_id,
            Some(schema_id) if dropped_schemas.contains(schema_id),
        ) {
            for default_privilege_acl in default_privilege_acls {
                default_privilege_revokes.insert((
                    default_privilege_object.clone(),
                    default_privilege_acl.clone(),
                ));
            }
        }
    }

    // Order of the resulting ops: role revokes, default-privilege revokes, then the drop.
    let ops = role_revokes
        .into_iter()
        .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
            role_id,
            member_id,
            grantor_id,
        })
        .chain(default_privilege_revokes.into_iter().map(
            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant: UpdatePrivilegeVariant::Revoke,
            },
        ))
        .chain(iter::once(catalog::Op::DropObjects(
            ids.into_iter()
                .map(DropObjectInfo::manual_drop_from_object_id)
                .collect(),
        )))
        .collect();

    Ok(DropOps {
        ops,
        dropped_active_db,
        dropped_active_cluster,
        dropped_in_use_indexes,
    })
}
1724
1725 pub(super) fn sequence_explain_schema(
1726 &self,
1727 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1728 ) -> Result<ExecuteResponse, AdapterError> {
1729 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1730 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1731 })?;
1732
1733 let json_string = json_string(&json_value);
1734 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1735 Ok(Self::send_immediate_rows(row))
1736 }
1737
1738 pub(super) fn sequence_show_all_variables(
1739 &self,
1740 session: &Session,
1741 ) -> Result<ExecuteResponse, AdapterError> {
1742 let mut rows = viewable_variables(self.catalog().state(), session)
1743 .map(|v| (v.name(), v.value(), v.description()))
1744 .collect::<Vec<_>>();
1745 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1746
1747 let rows: Vec<_> = rows
1749 .into_iter()
1750 .map(|(name, val, desc)| {
1751 Row::pack_slice(&[
1752 Datum::String(name),
1753 Datum::String(&val),
1754 Datum::String(desc),
1755 ])
1756 })
1757 .collect();
1758 Ok(Self::send_immediate_rows(rows))
1759 }
1760
1761 pub(super) fn sequence_show_variable(
1762 &self,
1763 session: &Session,
1764 plan: plan::ShowVariablePlan,
1765 ) -> Result<ExecuteResponse, AdapterError> {
1766 if &plan.name == SCHEMA_ALIAS {
1767 let schemas = self.catalog.resolve_search_path(session);
1768 let schema = schemas.first();
1769 return match schema {
1770 Some((database_spec, schema_spec)) => {
1771 let schema_name = &self
1772 .catalog
1773 .get_schema(database_spec, schema_spec, session.conn_id())
1774 .name()
1775 .schema;
1776 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1777 Ok(Self::send_immediate_rows(row))
1778 }
1779 None => {
1780 if session.vars().current_object_missing_warnings() {
1781 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1782 search_path: session
1783 .vars()
1784 .search_path()
1785 .into_iter()
1786 .map(|schema| schema.to_string())
1787 .collect(),
1788 });
1789 }
1790 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1791 }
1792 };
1793 }
1794
1795 let variable = session
1796 .vars()
1797 .get(self.catalog().system_config(), &plan.name)
1798 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1799
1800 variable.visible(session.user(), self.catalog().system_config())?;
1803
1804 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1805 if variable.name() == vars::DATABASE.name()
1806 && matches!(
1807 self.catalog().resolve_database(&variable.value()),
1808 Err(CatalogError::UnknownDatabase(_))
1809 )
1810 && session.vars().current_object_missing_warnings()
1811 {
1812 let name = variable.value();
1813 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1814 } else if variable.name() == vars::CLUSTER.name()
1815 && matches!(
1816 self.catalog().resolve_cluster(&variable.value()),
1817 Err(CatalogError::UnknownCluster(_))
1818 )
1819 && session.vars().current_object_missing_warnings()
1820 {
1821 let name = variable.value();
1822 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1823 }
1824 Ok(Self::send_immediate_rows(row))
1825 }
1826
1827 #[instrument]
1828 pub(super) async fn sequence_inspect_shard(
1829 &self,
1830 session: &Session,
1831 plan: plan::InspectShardPlan,
1832 ) -> Result<ExecuteResponse, AdapterError> {
1833 if !session.user().is_internal() {
1836 return Err(AdapterError::Unauthorized(
1837 rbac::UnauthorizedError::MzSystem {
1838 action: "inspect".into(),
1839 },
1840 ));
1841 }
1842 let state = self
1843 .controller
1844 .storage
1845 .inspect_persist_state(plan.id)
1846 .await?;
1847 let jsonb = Jsonb::from_serde_json(state)?;
1848 Ok(Self::send_immediate_rows(jsonb.into_row()))
1849 }
1850
1851 #[instrument]
1852 pub(super) fn sequence_set_variable(
1853 &self,
1854 session: &mut Session,
1855 plan: plan::SetVariablePlan,
1856 ) -> Result<ExecuteResponse, AdapterError> {
1857 let (name, local) = (plan.name, plan.local);
1858 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1859 self.validate_set_isolation_level(session)?;
1860 }
1861 if &name == vars::CLUSTER.name() {
1862 self.validate_set_cluster(session)?;
1863 }
1864
1865 let vars = session.vars_mut();
1866 let values = match plan.value {
1867 plan::VariableValue::Default => None,
1868 plan::VariableValue::Values(values) => Some(values),
1869 };
1870
1871 match values {
1872 Some(values) => {
1873 vars.set(
1874 self.catalog().system_config(),
1875 &name,
1876 VarInput::SqlSet(&values),
1877 local,
1878 )?;
1879
1880 let vars = session.vars();
1881
1882 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1885 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1886 } else if name == vars::CLUSTER.name()
1887 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1888 {
1889 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1890 }
1891
1892 if name.as_str() == vars::DATABASE.name()
1894 && matches!(
1895 self.catalog().resolve_database(vars.database()),
1896 Err(CatalogError::UnknownDatabase(_))
1897 )
1898 && session.vars().current_object_missing_warnings()
1899 {
1900 let name = vars.database().to_string();
1901 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1902 } else if name.as_str() == vars::CLUSTER.name()
1903 && matches!(
1904 self.catalog().resolve_cluster(vars.cluster()),
1905 Err(CatalogError::UnknownCluster(_))
1906 )
1907 && session.vars().current_object_missing_warnings()
1908 {
1909 let name = vars.cluster().to_string();
1910 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1911 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1912 let v = values.into_first().to_lowercase();
1913 if v == IsolationLevel::ReadUncommitted.as_str()
1914 || v == IsolationLevel::ReadCommitted.as_str()
1915 || v == IsolationLevel::RepeatableRead.as_str()
1916 {
1917 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1918 isolation_level: v,
1919 });
1920 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
1921 session.add_notice(AdapterNotice::StrongSessionSerializable);
1922 }
1923 }
1924 }
1925 None => vars.reset(self.catalog().system_config(), &name, local)?,
1926 }
1927
1928 Ok(ExecuteResponse::SetVariable { name, reset: false })
1929 }
1930
1931 pub(super) fn sequence_reset_variable(
1932 &self,
1933 session: &mut Session,
1934 plan: plan::ResetVariablePlan,
1935 ) -> Result<ExecuteResponse, AdapterError> {
1936 let name = plan.name;
1937 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1938 self.validate_set_isolation_level(session)?;
1939 }
1940 if &name == vars::CLUSTER.name() {
1941 self.validate_set_cluster(session)?;
1942 }
1943 session
1944 .vars_mut()
1945 .reset(self.catalog().system_config(), &name, false)?;
1946 Ok(ExecuteResponse::SetVariable { name, reset: true })
1947 }
1948
1949 pub(super) fn sequence_set_transaction(
1950 &self,
1951 session: &mut Session,
1952 plan: plan::SetTransactionPlan,
1953 ) -> Result<ExecuteResponse, AdapterError> {
1954 for mode in plan.modes {
1956 match mode {
1957 TransactionMode::AccessMode(_) => {
1958 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
1959 }
1960 TransactionMode::IsolationLevel(isolation_level) => {
1961 self.validate_set_isolation_level(session)?;
1962
1963 session.vars_mut().set(
1964 self.catalog().system_config(),
1965 TRANSACTION_ISOLATION_VAR_NAME,
1966 VarInput::Flat(&isolation_level.to_ast_string_stable()),
1967 plan.local,
1968 )?
1969 }
1970 }
1971 }
1972 Ok(ExecuteResponse::SetVariable {
1973 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
1974 reset: false,
1975 })
1976 }
1977
1978 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
1979 if session.transaction().contains_ops() {
1980 Err(AdapterError::InvalidSetIsolationLevel)
1981 } else {
1982 Ok(())
1983 }
1984 }
1985
1986 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
1987 if session.transaction().contains_ops() {
1988 Err(AdapterError::InvalidSetCluster)
1989 } else {
1990 Ok(())
1991 }
1992 }
1993
/// Ends the session's current transaction with a commit or rollback.
///
/// The outcome is delivered by retiring `ctx`, either directly at the end of
/// this function or later by whichever component `ctx` is handed off to:
/// - non-empty writes are submitted via `submit_write`,
/// - peeks that require linearization under strict serializable isolation are
///   sent to `strict_serializable_reads_tx`,
/// - single-statement transactions are re-dispatched through `internal_cmd_tx`.
///
/// In every other case the session variables are finalized with
/// `end_transaction` and the response is retired here.
#[instrument]
pub(super) async fn sequence_end_transaction(
    &mut self,
    mut ctx: ExecuteContext,
    mut action: EndTransactionAction,
) {
    // A commit of a transaction that has already failed is treated as a rollback.
    if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
        (&action, ctx.session().transaction())
    {
        action = EndTransactionAction::Rollback;
    }
    // Optimistic response for the requested action; replaced below on error.
    let response = match action {
        EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
            params: BTreeMap::new(),
        }),
        EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
            params: BTreeMap::new(),
        }),
    };

    let result = self.sequence_end_transaction_inner(&mut ctx, action).await;

    let (response, action) = match result {
        // Nothing was written: finish immediately.
        Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
            (response, action)
        }
        Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
            // If locks were handed back, they must cover every written ID.
            let validated_locks = match write_lock_guards {
                None => None,
                Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
                    Ok(locks) => Some(locks),
                    Err(missing) => {
                        tracing::error!(?missing, "programming error, missing write locks");
                        return ctx.retire(Err(AdapterError::WrongSetOfLocks));
                    }
                },
            };

            // Group the row batches by the item they target.
            let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
            for WriteOp { id, rows } in writes {
                let total_rows = collected_writes.entry(id).or_default();
                total_rows.push(rows);
            }

            // `ctx` moves into the pending write; it is not retired here.
            self.submit_write(PendingWriteTxn::User {
                span: Span::current(),
                writes: collected_writes,
                write_locks: validated_locks,
                pending_txn: PendingTxn {
                    ctx,
                    response,
                    action,
                },
            });
            return;
        }
        // Strict serializable: hand the transaction to the linearization channel.
        Ok((
            Some(TransactionOps::Peeks {
                determination,
                requires_linearization: RequireLinearization::Required,
                ..
            }),
            _,
        )) if ctx.session().vars().transaction_isolation()
            == &IsolationLevel::StrictSerializable =>
        {
            let conn_id = ctx.session().conn_id().clone();
            let pending_read_txn = PendingReadTxn {
                txn: PendingRead::Read {
                    txn: PendingTxn {
                        ctx,
                        response,
                        action,
                    },
                },
                timestamp_context: determination.timestamp_context,
                created: Instant::now(),
                num_requeues: 0,
                otel_ctx: OpenTelemetryContext::obtain(),
            };
            self.strict_serializable_reads_tx
                .send((conn_id, pending_read_txn))
                .expect("sending to strict_serializable_reads_tx cannot fail");
            return;
        }
        // Strong session serializable: record the read timestamp in the
        // session's timestamp oracle for the timeline, then finish here.
        Ok((
            Some(TransactionOps::Peeks {
                determination,
                requires_linearization: RequireLinearization::Required,
                ..
            }),
            _,
        )) if ctx.session().vars().transaction_isolation()
            == &IsolationLevel::StrongSessionSerializable =>
        {
            if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
                ctx.session_mut()
                    .ensure_timestamp_oracle(timeline.clone())
                    .apply_write(*ts);
            }
            (response, action)
        }
        // Single-statement transactions are executed via an internal message,
        // which takes over `ctx`.
        Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
            self.internal_cmd_tx
                .send(Message::ExecuteSingleStatementTransaction {
                    ctx,
                    otel_ctx: OpenTelemetryContext::obtain(),
                    stmt,
                    params,
                })
                .expect("must send");
            return;
        }
        Ok((_, _)) => (response, action),
        // Any failure while ending the transaction is reported and treated as a rollback.
        Err(err) => (Err(err), EndTransactionAction::Rollback),
    };
    // Finalize session variables for the effective action and report what changed.
    let changed = ctx.session_mut().vars_mut().end_transaction(action);
    let response = response.map(|mut r| {
        r.extend_params(changed);
        ExecuteResponse::from(r)
    });

    ctx.retire(response);
}
2123
2124 #[instrument]
2125 async fn sequence_end_transaction_inner(
2126 &mut self,
2127 ctx: &mut ExecuteContext,
2128 action: EndTransactionAction,
2129 ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2130 let txn = self.clear_transaction(ctx.session_mut()).await;
2131
2132 if let EndTransactionAction::Commit = action {
2133 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2134 match &mut ops {
2135 TransactionOps::Writes(writes) => {
2136 for WriteOp { id, .. } in &mut writes.iter() {
2137 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2139 AdapterError::Catalog(mz_catalog::memory::error::Error {
2140 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2141 })
2142 })?;
2143 }
2144
2145 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2147 }
2148 TransactionOps::DDL {
2149 ops,
2150 state: _,
2151 side_effects,
2152 revision,
2153 snapshot: _,
2154 } => {
2155 if *revision != self.catalog().transient_revision() {
2157 return Err(AdapterError::DDLTransactionRace);
2158 }
2159 let ops = std::mem::take(ops);
2161 let side_effects = std::mem::take(side_effects);
2162 self.catalog_transact_with_side_effects(
2163 Some(ctx),
2164 ops,
2165 move |a, mut ctx| {
2166 Box::pin(async move {
2167 for side_effect in side_effects {
2168 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2169 }
2170 })
2171 },
2172 )
2173 .await?;
2174 }
2175 _ => (),
2176 }
2177 return Ok((Some(ops), write_lock_guards));
2178 }
2179 }
2180
2181 Ok((None, None))
2182 }
2183
2184 pub(super) async fn sequence_side_effecting_func(
2185 &mut self,
2186 ctx: ExecuteContext,
2187 plan: SideEffectingFunc,
2188 ) {
2189 match plan {
2190 SideEffectingFunc::PgCancelBackend { connection_id } => {
2191 if ctx.session().conn_id().unhandled() == connection_id {
2192 ctx.retire(Err(AdapterError::Canceled));
2196 return;
2197 }
2198
2199 let res = if let Some((id_handle, _conn_meta)) =
2200 self.active_conns.get_key_value(&connection_id)
2201 {
2202 self.handle_privileged_cancel(id_handle.clone()).await;
2204 Datum::True
2205 } else {
2206 Datum::False
2207 };
2208 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2209 }
2210 }
2211 }
2212
2213 pub(crate) async fn execute_side_effecting_func(
2222 &mut self,
2223 plan: SideEffectingFunc,
2224 conn_id: ConnectionId,
2225 current_role: RoleId,
2226 ) -> Result<ExecuteResponse, AdapterError> {
2227 match plan {
2228 SideEffectingFunc::PgCancelBackend { connection_id } => {
2229 if conn_id.unhandled() == connection_id {
2230 return Err(AdapterError::Canceled);
2234 }
2235
2236 if let Some((_id_handle, conn_meta)) =
2239 self.active_conns.get_key_value(&connection_id)
2240 {
2241 let target_role = *conn_meta.authenticated_role_id();
2242 let role_membership = self
2243 .catalog()
2244 .state()
2245 .collect_role_membership(¤t_role);
2246 if !role_membership.contains(&target_role) {
2247 let target_role_name = self
2248 .catalog()
2249 .try_get_role(&target_role)
2250 .map(|role| role.name().to_string())
2251 .unwrap_or_else(|| target_role.to_string());
2252 return Err(AdapterError::Unauthorized(
2253 rbac::UnauthorizedError::RoleMembership {
2254 role_names: vec![target_role_name],
2255 },
2256 ));
2257 }
2258
2259 let id_handle = self
2261 .active_conns
2262 .get_key_value(&connection_id)
2263 .map(|(id, _)| id.clone())
2264 .expect("checked above");
2265 self.handle_privileged_cancel(id_handle).await;
2266 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2267 } else {
2268 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2270 }
2271 }
2272 }
2273 }
2274
2275 pub(crate) async fn determine_real_time_recent_timestamp(
2279 &self,
2280 source_ids: impl Iterator<Item = GlobalId>,
2281 real_time_recency_timeout: Duration,
2282 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2283 let item_ids = source_ids
2284 .map(|gid| {
2285 self.catalog
2286 .try_resolve_item_id(&gid)
2287 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2288 })
2289 .collect::<Result<Vec<_>, _>>()?;
2290
2291 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2297 if to_visit.is_empty() {
2300 return Ok(None);
2301 }
2302
2303 let mut timestamp_objects = BTreeSet::new();
2304
2305 while let Some(id) = to_visit.pop_front() {
2306 timestamp_objects.insert(id);
2307 to_visit.extend(
2308 self.catalog()
2309 .get_entry(&id)
2310 .uses()
2311 .into_iter()
2312 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2313 );
2314 }
2315 let timestamp_objects = timestamp_objects
2316 .into_iter()
2317 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2318 .collect();
2319
2320 let r = self
2321 .controller
2322 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2323 .await?;
2324
2325 Ok(Some(r))
2326 }
2327
2328 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2331 &self,
2332 session: &Session,
2333 source_ids: impl Iterator<Item = GlobalId>,
2334 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2335 let vars = session.vars();
2336
2337 if vars.real_time_recency()
2338 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2339 && !session.contains_read_timestamp()
2340 {
2341 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2342 .await
2343 } else {
2344 Ok(None)
2345 }
2346 }
2347
2348 #[instrument]
2349 pub(super) async fn sequence_explain_plan(
2350 &mut self,
2351 ctx: ExecuteContext,
2352 plan: plan::ExplainPlanPlan,
2353 target_cluster: TargetCluster,
2354 ) {
2355 match &plan.explainee {
2356 plan::Explainee::Statement(stmt) => match stmt {
2357 plan::ExplaineeStatement::CreateView { .. } => {
2358 self.explain_create_view(ctx, plan).await;
2359 }
2360 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2361 self.explain_create_materialized_view(ctx, plan).await;
2362 }
2363 plan::ExplaineeStatement::CreateIndex { .. } => {
2364 self.explain_create_index(ctx, plan).await;
2365 }
2366 plan::ExplaineeStatement::Select { .. } => {
2367 self.explain_peek(ctx, plan, target_cluster).await;
2368 }
2369 plan::ExplaineeStatement::Subscribe { .. } => {
2370 self.explain_subscribe(ctx, plan, target_cluster).await;
2371 }
2372 },
2373 plan::Explainee::View(_) => {
2374 let result = self.explain_view(&ctx, plan);
2375 ctx.retire(result);
2376 }
2377 plan::Explainee::MaterializedView(_) => {
2378 let result = self.explain_materialized_view(&ctx, plan);
2379 ctx.retire(result);
2380 }
2381 plan::Explainee::Index(_) => {
2382 let result = self.explain_index(&ctx, plan);
2383 ctx.retire(result);
2384 }
2385 plan::Explainee::ReplanView(_) => {
2386 self.explain_replan_view(ctx, plan).await;
2387 }
2388 plan::Explainee::ReplanMaterializedView(_) => {
2389 self.explain_replan_materialized_view(ctx, plan).await;
2390 }
2391 plan::Explainee::ReplanIndex(_) => {
2392 self.explain_replan_index(ctx, plan).await;
2393 }
2394 };
2395 }
2396
2397 pub(super) async fn sequence_explain_pushdown(
2398 &mut self,
2399 ctx: ExecuteContext,
2400 plan: plan::ExplainPushdownPlan,
2401 target_cluster: TargetCluster,
2402 ) {
2403 match plan.explainee {
2404 Explainee::Statement(ExplaineeStatement::Select {
2405 broken: false,
2406 plan,
2407 desc: _,
2408 }) => {
2409 let stage = return_if_err!(
2410 self.peek_validate(
2411 ctx.session(),
2412 plan,
2413 target_cluster,
2414 None,
2415 ExplainContext::Pushdown,
2416 Some(ctx.session().vars().max_query_result_size()),
2417 ),
2418 ctx
2419 );
2420 self.sequence_staged(ctx, Span::current(), stage).await;
2421 }
2422 Explainee::MaterializedView(item_id) => {
2423 self.explain_pushdown_materialized_view(ctx, item_id).await;
2424 }
2425 _ => {
2426 ctx.retire(Err(AdapterError::Unsupported(
2427 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2428 )));
2429 }
2430 };
2431 }
2432
2433 async fn execute_explain_pushdown_with_read_holds(
2435 &self,
2436 ctx: ExecuteContext,
2437 as_of: Antichain<Timestamp>,
2438 mz_now: ResultSpec<'static>,
2439 read_holds: Option<ReadHolds<Timestamp>>,
2440 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2441 ) {
2442 let fut = self
2443 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2444 .await;
2445 task::spawn(|| "render explain pushdown", async move {
2446 let _read_holds = read_holds;
2448 let res = fut.await;
2449 ctx.retire(res);
2450 });
2451 }
2452
2453 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2455 &self,
2456 session: &Session,
2457 as_of: Antichain<Timestamp>,
2458 mz_now: ResultSpec<'static>,
2459 imports: I,
2460 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2461 super::explain_pushdown_future_inner(
2463 session,
2464 &self.catalog,
2465 &self.controller.storage_collections,
2466 as_of,
2467 mz_now,
2468 imports,
2469 )
2470 .await
2471 }
2472
2473 #[instrument]
2474 pub(super) async fn sequence_insert(
2475 &mut self,
2476 mut ctx: ExecuteContext,
2477 plan: plan::InsertPlan,
2478 ) {
2479 if !ctx.session_mut().transaction().allows_writes() {
2487 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2488 return;
2489 }
2490
2491 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2505 let expr = return_if_err!(
2508 plan.values
2509 .clone()
2510 .lower(self.catalog().system_config(), None),
2511 ctx
2512 );
2513 OptimizedMirRelationExpr(expr)
2514 } else {
2515 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2517
2518 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2520
2521 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2523 };
2524
2525 match optimized_mir.into_inner() {
2526 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2527 let catalog = self.owned_catalog();
2528 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2529 let result =
2530 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2531 ctx.retire(result);
2532 });
2533 }
2534 _ => {
2536 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2537 Some(table) => {
2538 let desc = table.relation_desc_latest().expect("table has a desc");
2540 desc.arity()
2541 }
2542 None => {
2543 ctx.retire(Err(AdapterError::Catalog(
2544 mz_catalog::memory::error::Error {
2545 kind: ErrorKind::Sql(CatalogError::UnknownItem(
2546 plan.id.to_string(),
2547 )),
2548 },
2549 )));
2550 return;
2551 }
2552 };
2553
2554 let finishing = RowSetFinishing {
2555 order_by: vec![],
2556 limit: None,
2557 offset: 0,
2558 project: (0..desc_arity).collect(),
2559 };
2560
2561 let read_then_write_plan = plan::ReadThenWritePlan {
2562 id: plan.id,
2563 selection: plan.values,
2564 finishing,
2565 assignments: BTreeMap::new(),
2566 kind: MutationKind::Insert,
2567 returning: plan.returning,
2568 };
2569
2570 self.sequence_read_then_write(ctx, read_then_write_plan)
2571 .await;
2572 }
2573 }
2574 }
2575
2576 #[instrument]
2581 pub(super) async fn sequence_read_then_write(
2582 &mut self,
2583 mut ctx: ExecuteContext,
2584 plan: plan::ReadThenWritePlan,
2585 ) {
2586 let mut source_ids: BTreeSet<_> = plan
2587 .selection
2588 .depends_on()
2589 .into_iter()
2590 .map(|gid| self.catalog().resolve_item_id(&gid))
2591 .collect();
2592 source_ids.insert(plan.id);
2593
2594 if ctx.session().transaction().write_locks().is_none() {
2596 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2598
2599 for id in &source_ids {
2601 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2602 write_locks.insert_lock(*id, lock);
2603 }
2604 }
2605
2606 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2608 Ok(locks) => locks,
2609 Err(missing) => {
2610 let role_metadata = ctx.session().role_metadata().clone();
2612 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2613 let plan = DeferredPlan {
2614 ctx,
2615 plan: Plan::ReadThenWrite(plan),
2616 validity: PlanValidity::new(
2617 self.catalog.transient_revision(),
2618 source_ids.clone(),
2619 None,
2620 None,
2621 role_metadata,
2622 ),
2623 requires_locks: source_ids,
2624 };
2625 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2626 }
2627 };
2628
2629 ctx.session_mut()
2630 .try_grant_write_locks(write_locks)
2631 .expect("session has already been granted write locks");
2632 }
2633
2634 let plan::ReadThenWritePlan {
2635 id,
2636 kind,
2637 selection,
2638 mut assignments,
2639 finishing,
2640 mut returning,
2641 } = plan;
2642
2643 let desc = match self.catalog().try_get_entry(&id) {
2645 Some(table) => {
2646 table
2648 .relation_desc_latest()
2649 .expect("table has a desc")
2650 .into_owned()
2651 }
2652 None => {
2653 ctx.retire(Err(AdapterError::Catalog(
2654 mz_catalog::memory::error::Error {
2655 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2656 },
2657 )));
2658 return;
2659 }
2660 };
2661
2662 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2664 || assignments.values().any(|e| e.contains_temporal())
2665 || returning.iter().any(|e| e.contains_temporal());
2666 if contains_temporal {
2667 ctx.retire(Err(AdapterError::Unsupported(
2668 "calls to mz_now in write statements",
2669 )));
2670 return;
2671 }
2672
2673 fn validate_read_dependencies(
2681 catalog: &Catalog,
2682 id: &CatalogItemId,
2683 ) -> Result<(), AdapterError> {
2684 use CatalogItemType::*;
2685 use mz_catalog::memory::objects;
2686 let mut ids_to_check = Vec::new();
2687 let valid = match catalog.try_get_entry(id) {
2688 Some(entry) => {
2689 if let CatalogItem::View(objects::View { optimized_expr, .. })
2690 | CatalogItem::MaterializedView(objects::MaterializedView {
2691 optimized_expr,
2692 ..
2693 }) = entry.item()
2694 {
2695 if optimized_expr.contains_temporal() {
2696 return Err(AdapterError::Unsupported(
2697 "calls to mz_now in write statements",
2698 ));
2699 }
2700 }
2701 match entry.item().typ() {
2702 typ @ (Func | View | MaterializedView | ContinualTask) => {
2703 ids_to_check.extend(entry.uses());
2704 let valid_id = id.is_user() || matches!(typ, Func);
2705 valid_id
2706 }
2707 Source | Secret | Connection => false,
2708 Sink | Index => unreachable!(),
2710 Table => {
2711 if !id.is_user() {
2712 false
2714 } else {
2715 entry.source_export_details().is_none()
2717 }
2718 }
2719 Type => true,
2720 }
2721 }
2722 None => false,
2723 };
2724 if !valid {
2725 let (object_name, object_type) = match catalog.try_get_entry(id) {
2726 Some(entry) => {
2727 let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
2728 let object_type = match entry.item().typ() {
2729 Source => "source",
2731 Secret => "secret",
2732 Connection => "connection",
2733 Table => {
2734 if !id.is_user() {
2735 "system table"
2736 } else {
2737 "source-export table"
2738 }
2739 }
2740 View => "system view",
2741 MaterializedView => "system materialized view",
2742 ContinualTask => "system task",
2743 _ => "invalid dependency",
2744 };
2745 (object_name, object_type.to_string())
2746 }
2747 None => (id.to_string(), "unknown".to_string()),
2748 };
2749 return Err(AdapterError::InvalidTableMutationSelection {
2750 object_name,
2751 object_type,
2752 });
2753 }
2754 for id in ids_to_check {
2755 validate_read_dependencies(catalog, &id)?;
2756 }
2757 Ok(())
2758 }
2759
2760 for gid in selection.depends_on() {
2761 let item_id = self.catalog().resolve_item_id(&gid);
2762 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2763 ctx.retire(Err(err));
2764 return;
2765 }
2766 }
2767
2768 let (peek_tx, peek_rx) = oneshot::channel();
2769 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2770 let (tx, _, session, extra) = ctx.into_parts();
2771 let peek_ctx = ExecuteContext::from_parts(
2783 peek_client_tx,
2784 self.internal_cmd_tx.clone(),
2785 session,
2786 Default::default(),
2787 );
2788
2789 self.sequence_peek(
2790 peek_ctx,
2791 plan::SelectPlan {
2792 select: None,
2793 source: selection,
2794 when: QueryWhen::FreshestTableWrite,
2795 finishing,
2796 copy_to: None,
2797 },
2798 TargetCluster::Active,
2799 None,
2800 )
2801 .await;
2802
2803 let internal_cmd_tx = self.internal_cmd_tx.clone();
2804 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2805 let catalog = self.owned_catalog();
2806 let max_result_size = self.catalog().system_config().max_result_size();
2807
2808 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2809 let (peek_response, session) = match peek_rx.await {
2810 Ok(Response {
2811 result: Ok(resp),
2812 session,
2813 otel_ctx,
2814 }) => {
2815 otel_ctx.attach_as_parent();
2816 (resp, session)
2817 }
2818 Ok(Response {
2819 result: Err(e),
2820 session,
2821 otel_ctx,
2822 }) => {
2823 let ctx =
2824 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2825 otel_ctx.attach_as_parent();
2826 ctx.retire(Err(e));
2827 return;
2828 }
2829 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2831 };
2832 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2833 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2834
2835 if timeout_dur == Duration::ZERO {
2837 timeout_dur = Duration::MAX;
2838 }
2839
2840 let style = ExprPrepOneShot {
2841 logical_time: EvalTime::NotAvailable, session: ctx.session(),
2843 catalog_state: catalog.state(),
2844 };
2845 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2846 return_if_err!(style.prep_scalar_expr(expr), ctx);
2847 }
2848
2849 let make_diffs = move |mut rows: Box<dyn RowIterator>|
2850 -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2851 let arena = RowArena::new();
2852 let mut diffs = Vec::new();
2853 let mut datum_vec = mz_repr::DatumVec::new();
2854
2855 while let Some(row) = rows.next() {
2856 if !assignments.is_empty() {
2857 assert!(
2858 matches!(kind, MutationKind::Update),
2859 "only updates support assignments"
2860 );
2861 let mut datums = datum_vec.borrow_with(row);
2862 let mut updates = vec![];
2863 for (idx, expr) in &assignments {
2864 let updated = match expr.eval(&datums, &arena) {
2865 Ok(updated) => updated,
2866 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2867 };
2868 updates.push((*idx, updated));
2869 }
2870 for (idx, new_value) in updates {
2871 datums[idx] = new_value;
2872 }
2873 let updated = Row::pack_slice(&datums);
2874 diffs.push((updated, Diff::ONE));
2875 }
2876 match kind {
2877 MutationKind::Update | MutationKind::Delete => {
2881 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2882 }
2883 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2884 }
2885 }
2886
2887 let mut byte_size: u64 = 0;
2890 for (row, diff) in &diffs {
2891 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2892 if diff.is_positive() {
2893 for (idx, datum) in row.iter().enumerate() {
2894 desc.constraints_met(idx, &datum)?;
2895 }
2896 }
2897 }
2898 Ok((diffs, byte_size))
2899 };
2900
2901 let diffs = match peek_response {
2902 ExecuteResponse::SendingRowsStreaming {
2903 rows: mut rows_stream,
2904 ..
2905 } => {
2906 let mut byte_size: u64 = 0;
2907 let mut diffs = Vec::new();
2908 let result = loop {
2909 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2910 Ok(Some(res)) => match res {
2911 PeekResponseUnary::Rows(new_rows) => {
2912 match make_diffs(new_rows) {
2913 Ok((mut new_diffs, new_byte_size)) => {
2914 byte_size = byte_size.saturating_add(new_byte_size);
2915 if byte_size > max_result_size {
2916 break Err(AdapterError::ResultSize(format!(
2917 "result exceeds max size of {max_result_size}"
2918 )));
2919 }
2920 diffs.append(&mut new_diffs)
2921 }
2922 Err(e) => break Err(e),
2923 };
2924 }
2925 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2926 PeekResponseUnary::Error(e) => {
2927 break Err(AdapterError::Unstructured(anyhow!(e)));
2928 }
2929 },
2930 Ok(None) => break Ok(diffs),
2931 Err(_) => {
2932 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2937 conn_id: ctx.session().conn_id().clone(),
2938 });
2939 if let Err(e) = result {
2940 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2941 }
2942 break Err(AdapterError::StatementTimeout);
2943 }
2944 }
2945 };
2946
2947 result
2948 }
2949 ExecuteResponse::SendingRowsImmediate { rows } => {
2950 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2951 }
2952 resp => Err(AdapterError::Unstructured(anyhow!(
2953 "unexpected peek response: {resp:?}"
2954 ))),
2955 };
2956
2957 let mut returning_rows = Vec::new();
2958 let mut diff_err: Option<AdapterError> = None;
2959 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2960 let arena = RowArena::new();
2961 for (row, diff) in diffs {
2962 if !diff.is_positive() {
2963 continue;
2964 }
2965 let mut returning_row = Row::with_capacity(returning.len());
2966 let mut packer = returning_row.packer();
2967 for expr in &returning {
2968 let datums: Vec<_> = row.iter().collect();
2969 match expr.eval(&datums, &arena) {
2970 Ok(datum) => {
2971 packer.push(datum);
2972 }
2973 Err(err) => {
2974 diff_err = Some(err.into());
2975 break;
2976 }
2977 }
2978 }
2979 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
2980 let diff = match NonZeroUsize::try_from(diff) {
2981 Ok(diff) => diff,
2982 Err(err) => {
2983 diff_err = Some(err.into());
2984 break;
2985 }
2986 };
2987 returning_rows.push((returning_row, diff));
2988 if diff_err.is_some() {
2989 break;
2990 }
2991 }
2992 }
2993 let diffs = if let Some(err) = diff_err {
2994 Err(err)
2995 } else {
2996 diffs
2997 };
2998
2999 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3002 if let Some(timestamp_context) = timestamp_context {
3011 let (tx, rx) = tokio::sync::oneshot::channel();
3012 let conn_id = ctx.session().conn_id().clone();
3013 let pending_read_txn = PendingReadTxn {
3014 txn: PendingRead::ReadThenWrite { ctx, tx },
3015 timestamp_context,
3016 created: Instant::now(),
3017 num_requeues: 0,
3018 otel_ctx: OpenTelemetryContext::obtain(),
3019 };
3020 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3021 if let Err(e) = result {
3023 warn!(
3024 "strict_serializable_reads_tx dropped before we could send: {:?}",
3025 e
3026 );
3027 return;
3028 }
3029 let result = rx.await;
3030 ctx = match result {
3032 Ok(Some(ctx)) => ctx,
3033 Ok(None) => {
3034 return;
3037 }
3038 Err(e) => {
3039 warn!(
3040 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3041 e
3042 );
3043 return;
3044 }
3045 };
3046 }
3047
3048 match diffs {
3049 Ok(diffs) => {
3050 let result = Self::send_diffs(
3051 ctx.session_mut(),
3052 plan::SendDiffsPlan {
3053 id,
3054 updates: diffs,
3055 kind,
3056 returning: returning_rows,
3057 max_result_size,
3058 },
3059 );
3060 ctx.retire(result);
3061 }
3062 Err(e) => {
3063 ctx.retire(Err(e));
3064 }
3065 }
3066 });
3067 }
3068
3069 #[instrument]
3070 pub(super) async fn sequence_alter_item_rename(
3071 &mut self,
3072 ctx: &mut ExecuteContext,
3073 plan: plan::AlterItemRenamePlan,
3074 ) -> Result<ExecuteResponse, AdapterError> {
3075 let op = catalog::Op::RenameItem {
3076 id: plan.id,
3077 current_full_name: plan.current_full_name,
3078 to_name: plan.to_name,
3079 };
3080 match self
3081 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3082 .await
3083 {
3084 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3085 Err(err) => Err(err),
3086 }
3087 }
3088
3089 #[instrument]
3090 pub(super) async fn sequence_alter_retain_history(
3091 &mut self,
3092 ctx: &mut ExecuteContext,
3093 plan: plan::AlterRetainHistoryPlan,
3094 ) -> Result<ExecuteResponse, AdapterError> {
3095 soft_assert_or_log!(
3096 !matches!(
3097 self.catalog().get_entry(&plan.id).item(),
3098 CatalogItem::ContinualTask(_)
3099 ),
3100 "RETAIN HISTORY is not supported on continual tasks"
3101 );
3102 let ops = vec![catalog::Op::AlterRetainHistory {
3103 id: plan.id,
3104 value: plan.value,
3105 window: plan.window,
3106 }];
3107 self.catalog_transact_with_context(None, Some(ctx), ops)
3108 .await?;
3109 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3110 }
3111
3112 #[instrument]
3113 pub(super) async fn sequence_alter_source_timestamp_interval(
3114 &mut self,
3115 ctx: &mut ExecuteContext,
3116 plan: plan::AlterSourceTimestampIntervalPlan,
3117 ) -> Result<ExecuteResponse, AdapterError> {
3118 let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3119 id: plan.id,
3120 value: plan.value,
3121 interval: plan.interval,
3122 }];
3123 self.catalog_transact_with_context(None, Some(ctx), ops)
3124 .await?;
3125 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3126 }
3127
3128 #[instrument]
3129 pub(super) async fn sequence_alter_schema_rename(
3130 &mut self,
3131 ctx: &mut ExecuteContext,
3132 plan: plan::AlterSchemaRenamePlan,
3133 ) -> Result<ExecuteResponse, AdapterError> {
3134 let (database_spec, schema_spec) = plan.cur_schema_spec;
3135 let op = catalog::Op::RenameSchema {
3136 database_spec,
3137 schema_spec,
3138 new_name: plan.new_schema_name,
3139 check_reserved_names: true,
3140 };
3141 match self
3142 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3143 .await
3144 {
3145 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3146 Err(err) => Err(err),
3147 }
3148 }
3149
3150 #[instrument]
3151 pub(super) async fn sequence_alter_schema_swap(
3152 &mut self,
3153 ctx: &mut ExecuteContext,
3154 plan: plan::AlterSchemaSwapPlan,
3155 ) -> Result<ExecuteResponse, AdapterError> {
3156 let plan::AlterSchemaSwapPlan {
3157 schema_a_spec: (schema_a_db, schema_a),
3158 schema_a_name,
3159 schema_b_spec: (schema_b_db, schema_b),
3160 schema_b_name,
3161 name_temp,
3162 } = plan;
3163
3164 let op_a = catalog::Op::RenameSchema {
3165 database_spec: schema_a_db,
3166 schema_spec: schema_a,
3167 new_name: name_temp,
3168 check_reserved_names: false,
3169 };
3170 let op_b = catalog::Op::RenameSchema {
3171 database_spec: schema_b_db,
3172 schema_spec: schema_b,
3173 new_name: schema_a_name,
3174 check_reserved_names: false,
3175 };
3176 let op_c = catalog::Op::RenameSchema {
3177 database_spec: schema_a_db,
3178 schema_spec: schema_a,
3179 new_name: schema_b_name,
3180 check_reserved_names: false,
3181 };
3182
3183 match self
3184 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3185 Box::pin(async {})
3186 })
3187 .await
3188 {
3189 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3190 Err(err) => Err(err),
3191 }
3192 }
3193
3194 #[instrument]
3195 pub(super) async fn sequence_alter_role(
3196 &mut self,
3197 session: &Session,
3198 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3199 ) -> Result<ExecuteResponse, AdapterError> {
3200 let catalog = self.catalog().for_session(session);
3201 let role = catalog.get_role(&id);
3202
3203 let mut notices = vec![];
3205
3206 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3208 let mut vars = role.vars().clone();
3209
3210 let mut nopassword = false;
3213
3214 match option {
3216 PlannedAlterRoleOption::Attributes(attrs) => {
3217 self.validate_role_attributes(&attrs.clone().into())?;
3218
3219 if let Some(inherit) = attrs.inherit {
3220 attributes.inherit = inherit;
3221 }
3222
3223 if let Some(password) = attrs.password {
3224 attributes.password = Some(password);
3225 attributes.scram_iterations =
3226 Some(self.catalog().system_config().scram_iterations())
3227 }
3228
3229 if let Some(superuser) = attrs.superuser {
3230 attributes.superuser = Some(superuser);
3231 }
3232
3233 if let Some(login) = attrs.login {
3234 attributes.login = Some(login);
3235 }
3236
3237 if attrs.nopassword.unwrap_or(false) {
3238 nopassword = true;
3239 }
3240
3241 if let Some(notice) = self.should_emit_rbac_notice(session) {
3242 notices.push(notice);
3243 }
3244 }
3245 PlannedAlterRoleOption::Variable(variable) => {
3246 let session_var = session.vars().inspect(variable.name())?;
3248 session_var.visible(session.user(), catalog.system_vars())?;
3250
3251 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3254 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3255 } else if let PlannedRoleVariable::Set {
3256 name,
3257 value: VariableValue::Values(vals),
3258 } = &variable
3259 {
3260 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3261 notices.push(AdapterNotice::IntrospectionClusterUsage);
3262 }
3263 }
3264
3265 let var_name = match variable {
3266 PlannedRoleVariable::Set { name, value } => {
3267 match &value {
3269 VariableValue::Default => {
3270 vars.remove(&name);
3271 }
3272 VariableValue::Values(vals) => {
3273 let var = match &vals[..] {
3274 [val] => OwnedVarInput::Flat(val.clone()),
3275 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3276 };
3277 session_var.check(var.borrow())?;
3279
3280 vars.insert(name.clone(), var);
3281 }
3282 };
3283 name
3284 }
3285 PlannedRoleVariable::Reset { name } => {
3286 vars.remove(&name);
3288 name
3289 }
3290 };
3291
3292 notices.push(AdapterNotice::VarDefaultUpdated {
3294 role: Some(name.clone()),
3295 var_name: Some(var_name),
3296 });
3297 }
3298 }
3299
3300 let op = catalog::Op::AlterRole {
3301 id,
3302 name,
3303 attributes,
3304 nopassword,
3305 vars: RoleVars { map: vars },
3306 };
3307 let response = self
3308 .catalog_transact(Some(session), vec![op])
3309 .await
3310 .map(|_| ExecuteResponse::AlteredRole)?;
3311
3312 session.add_notices(notices);
3314
3315 Ok(response)
3316 }
3317
3318 #[instrument]
3319 pub(super) async fn sequence_alter_sink_prepare(
3320 &mut self,
3321 ctx: ExecuteContext,
3322 plan: plan::AlterSinkPlan,
3323 ) {
3324 let id_bundle = crate::CollectionIdBundle {
3326 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3327 compute_ids: BTreeMap::new(),
3328 };
3329 let read_hold = self.acquire_read_holds(&id_bundle);
3330
3331 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3332 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3333 return;
3334 };
3335
3336 let otel_ctx = OpenTelemetryContext::obtain();
3337 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3338
3339 let plan_validity = PlanValidity::new(
3340 self.catalog().transient_revision(),
3341 BTreeSet::from_iter([plan.item_id, from_item_id]),
3342 Some(plan.in_cluster),
3343 None,
3344 ctx.session().role_metadata().clone(),
3345 );
3346
3347 info!(
3348 "preparing alter sink for {}: frontiers={:?} export={:?}",
3349 plan.global_id,
3350 self.controller
3351 .storage_collections
3352 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3353 self.controller.storage.export(plan.global_id)
3354 );
3355
3356 self.install_storage_watch_set(
3362 ctx.session().conn_id().clone(),
3363 BTreeSet::from_iter([plan.global_id]),
3364 read_ts,
3365 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3366 ctx: Some(ctx),
3367 otel_ctx,
3368 plan,
3369 plan_validity,
3370 read_hold,
3371 }),
3372 ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3373 }
3374
3375 #[instrument]
3376 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3377 ctx.otel_ctx.attach_as_parent();
3378
3379 let plan::AlterSinkPlan {
3380 item_id,
3381 global_id,
3382 sink: sink_plan,
3383 with_snapshot,
3384 in_cluster,
3385 } = ctx.plan.clone();
3386
3387 match ctx.plan_validity.check(self.catalog()) {
3396 Ok(()) => {}
3397 Err(err) => {
3398 ctx.retire(Err(err));
3399 return;
3400 }
3401 }
3402
3403 let entry = self.catalog().get_entry(&item_id);
3404 let CatalogItem::Sink(old_sink) = entry.item() else {
3405 panic!("invalid item kind for `AlterSinkPlan`");
3406 };
3407
3408 if sink_plan.version != old_sink.version + 1 {
3409 ctx.retire(Err(AdapterError::ChangedPlan(
3410 "sink was altered concurrently".into(),
3411 )));
3412 return;
3413 }
3414
3415 info!(
3416 "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3417 self.controller
3418 .storage_collections
3419 .collections_frontiers(vec![global_id, sink_plan.from]),
3420 self.controller.storage.export(global_id),
3421 );
3422
3423 let write_frontier = &self
3426 .controller
3427 .storage
3428 .export(global_id)
3429 .expect("sink known to exist")
3430 .write_frontier;
3431 let as_of = ctx.read_hold.least_valid_read();
3432 assert!(
3433 write_frontier.iter().all(|t| as_of.less_than(t)),
3434 "{:?} should be strictly less than {:?}",
3435 &*as_of,
3436 &**write_frontier
3437 );
3438
3439 let create_sql = &old_sink.create_sql;
3445 let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3446 let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3447 unreachable!("invalid statement kind for sink");
3448 };
3449
3450 stmt.with_options
3452 .retain(|o| o.name != CreateSinkOptionName::Version);
3453 stmt.with_options.push(CreateSinkOption {
3454 name: CreateSinkOptionName::Version,
3455 value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3456 sink_plan.version.to_string(),
3457 ))),
3458 });
3459
3460 let conn_catalog = self.catalog().for_system_session();
3461 let (mut stmt, resolved_ids) =
3462 mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3463
3464 let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3466 let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3467 stmt.from = ResolvedItemName::Item {
3468 id: from_entry.id(),
3469 qualifiers: from_entry.name.qualifiers.clone(),
3470 full_name,
3471 print_id: true,
3472 version: from_entry.version,
3473 };
3474
3475 let new_sink = Sink {
3476 create_sql: stmt.to_ast_string_stable(),
3477 global_id,
3478 from: sink_plan.from,
3479 connection: sink_plan.connection.clone(),
3480 envelope: sink_plan.envelope,
3481 version: sink_plan.version,
3482 with_snapshot,
3483 resolved_ids: resolved_ids.clone(),
3484 cluster_id: in_cluster,
3485 commit_interval: sink_plan.commit_interval,
3486 };
3487
3488 let ops = vec![catalog::Op::UpdateItem {
3489 id: item_id,
3490 name: entry.name().clone(),
3491 to_item: CatalogItem::Sink(new_sink),
3492 }];
3493
3494 match self
3495 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3496 .await
3497 {
3498 Ok(()) => {}
3499 Err(err) => {
3500 ctx.retire(Err(err));
3501 return;
3502 }
3503 }
3504
3505 let storage_sink_desc = StorageSinkDesc {
3506 from: sink_plan.from,
3507 from_desc: from_entry
3508 .relation_desc()
3509 .expect("sinks can only be built on items with descs")
3510 .into_owned(),
3511 connection: sink_plan
3512 .connection
3513 .clone()
3514 .into_inline_connection(self.catalog().state()),
3515 envelope: sink_plan.envelope,
3516 as_of,
3517 with_snapshot,
3518 version: sink_plan.version,
3519 from_storage_metadata: (),
3520 to_storage_metadata: (),
3521 commit_interval: sink_plan.commit_interval,
3522 };
3523
3524 self.controller
3525 .storage
3526 .alter_export(
3527 global_id,
3528 ExportDescription {
3529 sink: storage_sink_desc,
3530 instance_id: in_cluster,
3531 },
3532 )
3533 .await
3534 .unwrap_or_terminate("cannot fail to alter source desc");
3535
3536 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3537 }
3538
3539 #[instrument]
3540 pub(super) async fn sequence_alter_connection(
3541 &mut self,
3542 ctx: ExecuteContext,
3543 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3544 ) {
3545 match action {
3546 AlterConnectionAction::RotateKeys => {
3547 self.sequence_rotate_keys(ctx, id).await;
3548 }
3549 AlterConnectionAction::AlterOptions {
3550 set_options,
3551 drop_options,
3552 validate,
3553 } => {
3554 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3555 .await
3556 }
3557 }
3558 }
3559
3560 #[instrument]
3561 async fn sequence_alter_connection_options(
3562 &mut self,
3563 mut ctx: ExecuteContext,
3564 id: CatalogItemId,
3565 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3566 drop_options: BTreeSet<ConnectionOptionName>,
3567 validate: bool,
3568 ) {
3569 let cur_entry = self.catalog().get_entry(&id);
3570 let cur_conn = cur_entry.connection().expect("known to be connection");
3571 let connection_gid = cur_conn.global_id();
3572
3573 let inner = || -> Result<Connection, AdapterError> {
3574 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3576 .expect("invalid create sql persisted to catalog")
3577 .into_element()
3578 .ast
3579 {
3580 Statement::CreateConnection(stmt) => stmt,
3581 _ => unreachable!("proved type is source"),
3582 };
3583
3584 let catalog = self.catalog().for_system_session();
3585
3586 let (mut create_conn_stmt, resolved_ids) =
3588 mz_sql::names::resolve(&catalog, create_conn_stmt)
3589 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3590
3591 create_conn_stmt
3593 .values
3594 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3595
3596 create_conn_stmt.values.extend(
3598 set_options
3599 .into_iter()
3600 .map(|(name, value)| ConnectionOption { name, value }),
3601 );
3602
3603 let mut catalog = self.catalog().for_system_session();
3606 catalog.mark_id_unresolvable_for_replanning(id);
3607
3608 let plan = match mz_sql::plan::plan(
3610 None,
3611 &catalog,
3612 Statement::CreateConnection(create_conn_stmt),
3613 &Params::empty(),
3614 &resolved_ids,
3615 )
3616 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3617 {
3618 Plan::CreateConnection(plan) => plan,
3619 _ => unreachable!("create source plan is only valid response"),
3620 };
3621
3622 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3624 .expect("invalid create sql persisted to catalog")
3625 .into_element()
3626 .ast
3627 {
3628 Statement::CreateConnection(stmt) => stmt,
3629 _ => unreachable!("proved type is source"),
3630 };
3631
3632 let catalog = self.catalog().for_system_session();
3633
3634 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3636 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3637
3638 Ok(Connection {
3639 create_sql: plan.connection.create_sql,
3640 global_id: cur_conn.global_id,
3641 details: plan.connection.details,
3642 resolved_ids: new_deps,
3643 })
3644 };
3645
3646 let conn = match inner() {
3647 Ok(conn) => conn,
3648 Err(e) => {
3649 return ctx.retire(Err(e));
3650 }
3651 };
3652
3653 if validate {
3654 let connection = conn
3655 .details
3656 .to_connection()
3657 .into_inline_connection(self.catalog().state());
3658
3659 let internal_cmd_tx = self.internal_cmd_tx.clone();
3660 let transient_revision = self.catalog().transient_revision();
3661 let conn_id = ctx.session().conn_id().clone();
3662 let otel_ctx = OpenTelemetryContext::obtain();
3663 let role_metadata = ctx.session().role_metadata().clone();
3664 let current_storage_parameters = self.controller.storage.config().clone();
3665
3666 task::spawn(
3667 || format!("validate_alter_connection:{conn_id}"),
3668 async move {
3669 let resolved_ids = conn.resolved_ids.clone();
3670 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3671 let result = match connection.validate(id, ¤t_storage_parameters).await {
3672 Ok(()) => Ok(conn),
3673 Err(err) => Err(err.into()),
3674 };
3675
3676 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3678 AlterConnectionValidationReady {
3679 ctx,
3680 result,
3681 connection_id: id,
3682 connection_gid,
3683 plan_validity: PlanValidity::new(
3684 transient_revision,
3685 dependency_ids.clone(),
3686 None,
3687 None,
3688 role_metadata,
3689 ),
3690 otel_ctx,
3691 resolved_ids,
3692 },
3693 ));
3694 if let Err(e) = result {
3695 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3696 }
3697 },
3698 );
3699 } else {
3700 let result = self
3701 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3702 .await;
3703 ctx.retire(result);
3704 }
3705 }
3706
3707 #[instrument]
3708 pub(crate) async fn sequence_alter_connection_stage_finish(
3709 &mut self,
3710 session: &Session,
3711 id: CatalogItemId,
3712 connection: Connection,
3713 ) -> Result<ExecuteResponse, AdapterError> {
3714 match self.catalog.get_entry(&id).item() {
3715 CatalogItem::Connection(curr_conn) => {
3716 curr_conn
3717 .details
3718 .to_connection()
3719 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3720 .map_err(StorageError::from)?;
3721 }
3722 _ => unreachable!("known to be a connection"),
3723 };
3724
3725 let ops = vec![catalog::Op::UpdateItem {
3726 id,
3727 name: self.catalog.get_entry(&id).name().clone(),
3728 to_item: CatalogItem::Connection(connection.clone()),
3729 }];
3730
3731 self.catalog_transact(Some(session), ops).await?;
3732
3733 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3740 }
3741
/// Sequences an `ALTER SOURCE` plan for the source `item_id`.
///
/// Two actions are handled:
///
/// * `AddSubsourceExports`: rewrites the source's `CREATE SOURCE` statement so that its
///   `TEXT COLUMNS` / `EXCLUDE COLUMNS` options cover the new subsources, re-plans it, checks
///   the resulting ingestion description with the storage controller, and transacts the
///   updated source together with the ops that create the new subsources.
/// * `RefreshReferences`: transacts a single `UpdateSourceReferences` op.
///
/// Returns `ExecuteResponse::AlteredObject(ObjectType::Source)` on success.
#[instrument]
pub(super) async fn sequence_alter_source(
    &mut self,
    session: &Session,
    plan::AlterSourcePlan {
        item_id,
        ingestion_id,
        action,
    }: plan::AlterSourcePlan,
) -> Result<ExecuteResponse, AdapterError> {
    let cur_entry = self.catalog().get_entry(&item_id);
    let cur_source = cur_entry.source().expect("known to be source");

    // Parses a persisted `CREATE SOURCE` SQL string and resolves its names against a system
    // session catalog, yielding the resolved statement and its resolved IDs. Resolution
    // failures become internal errors tagged with `err_cx`.
    let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
        let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
            .expect("invalid create sql persisted to catalog")
            .into_element()
            .ast
        {
            Statement::CreateSource(stmt) => stmt,
            _ => unreachable!("proved type is source"),
        };

        let catalog = coord.catalog().for_system_session();

        mz_sql::names::resolve(&catalog, create_source_stmt)
            .map_err(|e| AdapterError::internal(err_cx, e))
    };

    match action {
        plan::AlterSourceAction::AddSubsourceExports {
            subsources,
            options,
        } => {
            const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

            // Column options supplied with the ALTER statement for the new subsources.
            let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                text_columns: mut new_text_columns,
                exclude_columns: mut new_exclude_columns,
                ..
            } = options.try_into()?;

            let (mut create_source_stmt, resolved_ids) =
                create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

            // Collect the references of the subsources that currently depend on this source.
            let catalog = self.catalog();
            let curr_references: BTreeSet<_> = catalog
                .get_entry(&item_id)
                .used_by()
                .into_iter()
                .filter_map(|subsource| {
                    catalog
                        .get_entry(subsource)
                        .subsource_details()
                        .map(|(_id, reference, _details)| reference)
                })
                .collect();

            let purification_err =
                || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

            // Rewrite the column options on the statement. Only the Postgres, MySQL and
            // SQL Server connection kinds are handled; any other kind is a purification error.
            match &mut create_source_stmt.connection {
                CreateSourceConnection::Postgres {
                    options: curr_options,
                    ..
                } => {
                    let mz_sql::plan::PgConfigOptionExtracted {
                        mut text_columns, ..
                    } = curr_options.clone().try_into()?;

                    // Remove the existing TEXT COLUMNS option; it is rebuilt below.
                    curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));

                    // Keep only the existing text columns whose table is still referenced by
                    // a current subsource. The table reference is the column reference
                    // truncated to its first three name parts.
                    text_columns.retain(|column_qualified_reference| {
                        mz_ore::soft_assert_eq_or_log!(
                            column_qualified_reference.0.len(),
                            4,
                            "all TEXT COLUMNS values must be column-qualified references"
                        );
                        let mut table = column_qualified_reference.clone();
                        table.0.truncate(3);
                        curr_references.contains(&table)
                    });

                    // Merge the retained columns into the newly supplied ones.
                    new_text_columns.extend(text_columns);

                    // Only re-add the option when there is something to put in it.
                    if !new_text_columns.is_empty() {
                        new_text_columns.sort();
                        let new_text_columns = new_text_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(PgConfigOption {
                            name: PgConfigOptionName::TextColumns,
                            value: Some(WithOptionValue::Sequence(new_text_columns)),
                        });
                    }
                }
                CreateSourceConnection::MySql {
                    options: curr_options,
                    ..
                } => {
                    let mz_sql::plan::MySqlConfigOptionExtracted {
                        mut text_columns,
                        mut exclude_columns,
                        ..
                    } = curr_options.clone().try_into()?;

                    // Remove the existing TEXT COLUMNS and EXCLUDE COLUMNS options; they are
                    // rebuilt below.
                    curr_options.retain(|o| {
                        !matches!(
                            o.name,
                            MySqlConfigOptionName::TextColumns
                                | MySqlConfigOptionName::ExcludeColumns
                        )
                    });

                    // True when the column's table (the first two name parts of the column
                    // reference) is still referenced by a current subsource.
                    let column_referenced =
                        |column_qualified_reference: &UnresolvedItemName| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                3,
                                "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(2);
                            curr_references.contains(&table)
                        };
                    text_columns.retain(column_referenced);
                    exclude_columns.retain(column_referenced);

                    // Merge the retained columns into the newly supplied ones.
                    new_text_columns.extend(text_columns);
                    new_exclude_columns.extend(exclude_columns);

                    // Only re-add each option when there is something to put in it.
                    if !new_text_columns.is_empty() {
                        new_text_columns.sort();
                        let new_text_columns = new_text_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(MySqlConfigOption {
                            name: MySqlConfigOptionName::TextColumns,
                            value: Some(WithOptionValue::Sequence(new_text_columns)),
                        });
                    }
                    if !new_exclude_columns.is_empty() {
                        new_exclude_columns.sort();
                        let new_exclude_columns = new_exclude_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(MySqlConfigOption {
                            name: MySqlConfigOptionName::ExcludeColumns,
                            value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                        });
                    }
                }
                CreateSourceConnection::SqlServer {
                    options: curr_options,
                    ..
                } => {
                    let mz_sql::plan::SqlServerConfigOptionExtracted {
                        mut text_columns,
                        mut exclude_columns,
                        ..
                    } = curr_options.clone().try_into()?;

                    // Remove the existing TEXT COLUMNS and EXCLUDE COLUMNS options; they are
                    // rebuilt below.
                    curr_options.retain(|o| {
                        !matches!(
                            o.name,
                            SqlServerConfigOptionName::TextColumns
                                | SqlServerConfigOptionName::ExcludeColumns
                        )
                    });

                    // True when the column's table (the first two name parts of the column
                    // reference) is still referenced by a current subsource.
                    let column_referenced =
                        |column_qualified_reference: &UnresolvedItemName| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                3,
                                "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(2);
                            curr_references.contains(&table)
                        };
                    text_columns.retain(column_referenced);
                    exclude_columns.retain(column_referenced);

                    // Merge the retained columns into the newly supplied ones.
                    new_text_columns.extend(text_columns);
                    new_exclude_columns.extend(exclude_columns);

                    // Only re-add each option when there is something to put in it.
                    if !new_text_columns.is_empty() {
                        new_text_columns.sort();
                        let new_text_columns = new_text_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(SqlServerConfigOption {
                            name: SqlServerConfigOptionName::TextColumns,
                            value: Some(WithOptionValue::Sequence(new_text_columns)),
                        });
                    }
                    if !new_exclude_columns.is_empty() {
                        new_exclude_columns.sort();
                        let new_exclude_columns = new_exclude_columns
                            .into_iter()
                            .map(WithOptionValue::UnresolvedItemName)
                            .collect();

                        curr_options.push(SqlServerConfigOption {
                            name: SqlServerConfigOptionName::ExcludeColumns,
                            value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                        });
                    }
                }
                _ => return Err(purification_err()),
            };

            // Re-plan the rewritten statement with the source's own ID marked as
            // unresolvable.
            let mut catalog = self.catalog().for_system_session();
            catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

            let planned = mz_sql::plan::plan(
                None,
                &catalog,
                Statement::CreateSource(create_source_stmt),
                &Params::empty(),
                &resolved_ids,
            )
            .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
            let plan = match planned {
                Plan::CreateSource(plan) => plan,
                _ => unreachable!("create source plan is only valid response"),
            };

            // Build the replacement source, carrying over the current global ID, compaction
            // window and retained-metrics flag.
            let source = Source::new(
                plan,
                cur_source.global_id,
                resolved_ids,
                cur_source.custom_logical_compaction_window,
                cur_source.is_retained_metrics_object,
            );

            let desc = match &source.data_source {
                DataSourceDesc::Ingestion { desc, .. }
                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                    desc.clone().into_inline_connection(self.catalog().state())
                }
                _ => unreachable!("already verified of type ingestion"),
            };

            // Ask the storage controller to check the new description before any catalog
            // change is made.
            self.controller
                .storage
                .check_alter_ingestion_source_desc(ingestion_id, &desc)
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

            // The source update and the subsource creation ops go into one transaction.
            let mut ops = vec![catalog::Op::UpdateItem {
                id: item_id,
                name: self.catalog.get_entry(&item_id).name().clone(),
                to_item: CatalogItem::Source(source),
            }];

            let CreateSourceInner {
                ops: new_ops,
                sources: _,
                if_not_exists_ids,
            } = self.create_source_inner(session, subsources).await?;

            ops.extend(new_ops.into_iter());

            assert!(
                if_not_exists_ids.is_empty(),
                "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
            );

            self.catalog_transact(Some(session), ops).await?;
        }
        plan::AlterSourceAction::RefreshReferences { references } => {
            self.catalog_transact(
                Some(session),
                vec![catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.into(),
                }],
            )
            .await?;
        }
    }

    Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
}
4070
4071 #[instrument]
4072 pub(super) async fn sequence_alter_system_set(
4073 &mut self,
4074 session: &Session,
4075 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4076 ) -> Result<ExecuteResponse, AdapterError> {
4077 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4078 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4080 self.validate_alter_system_network_policy(session, &value)?;
4081 }
4082
4083 let op = match value {
4084 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4085 name: name.clone(),
4086 value: OwnedVarInput::SqlSet(values),
4087 },
4088 plan::VariableValue::Default => {
4089 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4090 }
4091 };
4092 self.catalog_transact(Some(session), vec![op]).await?;
4093
4094 session.add_notice(AdapterNotice::VarDefaultUpdated {
4095 role: None,
4096 var_name: Some(name),
4097 });
4098 Ok(ExecuteResponse::AlteredSystemConfiguration)
4099 }
4100
4101 #[instrument]
4102 pub(super) async fn sequence_alter_system_reset(
4103 &mut self,
4104 session: &Session,
4105 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4106 ) -> Result<ExecuteResponse, AdapterError> {
4107 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4108 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4109 self.catalog_transact(Some(session), vec![op]).await?;
4110 session.add_notice(AdapterNotice::VarDefaultUpdated {
4111 role: None,
4112 var_name: Some(name),
4113 });
4114 Ok(ExecuteResponse::AlteredSystemConfiguration)
4115 }
4116
4117 #[instrument]
4118 pub(super) async fn sequence_alter_system_reset_all(
4119 &mut self,
4120 session: &Session,
4121 _: plan::AlterSystemResetAllPlan,
4122 ) -> Result<ExecuteResponse, AdapterError> {
4123 self.is_user_allowed_to_alter_system(session, None)?;
4124 let op = catalog::Op::ResetAllSystemConfiguration;
4125 self.catalog_transact(Some(session), vec![op]).await?;
4126 session.add_notice(AdapterNotice::VarDefaultUpdated {
4127 role: None,
4128 var_name: None,
4129 });
4130 Ok(ExecuteResponse::AlteredSystemConfiguration)
4131 }
4132
4133 fn is_user_allowed_to_alter_system(
4135 &self,
4136 session: &Session,
4137 var_name: Option<&str>,
4138 ) -> Result<(), AdapterError> {
4139 match (session.user().kind(), var_name) {
4140 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4142 (UserKind::Superuser, Some(name))
4144 if session.user().is_internal()
4145 || self.catalog().system_config().user_modifiable(name) =>
4146 {
4147 let var = self.catalog().system_config().get(name)?;
4150 var.visible(session.user(), self.catalog().system_config())?;
4151 Ok(())
4152 }
4153 (UserKind::Regular, Some(name))
4156 if self.catalog().system_config().user_modifiable(name) =>
4157 {
4158 Err(AdapterError::Unauthorized(
4159 rbac::UnauthorizedError::Superuser {
4160 action: format!("toggle the '{name}' system configuration parameter"),
4161 },
4162 ))
4163 }
4164 _ => Err(AdapterError::Unauthorized(
4165 rbac::UnauthorizedError::MzSystem {
4166 action: "alter system".into(),
4167 },
4168 )),
4169 }
4170 }
4171
4172 fn validate_alter_system_network_policy(
4173 &self,
4174 session: &Session,
4175 policy_value: &plan::VariableValue,
4176 ) -> Result<(), AdapterError> {
4177 let policy_name = match &policy_value {
4178 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4180 plan::VariableValue::Values(values) if values.len() == 1 => {
4181 values.iter().next().cloned()
4182 }
4183 plan::VariableValue::Values(values) => {
4184 tracing::warn!(?values, "can't set multiple network policies at once");
4185 None
4186 }
4187 };
4188 let maybe_network_policy = policy_name
4189 .as_ref()
4190 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4191 let Some(network_policy) = maybe_network_policy else {
4192 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4193 VarError::InvalidParameterValue {
4194 name: NETWORK_POLICY.name(),
4195 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4196 reason: "no network policy with such name exists".to_string(),
4197 },
4198 )));
4199 };
4200 self.validate_alter_network_policy(session, &network_policy.rules)
4201 }
4202
4203 fn validate_alter_network_policy(
4208 &self,
4209 session: &Session,
4210 policy_rules: &Vec<NetworkPolicyRule>,
4211 ) -> Result<(), AdapterError> {
4212 if session.user().is_internal() {
4215 return Ok(());
4216 }
4217 if let Some(ip) = session.meta().client_ip() {
4218 validate_ip_with_policy_rules(ip, policy_rules)
4219 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4220 } else {
4221 return Err(AdapterError::NetworkPolicyDenied(
4224 NetworkPolicyError::MissingIp,
4225 ));
4226 }
4227 Ok(())
4228 }
4229
4230 #[instrument]
4232 pub(super) fn sequence_execute(
4233 &self,
4234 session: &mut Session,
4235 plan: plan::ExecutePlan,
4236 ) -> Result<String, AdapterError> {
4237 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4239 let ps = session
4240 .get_prepared_statement_unverified(&plan.name)
4241 .expect("known to exist");
4242 let stmt = ps.stmt().cloned();
4243 let desc = ps.desc().clone();
4244 let state_revision = ps.state_revision;
4245 let logging = Arc::clone(ps.logging());
4246 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4247 }
4248
4249 #[instrument]
4250 pub(super) async fn sequence_grant_privileges(
4251 &mut self,
4252 session: &Session,
4253 plan::GrantPrivilegesPlan {
4254 update_privileges,
4255 grantees,
4256 }: plan::GrantPrivilegesPlan,
4257 ) -> Result<ExecuteResponse, AdapterError> {
4258 self.sequence_update_privileges(
4259 session,
4260 update_privileges,
4261 grantees,
4262 UpdatePrivilegeVariant::Grant,
4263 )
4264 .await
4265 }
4266
4267 #[instrument]
4268 pub(super) async fn sequence_revoke_privileges(
4269 &mut self,
4270 session: &Session,
4271 plan::RevokePrivilegesPlan {
4272 update_privileges,
4273 revokees,
4274 }: plan::RevokePrivilegesPlan,
4275 ) -> Result<ExecuteResponse, AdapterError> {
4276 self.sequence_update_privileges(
4277 session,
4278 update_privileges,
4279 revokees,
4280 UpdatePrivilegeVariant::Revoke,
4281 )
4282 .await
4283 }
4284
4285 #[instrument]
4286 async fn sequence_update_privileges(
4287 &mut self,
4288 session: &Session,
4289 update_privileges: Vec<UpdatePrivilege>,
4290 grantees: Vec<RoleId>,
4291 variant: UpdatePrivilegeVariant,
4292 ) -> Result<ExecuteResponse, AdapterError> {
4293 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4294 let mut warnings = Vec::new();
4295 let catalog = self.catalog().for_session(session);
4296
4297 for UpdatePrivilege {
4298 acl_mode,
4299 target_id,
4300 grantor,
4301 } in update_privileges
4302 {
4303 let actual_object_type = catalog.get_system_object_type(&target_id);
4304 if actual_object_type.is_relation() {
4307 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4308 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4309 if !non_applicable_privileges.is_empty() {
4310 let object_description =
4311 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4312 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4313 non_applicable_privileges,
4314 object_description,
4315 })
4316 }
4317 }
4318
4319 if let SystemObjectId::Object(object_id) = &target_id {
4320 self.catalog()
4321 .ensure_not_reserved_object(object_id, session.conn_id())?;
4322 }
4323
4324 let privileges = self
4325 .catalog()
4326 .get_privileges(&target_id, session.conn_id())
4327 .ok_or(AdapterError::Unsupported(
4330 "GRANTs/REVOKEs on an object type with no privileges",
4331 ))?;
4332
4333 for grantee in &grantees {
4334 self.catalog().ensure_not_system_role(grantee)?;
4335 self.catalog().ensure_not_predefined_role(grantee)?;
4336 let existing_privilege = privileges
4337 .get_acl_item(grantee, &grantor)
4338 .map(Cow::Borrowed)
4339 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4340
4341 match variant {
4342 UpdatePrivilegeVariant::Grant
4343 if !existing_privilege.acl_mode.contains(acl_mode) =>
4344 {
4345 ops.push(catalog::Op::UpdatePrivilege {
4346 target_id: target_id.clone(),
4347 privilege: MzAclItem {
4348 grantee: *grantee,
4349 grantor,
4350 acl_mode,
4351 },
4352 variant,
4353 });
4354 }
4355 UpdatePrivilegeVariant::Revoke
4356 if !existing_privilege
4357 .acl_mode
4358 .intersection(acl_mode)
4359 .is_empty() =>
4360 {
4361 ops.push(catalog::Op::UpdatePrivilege {
4362 target_id: target_id.clone(),
4363 privilege: MzAclItem {
4364 grantee: *grantee,
4365 grantor,
4366 acl_mode,
4367 },
4368 variant,
4369 });
4370 }
4371 _ => {}
4373 }
4374 }
4375 }
4376
4377 if ops.is_empty() {
4378 session.add_notices(warnings);
4379 return Ok(variant.into());
4380 }
4381
4382 let res = self
4383 .catalog_transact(Some(session), ops)
4384 .await
4385 .map(|_| match variant {
4386 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4387 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4388 });
4389 if res.is_ok() {
4390 session.add_notices(warnings);
4391 }
4392 res
4393 }
4394
4395 #[instrument]
4396 pub(super) async fn sequence_alter_default_privileges(
4397 &mut self,
4398 session: &Session,
4399 plan::AlterDefaultPrivilegesPlan {
4400 privilege_objects,
4401 privilege_acl_items,
4402 is_grant,
4403 }: plan::AlterDefaultPrivilegesPlan,
4404 ) -> Result<ExecuteResponse, AdapterError> {
4405 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4406 let variant = if is_grant {
4407 UpdatePrivilegeVariant::Grant
4408 } else {
4409 UpdatePrivilegeVariant::Revoke
4410 };
4411 for privilege_object in &privilege_objects {
4412 self.catalog()
4413 .ensure_not_system_role(&privilege_object.role_id)?;
4414 self.catalog()
4415 .ensure_not_predefined_role(&privilege_object.role_id)?;
4416 if let Some(database_id) = privilege_object.database_id {
4417 self.catalog()
4418 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4419 }
4420 if let Some(schema_id) = privilege_object.schema_id {
4421 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4422 let schema_spec: SchemaSpecifier = schema_id.into();
4423
4424 self.catalog().ensure_not_reserved_object(
4425 &(database_spec, schema_spec).into(),
4426 session.conn_id(),
4427 )?;
4428 }
4429 for privilege_acl_item in &privilege_acl_items {
4430 self.catalog()
4431 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4432 self.catalog()
4433 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4434 ops.push(catalog::Op::UpdateDefaultPrivilege {
4435 privilege_object: privilege_object.clone(),
4436 privilege_acl_item: privilege_acl_item.clone(),
4437 variant,
4438 })
4439 }
4440 }
4441
4442 self.catalog_transact(Some(session), ops).await?;
4443 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4444 }
4445
4446 #[instrument]
4447 pub(super) async fn sequence_grant_role(
4448 &mut self,
4449 session: &Session,
4450 plan::GrantRolePlan {
4451 role_ids,
4452 member_ids,
4453 grantor_id,
4454 }: plan::GrantRolePlan,
4455 ) -> Result<ExecuteResponse, AdapterError> {
4456 let catalog = self.catalog();
4457 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4458 for role_id in role_ids {
4459 for member_id in &member_ids {
4460 let member_membership: BTreeSet<_> =
4461 catalog.get_role(member_id).membership().keys().collect();
4462 if member_membership.contains(&role_id) {
4463 let role_name = catalog.get_role(&role_id).name().to_string();
4464 let member_name = catalog.get_role(member_id).name().to_string();
4465 catalog.ensure_not_reserved_role(member_id)?;
4467 catalog.ensure_grantable_role(&role_id)?;
4468 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4469 role_name,
4470 member_name,
4471 });
4472 } else {
4473 ops.push(catalog::Op::GrantRole {
4474 role_id,
4475 member_id: *member_id,
4476 grantor_id,
4477 });
4478 }
4479 }
4480 }
4481
4482 if ops.is_empty() {
4483 return Ok(ExecuteResponse::GrantedRole);
4484 }
4485
4486 self.catalog_transact(Some(session), ops)
4487 .await
4488 .map(|_| ExecuteResponse::GrantedRole)
4489 }
4490
4491 #[instrument]
4492 pub(super) async fn sequence_revoke_role(
4493 &mut self,
4494 session: &Session,
4495 plan::RevokeRolePlan {
4496 role_ids,
4497 member_ids,
4498 grantor_id,
4499 }: plan::RevokeRolePlan,
4500 ) -> Result<ExecuteResponse, AdapterError> {
4501 let catalog = self.catalog();
4502 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4503 for role_id in role_ids {
4504 for member_id in &member_ids {
4505 let member_membership: BTreeSet<_> =
4506 catalog.get_role(member_id).membership().keys().collect();
4507 if !member_membership.contains(&role_id) {
4508 let role_name = catalog.get_role(&role_id).name().to_string();
4509 let member_name = catalog.get_role(member_id).name().to_string();
4510 catalog.ensure_not_reserved_role(member_id)?;
4512 catalog.ensure_grantable_role(&role_id)?;
4513 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4514 role_name,
4515 member_name,
4516 });
4517 } else {
4518 ops.push(catalog::Op::RevokeRole {
4519 role_id,
4520 member_id: *member_id,
4521 grantor_id,
4522 });
4523 }
4524 }
4525 }
4526
4527 if ops.is_empty() {
4528 return Ok(ExecuteResponse::RevokedRole);
4529 }
4530
4531 self.catalog_transact(Some(session), ops)
4532 .await
4533 .map(|_| ExecuteResponse::RevokedRole)
4534 }
4535
4536 #[instrument]
4537 pub(super) async fn sequence_alter_owner(
4538 &mut self,
4539 session: &Session,
4540 plan::AlterOwnerPlan {
4541 id,
4542 object_type,
4543 new_owner,
4544 }: plan::AlterOwnerPlan,
4545 ) -> Result<ExecuteResponse, AdapterError> {
4546 let mut ops = vec![catalog::Op::UpdateOwner {
4547 id: id.clone(),
4548 new_owner,
4549 }];
4550
4551 match &id {
4552 ObjectId::Item(global_id) => {
4553 let entry = self.catalog().get_entry(global_id);
4554
4555 if entry.is_index() {
4557 let name = self
4558 .catalog()
4559 .resolve_full_name(entry.name(), Some(session.conn_id()))
4560 .to_string();
4561 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4562 return Ok(ExecuteResponse::AlteredObject(object_type));
4563 }
4564
4565 let dependent_index_ops = entry
4567 .used_by()
4568 .into_iter()
4569 .filter(|id| self.catalog().get_entry(id).is_index())
4570 .map(|id| catalog::Op::UpdateOwner {
4571 id: ObjectId::Item(*id),
4572 new_owner,
4573 });
4574 ops.extend(dependent_index_ops);
4575
4576 let dependent_subsources =
4578 entry
4579 .progress_id()
4580 .into_iter()
4581 .map(|item_id| catalog::Op::UpdateOwner {
4582 id: ObjectId::Item(item_id),
4583 new_owner,
4584 });
4585 ops.extend(dependent_subsources);
4586 }
4587 ObjectId::Cluster(cluster_id) => {
4588 let cluster = self.catalog().get_cluster(*cluster_id);
4589 let managed_cluster_replica_ops =
4591 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4592 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4593 new_owner,
4594 });
4595 ops.extend(managed_cluster_replica_ops);
4596 }
4597 _ => {}
4598 }
4599
4600 self.catalog_transact(Some(session), ops)
4601 .await
4602 .map(|_| ExecuteResponse::AlteredObject(object_type))
4603 }
4604
4605 #[instrument]
4606 pub(super) async fn sequence_reassign_owned(
4607 &mut self,
4608 session: &Session,
4609 plan::ReassignOwnedPlan {
4610 old_roles,
4611 new_role,
4612 reassign_ids,
4613 }: plan::ReassignOwnedPlan,
4614 ) -> Result<ExecuteResponse, AdapterError> {
4615 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4616 self.catalog().ensure_not_reserved_role(role_id)?;
4617 }
4618
4619 let ops = reassign_ids
4620 .into_iter()
4621 .map(|id| catalog::Op::UpdateOwner {
4622 id,
4623 new_owner: new_role,
4624 })
4625 .collect();
4626
4627 self.catalog_transact(Some(session), ops)
4628 .await
4629 .map(|_| ExecuteResponse::ReassignOwned)
4630 }
4631
4632 #[instrument]
4633 pub(crate) async fn handle_deferred_statement(&mut self) {
4634 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4638 return;
4639 };
4640 match ps {
4641 crate::coord::PlanStatement::Statement { stmt, params } => {
4642 self.handle_execute_inner(stmt, params, ctx).await;
4643 }
4644 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4645 self.sequence_plan(ctx, plan, resolved_ids).await;
4646 }
4647 }
4648 }
4649
4650 #[instrument]
4651 #[allow(clippy::unused_async)]
4653 pub(super) async fn sequence_alter_table(
4654 &mut self,
4655 ctx: &mut ExecuteContext,
4656 plan: plan::AlterTablePlan,
4657 ) -> Result<ExecuteResponse, AdapterError> {
4658 let plan::AlterTablePlan {
4659 relation_id,
4660 column_name,
4661 column_type,
4662 raw_sql_type,
4663 } = plan;
4664
4665 let (_, new_global_id) = self.allocate_user_id().await?;
4667 let ops = vec![catalog::Op::AlterAddColumn {
4668 id: relation_id,
4669 new_global_id,
4670 name: column_name,
4671 typ: column_type,
4672 sql: raw_sql_type,
4673 }];
4674
4675 self.catalog_transact_with_context(None, Some(ctx), ops)
4676 .await?;
4677
4678 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4679 }
4680
4681 #[instrument]
4683 pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4684 &mut self,
4685 ctx: ExecuteContext,
4686 plan: AlterMaterializedViewApplyReplacementPlan,
4687 ) {
4688 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4698
4699 let plan_validity = PlanValidity::new(
4700 self.catalog().transient_revision(),
4701 BTreeSet::from_iter([id, replacement_id]),
4702 None,
4703 None,
4704 ctx.session().role_metadata().clone(),
4705 );
4706
4707 let target = self.catalog.get_entry(&id);
4708 let target_gid = target.latest_global_id();
4709
4710 let replacement = self.catalog.get_entry(&replacement_id);
4711 let replacement_gid = replacement.latest_global_id();
4712
4713 let target_upper = self
4714 .controller
4715 .storage_collections
4716 .collection_frontiers(target_gid)
4717 .expect("target MV exists")
4718 .write_frontier;
4719 let replacement_upper = self
4720 .controller
4721 .compute
4722 .collection_frontiers(replacement_gid, replacement.cluster_id())
4723 .expect("replacement MV exists")
4724 .write_frontier;
4725
4726 info!(
4727 %id, %replacement_id, ?target_upper, ?replacement_upper,
4728 "preparing materialized view replacement application",
4729 );
4730
4731 let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4732 ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4741 name: target.name().item.clone(),
4742 }));
4743 return;
4744 };
4745
4746 let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4750
4751 self.install_storage_watch_set(
4755 ctx.session().conn_id().clone(),
4756 BTreeSet::from_iter([target_gid]),
4757 replacement_upper_ts,
4758 WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4759 ctx: Some(ctx),
4760 otel_ctx: OpenTelemetryContext::obtain(),
4761 plan,
4762 plan_validity,
4763 }),
4764 )
4765 .expect("target collection exists");
4766 }
4767
4768 #[instrument]
4770 pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4771 &mut self,
4772 mut ctx: AlterMaterializedViewReadyContext,
4773 ) {
4774 ctx.otel_ctx.attach_as_parent();
4775
4776 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4777
4778 if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4783 ctx.retire(Err(err));
4784 return;
4785 }
4786
4787 info!(
4788 %id, %replacement_id,
4789 "finishing materialized view replacement application",
4790 );
4791
4792 let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4793 match self
4794 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4795 .await
4796 {
4797 Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4798 ObjectType::MaterializedView,
4799 ))),
4800 Err(err) => ctx.retire(Err(err)),
4801 }
4802 }
4803
4804 pub(super) async fn statistics_oracle(
4805 &self,
4806 session: &Session,
4807 source_ids: &BTreeSet<GlobalId>,
4808 query_as_of: &Antichain<Timestamp>,
4809 is_oneshot: bool,
4810 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4811 super::statistics_oracle(
4812 session,
4813 source_ids,
4814 query_as_of,
4815 is_oneshot,
4816 self.catalog().system_config(),
4817 self.controller.storage_collections.as_ref(),
4818 )
4819 .await
4820 }
4821}
4822
4823impl Coordinator {
4824 async fn process_dataflow_metainfo(
4826 &mut self,
4827 df_meta: DataflowMetainfo,
4828 export_id: GlobalId,
4829 ctx: Option<&mut ExecuteContext>,
4830 notice_ids: Vec<GlobalId>,
4831 ) -> Option<BuiltinTableAppendNotify> {
4832 if let Some(ctx) = ctx {
4834 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
4835 }
4836
4837 let df_meta = self
4839 .catalog()
4840 .render_notices(df_meta, notice_ids, Some(export_id));
4841
4842 if self.catalog().state().system_config().enable_mz_notices()
4845 && !df_meta.optimizer_notices.is_empty()
4846 {
4847 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4848 self.catalog().state().pack_optimizer_notices(
4849 &mut builtin_table_updates,
4850 df_meta.optimizer_notices.iter(),
4851 Diff::ONE,
4852 );
4853
4854 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4856
4857 Some(
4858 self.builtin_table_update()
4859 .execute(builtin_table_updates)
4860 .await
4861 .0,
4862 )
4863 } else {
4864 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4866
4867 None
4868 }
4869 }
4870}