1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::{Future, StreamExt, future};
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
24use mz_catalog::memory::error::ErrorKind;
25use mz_catalog::memory::objects::{
26 CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
27};
28use mz_expr::{
29 CollectionPlan, Eval, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
30};
31use mz_ore::cast::CastFrom;
32use mz_ore::collections::{CollectionExt, HashSet};
33use mz_ore::future::OreFutureExt;
34use mz_ore::task::{self, JoinHandle, spawn};
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_ore::{assert_none, instrument};
37use mz_repr::adt::jsonb::Jsonb;
38use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
39use mz_repr::explain::ExprHumanizer;
40use mz_repr::explain::json::json_string;
41use mz_repr::role_id::RoleId;
42use mz_repr::{
43 CatalogItemId, Datum, Diff, GlobalId, RelationVersion, RelationVersionSelector, Row, RowArena,
44 RowIterator, Timestamp,
45};
46use mz_secrets::SecretsReader;
47use mz_sql::ast::{
48 AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
49 CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
50 SqlServerConfigOptionName,
51};
52use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
53use mz_sql::catalog::{
54 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
55 CatalogItem as SqlCatalogItem, CatalogRole, CatalogSchema, CatalogTypeDetails,
56 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
57};
58use mz_sql::names::{
59 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
60 SchemaSpecifier, SystemObjectId,
61};
62use mz_sql::plan::{
63 AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
64 StatementContext,
65};
66use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
67use mz_storage_types::sinks::StorageSinkDesc;
68use mz_sql::plan::{
70 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
71 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
72 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
73};
74use mz_sql::session::metadata::SessionMetadata;
75use mz_sql::session::user::UserKind;
76use mz_sql::session::vars::{
77 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS,
78 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
79};
80use mz_sql::{plan, rbac};
81use mz_sql_parser::ast::display::AstDisplay;
82use mz_sql_parser::ast::{
83 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
84 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
85 WithOptionValue,
86};
87use mz_ssh_util::keys::SshKeyPairSet;
88use mz_storage_client::controller::ExportDescription;
89use mz_storage_types::AlterCompatible;
90use mz_storage_types::connections::inline::IntoInlineConnection;
91use mz_storage_types::controller::StorageError;
92use mz_transform::dataflow::DataflowMetainfo;
93use mz_transform::notice::{OptimizerNotice, RawOptimizerNotice};
94use smallvec::SmallVec;
95use timely::progress::Antichain;
96use tokio::sync::{oneshot, watch};
97use tracing::{Instrument, Span, info, warn};
98
99use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
100use crate::command::{ExecuteResponse, Response};
101use crate::coord::appends::{
102 BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn, UserWriteResponder,
103};
104use crate::coord::read_then_write::validate_read_then_write_dependencies;
105use crate::coord::sequencer::emit_optimizer_notices;
106use crate::coord::{
107 AlterConnectionValidationReady, AlterMaterializedViewReadyContext, AlterSinkReadyContext,
108 Coordinator, CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext,
109 ExplainContext, Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn,
110 PendingTxnResponse, PlanValidity, StageResult, Staged, StagedContext, TargetCluster,
111 WatchSetResponse, validate_ip_with_policy_rules,
112};
113use crate::error::AdapterError;
114use crate::notice::{AdapterNotice, DroppedInUseIndex};
115use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
116use crate::optimize::{self, Optimize};
117use crate::session::{
118 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
119 WriteLocks, WriteOp,
120};
121use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
122use crate::{PeekResponseUnary, ReadHolds};
123
124type RtrTimestampFuture = BoxFuture<'static, Result<Timestamp, StorageError>>;
126
127mod cluster;
128mod copy_from;
129mod create_index;
130mod create_materialized_view;
131mod create_view;
132mod explain_timestamp;
133mod peek;
134mod secret;
135mod subscribe;
136
137macro_rules! return_if_err {
140 ($expr:expr, $ctx:expr) => {
141 match $expr {
142 Ok(v) => v,
143 Err(e) => return $ctx.retire(Err(e.into())),
144 }
145 };
146}
147
148pub(super) use return_if_err;
149
150struct DropOps {
151 ops: Vec<catalog::Op>,
152 dropped_active_db: bool,
153 dropped_active_cluster: bool,
154 dropped_in_use_indexes: Vec<DroppedInUseIndex>,
155}
156
157struct CreateSourceInner {
159 ops: Vec<catalog::Op>,
160 sources: Vec<(CatalogItemId, Source)>,
161 if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
162}
163
164impl Coordinator {
165 pub(crate) async fn sequence_staged<S>(
172 &mut self,
173 mut ctx: S::Ctx,
174 parent_span: Span,
175 mut stage: S,
176 ) where
177 S: Staged + 'static,
178 S::Ctx: Send + 'static,
179 {
180 return_if_err!(stage.validity().check(self.catalog()), ctx);
181 loop {
182 let mut cancel_enabled = stage.cancel_enabled();
183 if let Some(session) = ctx.session() {
184 if cancel_enabled {
185 if let Some((_prev_tx, prev_rx)) = self
188 .connection_cancel_watches
189 .insert(session.conn_id().clone(), watch::channel(false))
190 {
191 let was_canceled = *prev_rx.borrow();
192 if was_canceled {
193 ctx.retire(Err(AdapterError::Canceled));
194 return;
195 }
196 }
197 } else {
198 self.connection_cancel_watches.remove(session.conn_id());
201 }
202 } else {
203 cancel_enabled = false
204 };
205 let next = stage
206 .stage(self, &mut ctx)
207 .instrument(parent_span.clone())
208 .await;
209 let res = return_if_err!(next, ctx);
210 stage = match res {
211 StageResult::Handle(handle) => {
212 let internal_cmd_tx = self.internal_cmd_tx.clone();
213 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
214 let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
215 });
216 return;
217 }
218 StageResult::HandleRetire(handle) => {
219 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
220 ctx.retire(Ok(resp));
221 });
222 return;
223 }
224 StageResult::Response(resp) => {
225 ctx.retire(Ok(resp));
226 return;
227 }
228 StageResult::Immediate(stage) => *stage,
229 }
230 }
231 }
232
233 fn handle_spawn<C, T, F>(
236 &self,
237 ctx: C,
238 handle: JoinHandle<Result<T, AdapterError>>,
239 cancel_enabled: bool,
240 f: F,
241 ) where
242 C: StagedContext + Send + 'static,
243 T: Send + 'static,
244 F: FnOnce(C, T) + Send + 'static,
245 {
246 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
247 .session()
248 .and_then(|session| self.connection_cancel_watches.get(session.conn_id()))
249 {
250 let mut rx = rx.clone();
251 Box::pin(async move {
252 let _ = rx.wait_for(|v| *v).await;
254 ()
255 })
256 } else {
257 Box::pin(future::pending())
258 };
259 spawn(|| "sequence_staged", async move {
260 tokio::select! {
261 res = handle => {
262 let next = return_if_err!(res, ctx);
263 f(ctx, next);
264 }
265 _ = rx, if cancel_enabled => {
266 ctx.retire(Err(AdapterError::Canceled));
267 }
268 }
269 });
270 }
271
272 async fn create_source_inner(
273 &self,
274 session: &Session,
275 plans: Vec<plan::CreateSourcePlanBundle>,
276 ) -> Result<CreateSourceInner, AdapterError> {
277 let mut ops = vec![];
278 let mut sources = vec![];
279
280 let if_not_exists_ids = plans
281 .iter()
282 .filter_map(
283 |plan::CreateSourcePlanBundle {
284 item_id,
285 global_id: _,
286 plan,
287 resolved_ids: _,
288 available_source_references: _,
289 }| {
290 if plan.if_not_exists {
291 Some((*item_id, plan.name.clone()))
292 } else {
293 None
294 }
295 },
296 )
297 .collect::<BTreeMap<_, _>>();
298
299 for plan::CreateSourcePlanBundle {
300 item_id,
301 global_id,
302 mut plan,
303 resolved_ids,
304 available_source_references,
305 } in plans
306 {
307 let name = plan.name.clone();
308
309 if let mz_sql::plan::DataSourceDesc::Webhook {
311 validate_using: Some(validate),
312 ..
313 } = &mut plan.source.data_source
314 {
315 if let Err(reason) = validate.reduce_expression().await {
316 self.metrics
317 .webhook_validation_reduce_failures
318 .with_label_values(&[reason])
319 .inc();
320 return Err(AdapterError::Internal(format!(
321 "failed to reduce check expression, {reason}"
322 )));
323 }
324 }
325
326 let mut reference_ops = vec![];
329 if let Some(references) = &available_source_references {
330 reference_ops.push(catalog::Op::UpdateSourceReferences {
331 source_id: item_id,
332 references: references.clone().into(),
333 });
334 }
335
336 let source = Source::new(plan, global_id, resolved_ids, None, false);
337 ops.push(catalog::Op::CreateItem {
338 id: item_id,
339 name,
340 item: CatalogItem::Source(source.clone()),
341 owner_id: *session.current_role_id(),
342 });
343 sources.push((item_id, source));
344 ops.extend(reference_ops);
346 }
347
348 Ok(CreateSourceInner {
349 ops,
350 sources,
351 if_not_exists_ids,
352 })
353 }
354
355 pub(crate) fn plan_subsource(
363 &self,
364 session: &Session,
365 params: &mz_sql::plan::Params,
366 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
367 item_id: CatalogItemId,
368 global_id: GlobalId,
369 ) -> Result<CreateSourcePlanBundle, AdapterError> {
370 let catalog = self.catalog().for_session(session);
371 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
372
373 let (plan, _sql_impl_ids) = self.plan_statement(
374 session,
375 Statement::CreateSubsource(subsource_stmt),
376 params,
377 &resolved_ids,
378 )?;
379 let plan = match plan {
380 Plan::CreateSource(plan) => plan,
381 _ => unreachable!(),
382 };
383 Ok(CreateSourcePlanBundle {
384 item_id,
385 global_id,
386 plan,
387 resolved_ids,
388 available_source_references: None,
389 })
390 }
391
392 pub(crate) async fn plan_purified_alter_source_add_subsource(
394 &mut self,
395 session: &Session,
396 params: Params,
397 source_name: ResolvedItemName,
398 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
399 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
400 ) -> Result<(Plan, ResolvedIds), AdapterError> {
401 let mut subsource_plans = Vec::with_capacity(subsources.len());
402
403 let conn_catalog = self.catalog().for_system_session();
405 let pcx = plan::PlanContext::zero();
406 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
407
408 let entry = self.catalog().get_entry(source_name.item_id());
409 let source = entry.source().ok_or_else(|| {
410 AdapterError::internal(
411 "plan alter source",
412 format!("expected Source found {entry:?}"),
413 )
414 })?;
415
416 let item_id = entry.id();
417 let ingestion_id = source.global_id();
418 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
419
420 let ids = self
421 .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
422 .await?;
423 for (subsource_stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
424 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
425 subsource_plans.push(s);
426 }
427
428 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
429 subsources: subsource_plans,
430 options,
431 };
432
433 Ok((
434 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
435 item_id,
436 ingestion_id,
437 action,
438 }),
439 ResolvedIds::empty(),
440 ))
441 }
442
443 pub(crate) fn plan_purified_alter_source_refresh_references(
445 &self,
446 _session: &Session,
447 _params: Params,
448 source_name: ResolvedItemName,
449 available_source_references: plan::SourceReferences,
450 ) -> Result<(Plan, ResolvedIds), AdapterError> {
451 let entry = self.catalog().get_entry(source_name.item_id());
452 let source = entry.source().ok_or_else(|| {
453 AdapterError::internal(
454 "plan alter source",
455 format!("expected Source found {entry:?}"),
456 )
457 })?;
458 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
459 references: available_source_references,
460 };
461
462 Ok((
463 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
464 item_id: entry.id(),
465 ingestion_id: source.global_id(),
466 action,
467 }),
468 ResolvedIds::empty(),
469 ))
470 }
471
472 pub(crate) async fn plan_purified_create_source(
476 &mut self,
477 ctx: &ExecuteContext,
478 params: Params,
479 progress_stmt: Option<CreateSubsourceStatement<Aug>>,
480 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
481 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
482 available_source_references: plan::SourceReferences,
483 ) -> Result<(Plan, ResolvedIds), AdapterError> {
484 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
485
486 if let Some(progress_stmt) = progress_stmt {
488 assert_none!(progress_stmt.of_source);
493 let (item_id, global_id) = self.allocate_user_id().await?;
494 let progress_plan =
495 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
496 let progress_full_name = self
497 .catalog()
498 .resolve_full_name(&progress_plan.plan.name, None);
499 let progress_subsource = ResolvedItemName::Item {
500 id: progress_plan.item_id,
501 qualifiers: progress_plan.plan.name.qualifiers.clone(),
502 full_name: progress_full_name,
503 print_id: true,
504 version: RelationVersionSelector::Latest,
505 };
506
507 create_source_plans.push(progress_plan);
508
509 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
510 }
511
512 let catalog = self.catalog().for_session(ctx.session());
513 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
514
515 let propagated_with_options: Vec<_> = source_stmt
516 .with_options
517 .iter()
518 .filter_map(|opt| match opt.name {
519 CreateSourceOptionName::TimestampInterval => None,
520 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
521 name: CreateSubsourceOptionName::RetainHistory,
522 value: opt.value.clone(),
523 }),
524 })
525 .collect();
526
527 let source_plan = match self.plan_statement(
529 ctx.session(),
530 Statement::CreateSource(source_stmt),
531 ¶ms,
532 &resolved_ids,
533 )? {
534 (Plan::CreateSource(plan), _sql_impl_ids) => plan,
535 (p, _) => unreachable!("s must be CreateSourcePlan but got {:?}", p),
536 };
537
538 let (item_id, global_id) = self.allocate_user_id().await?;
539
540 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
541 let of_source = ResolvedItemName::Item {
542 id: item_id,
543 qualifiers: source_plan.name.qualifiers.clone(),
544 full_name: source_full_name,
545 print_id: true,
546 version: RelationVersionSelector::Latest,
547 };
548
549 let conn_catalog = self.catalog().for_system_session();
551 let pcx = plan::PlanContext::zero();
552 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
553
554 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
555
556 for subsource_stmt in subsource_stmts.iter_mut() {
557 subsource_stmt
558 .with_options
559 .extend(propagated_with_options.iter().cloned())
560 }
561
562 create_source_plans.push(CreateSourcePlanBundle {
563 item_id,
564 global_id,
565 plan: source_plan,
566 resolved_ids: resolved_ids.clone(),
567 available_source_references: Some(available_source_references),
568 });
569
570 let ids = self
572 .allocate_user_ids(u64::cast_from(subsource_stmts.len()))
573 .await?;
574 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids) {
575 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
576 create_source_plans.push(plan);
577 }
578
579 Ok((
580 Plan::CreateSources(create_source_plans),
581 ResolvedIds::empty(),
582 ))
583 }
584
585 #[instrument]
586 pub(super) async fn sequence_create_source(
587 &mut self,
588 ctx: &mut ExecuteContext,
589 plans: Vec<plan::CreateSourcePlanBundle>,
590 ) -> Result<ExecuteResponse, AdapterError> {
591 let CreateSourceInner {
592 ops,
593 sources,
594 if_not_exists_ids,
595 } = self.create_source_inner(ctx.session(), plans).await?;
596
597 let transact_result = self
598 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
599 .await;
600
601 for (item_id, source) in &sources {
603 if matches!(source.data_source, DataSourceDesc::Webhook { .. }) {
604 if let Some(url) = self.catalog().state().try_get_webhook_url(item_id) {
605 ctx.session()
606 .add_notice(AdapterNotice::WebhookSourceCreated { url });
607 }
608 }
609 }
610
611 match transact_result {
612 Ok(()) => Ok(ExecuteResponse::CreatedSource),
613 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
614 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
615 })) if if_not_exists_ids.contains_key(&id) => {
616 ctx.session()
617 .add_notice(AdapterNotice::ObjectAlreadyExists {
618 name: if_not_exists_ids[&id].item.clone(),
619 ty: "source",
620 });
621 Ok(ExecuteResponse::CreatedSource)
622 }
623 Err(err) => Err(err),
624 }
625 }
626
627 async fn check_connection_secret_content_guards(
631 &self,
632 details: &ConnectionDetails,
633 ) -> Result<(), AdapterError> {
634 for (secret_id, guard) in details.secret_content_guards() {
635 let contents = self.caching_secrets_reader.read_string(secret_id).await?;
636 guard(&contents)?;
637 }
638 Ok(())
639 }
640
641 pub(super) fn check_secret_content_guards_of_dependents(
646 &self,
647 secret_id: CatalogItemId,
648 contents: &str,
649 ) -> Result<(), AdapterError> {
650 for dependent_id in self.catalog().get_entry(&secret_id).used_by() {
651 if let CatalogItem::Connection(conn) = self.catalog().get_entry(dependent_id).item() {
652 for (guarded_id, guard) in conn.details.secret_content_guards() {
653 if guarded_id == secret_id {
654 guard(contents)?;
655 }
656 }
657 }
658 }
659 Ok(())
660 }
661
662 #[instrument]
663 pub(super) async fn sequence_create_connection(
664 &mut self,
665 mut ctx: ExecuteContext,
666 plan: plan::CreateConnectionPlan,
667 resolved_ids: ResolvedIds,
668 ) {
669 let (connection_id, connection_gid) = match self.allocate_user_id().await {
670 Ok(item_id) => item_id,
671 Err(err) => return ctx.retire(Err(err)),
672 };
673
674 match &plan.connection.details {
675 ConnectionDetails::Ssh { key_1, key_2, .. } => {
676 let key_1 = match key_1.as_key_pair() {
677 Some(key_1) => key_1.clone(),
678 None => {
679 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
680 "the PUBLIC KEY 1 option cannot be explicitly specified"
681 ))));
682 }
683 };
684
685 let key_2 = match key_2.as_key_pair() {
686 Some(key_2) => key_2.clone(),
687 None => {
688 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
689 "the PUBLIC KEY 2 option cannot be explicitly specified"
690 ))));
691 }
692 };
693
694 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
695 let secret = key_set.to_bytes();
696 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
697 return ctx.retire(Err(err.into()));
698 }
699 self.caching_secrets_reader.invalidate(connection_id);
700 }
701 _ => (),
702 };
703
704 if let Err(err) = self
707 .check_connection_secret_content_guards(&plan.connection.details)
708 .await
709 {
710 return ctx.retire(Err(err));
711 }
712
713 if plan.validate {
714 let internal_cmd_tx = self.internal_cmd_tx.clone();
715 let transient_revision = self.catalog().transient_revision();
716 let conn_id = ctx.session().conn_id().clone();
717 let otel_ctx = OpenTelemetryContext::obtain();
718 let role_metadata = ctx.session().role_metadata().clone();
719
720 let connection = plan
721 .connection
722 .details
723 .to_connection()
724 .into_inline_connection(self.catalog().state());
725
726 let current_storage_parameters = self.controller.storage.config().clone();
727 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
728 let result = match std::panic::AssertUnwindSafe(
729 connection.validate(connection_id, ¤t_storage_parameters),
730 )
731 .ore_catch_unwind()
732 .await
733 {
734 Ok(Ok(())) => Ok(plan),
735 Ok(Err(err)) => Err(err.into()),
736 Err(_panic) => {
737 tracing::error!("connection validation panicked");
738 Err(AdapterError::Internal(
739 "connection validation panicked".into(),
740 ))
741 }
742 };
743
744 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
746 CreateConnectionValidationReady {
747 ctx,
748 result,
749 connection_id,
750 connection_gid,
751 plan_validity: PlanValidity::new(
752 transient_revision,
753 resolved_ids.items().copied().collect(),
754 None,
755 None,
756 role_metadata,
757 ),
758 otel_ctx,
759 resolved_ids: resolved_ids.clone(),
760 },
761 ));
762 if let Err(e) = result {
763 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
764 }
765 });
766 } else {
767 let result = self
768 .sequence_create_connection_stage_finish(
769 &mut ctx,
770 connection_id,
771 connection_gid,
772 plan,
773 resolved_ids,
774 )
775 .await;
776 ctx.retire(result);
777 }
778 }
779
780 #[instrument]
781 pub(crate) async fn sequence_create_connection_stage_finish(
782 &mut self,
783 ctx: &mut ExecuteContext,
784 connection_id: CatalogItemId,
785 connection_gid: GlobalId,
786 plan: plan::CreateConnectionPlan,
787 resolved_ids: ResolvedIds,
788 ) -> Result<ExecuteResponse, AdapterError> {
789 let ops = vec![catalog::Op::CreateItem {
790 id: connection_id,
791 name: plan.name.clone(),
792 item: CatalogItem::Connection(Connection {
793 create_sql: plan.connection.create_sql,
794 global_id: connection_gid,
795 details: plan.connection.details.clone(),
796 resolved_ids,
797 }),
798 owner_id: *ctx.session().current_role_id(),
799 }];
800
801 let conn_id = ctx.session().conn_id().clone();
804 let transact_result = self
805 .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
806 .await;
807
808 match transact_result {
809 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
810 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
811 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
812 })) if plan.if_not_exists => {
813 if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
816 if let Err(e) = self.secrets_controller.delete(connection_id).await {
817 tracing::warn!(
818 "Dropping SSH secret for existing connection has encountered an error: {}",
819 e
820 );
821 } else {
822 self.caching_secrets_reader.invalidate(connection_id);
823 }
824 }
825 ctx.session()
826 .add_notice(AdapterNotice::ObjectAlreadyExists {
827 name: plan.name.item,
828 ty: "connection",
829 });
830 Ok(ExecuteResponse::CreatedConnection)
831 }
832 Err(err) => {
833 if matches!(plan.connection.details, ConnectionDetails::Ssh { .. }) {
836 if let Err(e) = self.secrets_controller.delete(connection_id).await {
837 tracing::warn!(
838 "Dropping SSH secret for failed connection has encountered an error: {}",
839 e
840 );
841 } else {
842 self.caching_secrets_reader.invalidate(connection_id);
843 }
844 }
845 Err(err)
846 }
847 }
848 }
849
850 #[instrument]
851 pub(super) async fn sequence_create_database(
852 &mut self,
853 session: &Session,
854 plan: plan::CreateDatabasePlan,
855 ) -> Result<ExecuteResponse, AdapterError> {
856 let ops = vec![catalog::Op::CreateDatabase {
857 name: plan.name.clone(),
858 owner_id: *session.current_role_id(),
859 }];
860 match self.catalog_transact(Some(session), ops).await {
861 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
862 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
863 kind: ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
864 })) if plan.if_not_exists => {
865 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
866 Ok(ExecuteResponse::CreatedDatabase)
867 }
868 Err(err) => Err(err),
869 }
870 }
871
872 #[instrument]
873 pub(super) async fn sequence_create_schema(
874 &mut self,
875 session: &Session,
876 plan: plan::CreateSchemaPlan,
877 ) -> Result<ExecuteResponse, AdapterError> {
878 let op = catalog::Op::CreateSchema {
879 database_id: plan.database_spec,
880 schema_name: plan.schema_name.clone(),
881 owner_id: *session.current_role_id(),
882 };
883 match self.catalog_transact(Some(session), vec![op]).await {
884 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
885 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
886 kind: ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
887 })) if plan.if_not_exists => {
888 session.add_notice(AdapterNotice::SchemaAlreadyExists {
889 name: plan.schema_name,
890 });
891 Ok(ExecuteResponse::CreatedSchema)
892 }
893 Err(err) => Err(err),
894 }
895 }
896
897 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
899 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
900 if attributes.superuser.is_some() || attributes.password.is_some() {
901 return Err(AdapterError::UnavailableFeature {
902 feature: "SUPERUSER and PASSWORD attributes".to_string(),
903 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
904 });
905 }
906 }
907 Ok(())
908 }
909
910 #[instrument]
911 pub(super) async fn sequence_create_role(
912 &mut self,
913 conn_id: Option<&ConnectionId>,
914 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
915 ) -> Result<ExecuteResponse, AdapterError> {
916 self.validate_role_attributes(&attributes.clone())?;
917 let op = catalog::Op::CreateRole { name, attributes };
918 self.catalog_transact_with_context(conn_id, None, vec![op])
919 .await
920 .map(|_| ExecuteResponse::CreatedRole)
921 }
922
923 #[instrument]
924 pub(super) async fn sequence_create_network_policy(
925 &mut self,
926 session: &Session,
927 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
928 ) -> Result<ExecuteResponse, AdapterError> {
929 let op = catalog::Op::CreateNetworkPolicy {
930 rules,
931 name,
932 owner_id: *session.current_role_id(),
933 };
934 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
935 .await
936 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
937 }
938
939 #[instrument]
940 pub(super) async fn sequence_alter_network_policy(
941 &mut self,
942 session: &Session,
943 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
944 ) -> Result<ExecuteResponse, AdapterError> {
945 let current_network_policy_name =
947 self.catalog().system_config().default_network_policy_name();
948 if current_network_policy_name == name {
950 self.validate_alter_network_policy(session, &rules)?;
951 }
952
953 let op = catalog::Op::AlterNetworkPolicy {
954 id,
955 rules,
956 name,
957 owner_id: *session.current_role_id(),
958 };
959 self.catalog_transact_with_context(Some(session.conn_id()), None, vec![op])
960 .await
961 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
962 }
963
964 #[instrument]
965 pub(super) async fn sequence_create_table(
966 &mut self,
967 ctx: &mut ExecuteContext,
968 plan: plan::CreateTablePlan,
969 resolved_ids: ResolvedIds,
970 ) -> Result<ExecuteResponse, AdapterError> {
971 let plan::CreateTablePlan {
972 name,
973 table,
974 if_not_exists,
975 } = plan;
976
977 let conn_id = if table.temporary {
978 Some(ctx.session().conn_id())
979 } else {
980 None
981 };
982 let (table_id, global_id) = self.allocate_user_id().await?;
983 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
984
985 let data_source = match table.data_source {
986 plan::TableDataSource::TableWrites { defaults } => {
987 TableDataSource::TableWrites { defaults }
988 }
989 plan::TableDataSource::DataSource {
990 desc: data_source_plan,
991 timeline,
992 } => match data_source_plan {
993 plan::DataSourceDesc::IngestionExport {
994 ingestion_id,
995 external_reference,
996 details,
997 data_config,
998 } => TableDataSource::DataSource {
999 desc: DataSourceDesc::IngestionExport {
1000 ingestion_id,
1001 external_reference,
1002 details,
1003 data_config,
1004 },
1005 timeline,
1006 },
1007 plan::DataSourceDesc::Webhook {
1008 validate_using,
1009 body_format,
1010 headers,
1011 cluster_id,
1012 } => TableDataSource::DataSource {
1013 desc: DataSourceDesc::Webhook {
1014 validate_using,
1015 body_format,
1016 headers,
1017 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
1018 },
1019 timeline,
1020 },
1021 o => {
1022 unreachable!("CREATE TABLE data source got {:?}", o)
1023 }
1024 },
1025 };
1026
1027 let is_webhook = if let TableDataSource::DataSource {
1028 desc: DataSourceDesc::Webhook { .. },
1029 timeline: _,
1030 } = &data_source
1031 {
1032 true
1033 } else {
1034 false
1035 };
1036
1037 let table = Table {
1038 create_sql: Some(table.create_sql),
1039 desc: table.desc,
1040 collections,
1041 conn_id: conn_id.cloned(),
1042 resolved_ids,
1043 custom_logical_compaction_window: table.compaction_window,
1044 is_retained_metrics_object: false,
1045 data_source,
1046 };
1047 let ops = vec![catalog::Op::CreateItem {
1048 id: table_id,
1049 name: name.clone(),
1050 item: CatalogItem::Table(table.clone()),
1051 owner_id: *ctx.session().current_role_id(),
1052 }];
1053
1054 let catalog_result = self
1055 .catalog_transact_with_ddl_transaction(ctx, ops, |_, _| Box::pin(async {}))
1056 .await;
1057
1058 if is_webhook {
1059 if let Some(url) = self.catalog().state().try_get_webhook_url(&table_id) {
1062 ctx.session()
1063 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1064 }
1065 }
1066
1067 match catalog_result {
1068 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1069 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1070 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1071 })) if if_not_exists => {
1072 ctx.session_mut()
1073 .add_notice(AdapterNotice::ObjectAlreadyExists {
1074 name: name.item,
1075 ty: "table",
1076 });
1077 Ok(ExecuteResponse::CreatedTable)
1078 }
1079 Err(err) => Err(err),
1080 }
1081 }
1082
1083 #[instrument]
1084 pub(super) async fn sequence_create_sink(
1085 &mut self,
1086 ctx: ExecuteContext,
1087 plan: plan::CreateSinkPlan,
1088 resolved_ids: ResolvedIds,
1089 ) {
1090 let plan::CreateSinkPlan {
1091 name,
1092 sink,
1093 with_snapshot,
1094 if_not_exists,
1095 in_cluster,
1096 } = plan;
1097
1098 let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
1100
1101 let catalog_sink = Sink {
1102 create_sql: sink.create_sql,
1103 global_id,
1104 from: sink.from,
1105 connection: sink.connection,
1106 envelope: sink.envelope,
1107 version: sink.version,
1108 with_snapshot,
1109 resolved_ids,
1110 cluster_id: in_cluster,
1111 commit_interval: sink.commit_interval,
1112 };
1113
1114 let ops = vec![catalog::Op::CreateItem {
1115 id: item_id,
1116 name: name.clone(),
1117 item: CatalogItem::Sink(catalog_sink.clone()),
1118 owner_id: *ctx.session().current_role_id(),
1119 }];
1120
1121 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1122
1123 match result {
1124 Ok(()) => {}
1125 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1126 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1127 })) if if_not_exists => {
1128 ctx.session()
1129 .add_notice(AdapterNotice::ObjectAlreadyExists {
1130 name: name.item,
1131 ty: "sink",
1132 });
1133 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1134 return;
1135 }
1136 Err(e) => {
1137 ctx.retire(Err(e));
1138 return;
1139 }
1140 };
1141
1142 self.create_storage_export(global_id, &catalog_sink)
1143 .await
1144 .unwrap_or_terminate("cannot fail to create exports");
1145
1146 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1147 .await;
1148
1149 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1150 }
1151
1152 pub(super) fn validate_system_column_references(
1175 &self,
1176 uses_ambiguous_columns: bool,
1177 depends_on: &BTreeSet<GlobalId>,
1178 ) -> Result<(), AdapterError> {
1179 if uses_ambiguous_columns
1180 && depends_on
1181 .iter()
1182 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1183 {
1184 Err(AdapterError::AmbiguousSystemColumnReference)
1185 } else {
1186 Ok(())
1187 }
1188 }
1189
1190 #[instrument]
1191 pub(super) async fn sequence_create_type(
1192 &mut self,
1193 session: &Session,
1194 plan: plan::CreateTypePlan,
1195 resolved_ids: ResolvedIds,
1196 ) -> Result<ExecuteResponse, AdapterError> {
1197 let (item_id, global_id) = self.allocate_user_id().await?;
1198 plan.typ
1200 .inner
1201 .desc(&self.catalog().for_session(session))
1202 .map_err(AdapterError::from)?;
1203 let typ = Type {
1204 create_sql: Some(plan.typ.create_sql),
1205 global_id,
1206 details: CatalogTypeDetails {
1207 array_id: None,
1208 typ: plan.typ.inner,
1209 pg_metadata: None,
1210 },
1211 resolved_ids,
1212 };
1213 let op = catalog::Op::CreateItem {
1214 id: item_id,
1215 name: plan.name,
1216 item: CatalogItem::Type(typ),
1217 owner_id: *session.current_role_id(),
1218 };
1219 match self.catalog_transact(Some(session), vec![op]).await {
1220 Ok(()) => Ok(ExecuteResponse::CreatedType),
1221 Err(err) => Err(err),
1222 }
1223 }
1224
1225 #[instrument]
1226 pub(super) async fn sequence_comment_on(
1227 &mut self,
1228 session: &Session,
1229 plan: plan::CommentPlan,
1230 ) -> Result<ExecuteResponse, AdapterError> {
1231 let op = catalog::Op::Comment {
1232 object_id: plan.object_id,
1233 sub_component: plan.sub_component,
1234 comment: plan.comment,
1235 };
1236 self.catalog_transact(Some(session), vec![op]).await?;
1237 Ok(ExecuteResponse::Comment)
1238 }
1239
1240 #[instrument]
1241 pub(super) async fn sequence_drop_objects(
1242 &mut self,
1243 ctx: &mut ExecuteContext,
1244 plan::DropObjectsPlan {
1245 drop_ids,
1246 object_type,
1247 referenced_ids,
1248 }: plan::DropObjectsPlan,
1249 ) -> Result<ExecuteResponse, AdapterError> {
1250 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1251 let mut objects = Vec::new();
1252 for obj_id in &drop_ids {
1253 if !referenced_ids_hashset.contains(obj_id) {
1254 let object_info = ErrorMessageObjectDescription::from_id(
1255 obj_id,
1256 &self.catalog().for_session(ctx.session()),
1257 )
1258 .to_string();
1259 objects.push(object_info);
1260 }
1261 }
1262
1263 if !objects.is_empty() {
1264 ctx.session()
1265 .add_notice(AdapterNotice::CascadeDroppedObject { objects });
1266 }
1267
1268 let expr_cache_invalidate_ids: BTreeSet<_> = drop_ids
1270 .iter()
1271 .filter_map(|id| match id {
1272 ObjectId::Item(item_id) => Some(self.catalog().get_entry(item_id).global_ids()),
1273 _ => None,
1274 })
1275 .flatten()
1276 .collect();
1277
1278 let DropOps {
1279 ops,
1280 dropped_active_db,
1281 dropped_active_cluster,
1282 dropped_in_use_indexes,
1283 } = self.sequence_drop_common(ctx.session(), drop_ids)?;
1284
1285 self.catalog_transact_with_context(None, Some(ctx), ops)
1286 .await?;
1287
1288 if !expr_cache_invalidate_ids.is_empty() {
1290 let _fut = self.catalog().update_expression_cache(
1291 Default::default(),
1292 Default::default(),
1293 expr_cache_invalidate_ids,
1294 );
1295 }
1296
1297 fail::fail_point!("after_sequencer_drop_replica");
1298
1299 if dropped_active_db {
1300 ctx.session()
1301 .add_notice(AdapterNotice::DroppedActiveDatabase {
1302 name: ctx.session().vars().database().to_string(),
1303 });
1304 }
1305 if dropped_active_cluster {
1306 ctx.session()
1307 .add_notice(AdapterNotice::DroppedActiveCluster {
1308 name: ctx.session().vars().cluster().to_string(),
1309 });
1310 }
1311 for dropped_in_use_index in dropped_in_use_indexes {
1312 ctx.session()
1313 .add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1314 self.metrics
1315 .optimization_notices
1316 .with_label_values(&["DroppedInUseIndex"])
1317 .inc_by(1);
1318 }
1319 Ok(ExecuteResponse::DroppedObject(object_type))
1320 }
1321
1322 fn validate_dropped_role_ownership(
1323 &self,
1324 session: &Session,
1325 dropped_roles: &BTreeMap<RoleId, &str>,
1326 ) -> Result<(), AdapterError> {
1327 fn privilege_check(
1328 privileges: &PrivilegeMap,
1329 dropped_roles: &BTreeMap<RoleId, &str>,
1330 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1331 object_id: &SystemObjectId,
1332 catalog: &ConnCatalog,
1333 ) {
1334 for privilege in privileges.all_values() {
1335 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1336 let grantor_name = catalog.get_role(&privilege.grantor).name();
1337 let object_description =
1338 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1339 dependent_objects
1340 .entry(role_name.to_string())
1341 .or_default()
1342 .push(format!(
1343 "privileges on {object_description} granted by {grantor_name}",
1344 ));
1345 }
1346 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1347 let grantee_name = catalog.get_role(&privilege.grantee).name();
1348 let object_description =
1349 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1350 dependent_objects
1351 .entry(role_name.to_string())
1352 .or_default()
1353 .push(format!(
1354 "privileges granted on {object_description} to {grantee_name}"
1355 ));
1356 }
1357 }
1358 }
1359
1360 let catalog = self.catalog().for_session(session);
1361 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1362 for entry in self.catalog.entries() {
1363 let id = SystemObjectId::Object(entry.id().into());
1364 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1365 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1366 dependent_objects
1367 .entry(role_name.to_string())
1368 .or_default()
1369 .push(format!("owner of {object_description}"));
1370 }
1371 privilege_check(
1372 entry.privileges(),
1373 dropped_roles,
1374 &mut dependent_objects,
1375 &id,
1376 &catalog,
1377 );
1378 }
1379 for database in self.catalog.databases() {
1380 let database_id = SystemObjectId::Object(database.id().into());
1381 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1382 let object_description =
1383 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1384 dependent_objects
1385 .entry(role_name.to_string())
1386 .or_default()
1387 .push(format!("owner of {object_description}"));
1388 }
1389 privilege_check(
1390 &database.privileges,
1391 dropped_roles,
1392 &mut dependent_objects,
1393 &database_id,
1394 &catalog,
1395 );
1396 for schema in database.schemas_by_id.values() {
1397 let schema_id = SystemObjectId::Object(
1398 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1399 );
1400 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1401 let object_description =
1402 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1403 dependent_objects
1404 .entry(role_name.to_string())
1405 .or_default()
1406 .push(format!("owner of {object_description}"));
1407 }
1408 privilege_check(
1409 &schema.privileges,
1410 dropped_roles,
1411 &mut dependent_objects,
1412 &schema_id,
1413 &catalog,
1414 );
1415 }
1416 }
1417 for cluster in self.catalog.clusters() {
1418 let cluster_id = SystemObjectId::Object(cluster.id().into());
1419 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1420 let object_description =
1421 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1422 dependent_objects
1423 .entry(role_name.to_string())
1424 .or_default()
1425 .push(format!("owner of {object_description}"));
1426 }
1427 privilege_check(
1428 &cluster.privileges,
1429 dropped_roles,
1430 &mut dependent_objects,
1431 &cluster_id,
1432 &catalog,
1433 );
1434 for replica in cluster.replicas() {
1435 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1436 let replica_id =
1437 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1438 let object_description =
1439 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1440 dependent_objects
1441 .entry(role_name.to_string())
1442 .or_default()
1443 .push(format!("owner of {object_description}"));
1444 }
1445 }
1446 }
1447 privilege_check(
1448 self.catalog().system_privileges(),
1449 dropped_roles,
1450 &mut dependent_objects,
1451 &SystemObjectId::System,
1452 &catalog,
1453 );
1454 for (default_privilege_object, default_privilege_acl_items) in
1455 self.catalog.default_privileges()
1456 {
1457 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1458 dependent_objects
1459 .entry(role_name.to_string())
1460 .or_default()
1461 .push(format!(
1462 "default privileges on {}S created by {}",
1463 default_privilege_object.object_type, role_name
1464 ));
1465 }
1466 for default_privilege_acl_item in default_privilege_acl_items {
1467 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1468 dependent_objects
1469 .entry(role_name.to_string())
1470 .or_default()
1471 .push(format!(
1472 "default privileges on {}S granted to {}",
1473 default_privilege_object.object_type, role_name
1474 ));
1475 }
1476 }
1477 }
1478
1479 if !dependent_objects.is_empty() {
1480 Err(AdapterError::DependentObject(dependent_objects))
1481 } else {
1482 Ok(())
1483 }
1484 }
1485
1486 #[instrument]
1487 pub(super) async fn sequence_drop_owned(
1488 &mut self,
1489 session: &Session,
1490 plan: plan::DropOwnedPlan,
1491 ) -> Result<ExecuteResponse, AdapterError> {
1492 for role_id in &plan.role_ids {
1493 self.catalog().ensure_not_reserved_role(role_id)?;
1494 }
1495
1496 let mut privilege_revokes = plan.privilege_revokes;
1497
1498 let session_catalog = self.catalog().for_session(session);
1500 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1501 && !session.is_superuser()
1502 {
1503 let role_membership =
1505 session_catalog.collect_role_membership(session.current_role_id());
1506 let invalid_revokes: BTreeSet<_> = privilege_revokes
1507 .extract_if(.., |(_, privilege)| {
1508 !role_membership.contains(&privilege.grantor)
1509 })
1510 .map(|(object_id, _)| object_id)
1511 .collect();
1512 for invalid_revoke in invalid_revokes {
1513 let object_description =
1514 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1515 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1516 }
1517 }
1518
1519 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1520 catalog::Op::UpdatePrivilege {
1521 target_id: object_id,
1522 privilege,
1523 variant: UpdatePrivilegeVariant::Revoke,
1524 }
1525 });
1526 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1527 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1528 privilege_object,
1529 privilege_acl_item,
1530 variant: UpdatePrivilegeVariant::Revoke,
1531 },
1532 );
1533 let DropOps {
1534 ops: drop_ops,
1535 dropped_active_db,
1536 dropped_active_cluster,
1537 dropped_in_use_indexes,
1538 } = self.sequence_drop_common(session, plan.drop_ids)?;
1539
1540 let ops = privilege_revoke_ops
1541 .chain(default_privilege_revoke_ops)
1542 .chain(drop_ops.into_iter())
1543 .collect();
1544
1545 self.catalog_transact(Some(session), ops).await?;
1546
1547 if dropped_active_db {
1548 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1549 name: session.vars().database().to_string(),
1550 });
1551 }
1552 if dropped_active_cluster {
1553 session.add_notice(AdapterNotice::DroppedActiveCluster {
1554 name: session.vars().cluster().to_string(),
1555 });
1556 }
1557 for dropped_in_use_index in dropped_in_use_indexes {
1558 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1559 }
1560 Ok(ExecuteResponse::DroppedOwned)
1561 }
1562
1563 fn sequence_drop_common(
1564 &self,
1565 session: &Session,
1566 ids: Vec<ObjectId>,
1567 ) -> Result<DropOps, AdapterError> {
1568 let mut dropped_active_db = false;
1569 let mut dropped_active_cluster = false;
1570 let mut dropped_in_use_indexes = Vec::new();
1571 let mut dropped_roles = BTreeMap::new();
1572 let mut dropped_databases = BTreeSet::new();
1573 let mut dropped_schemas = BTreeSet::new();
1574 let mut role_revokes = BTreeSet::new();
1578 let mut default_privilege_revokes = BTreeSet::new();
1581
1582 let mut clusters_to_drop = BTreeSet::new();
1584
1585 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1586 for id in &ids {
1587 match id {
1588 ObjectId::Database(id) => {
1589 let name = self.catalog().get_database(id).name();
1590 if name == session.vars().database() {
1591 dropped_active_db = true;
1592 }
1593 dropped_databases.insert(id);
1594 }
1595 ObjectId::Schema((_, spec)) => {
1596 if let SchemaSpecifier::Id(id) = spec {
1597 dropped_schemas.insert(id);
1598 }
1599 }
1600 ObjectId::Cluster(id) => {
1601 clusters_to_drop.insert(*id);
1602 if let Some(active_id) = self
1603 .catalog()
1604 .active_cluster(session)
1605 .ok()
1606 .map(|cluster| cluster.id())
1607 {
1608 if id == &active_id {
1609 dropped_active_cluster = true;
1610 }
1611 }
1612 }
1613 ObjectId::Role(id) => {
1614 let role = self.catalog().get_role(id);
1615 let name = role.name();
1616 dropped_roles.insert(*id, name);
1617 for (group_id, grantor_id) in &role.membership.map {
1619 role_revokes.insert((*group_id, *id, *grantor_id));
1620 }
1621 }
1622 ObjectId::Item(id) => {
1623 if let Some(index) = self.catalog().get_entry(id).index() {
1624 let humanizer = self.catalog().for_session(session);
1625 let dependants = self
1626 .controller
1627 .compute
1628 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1629 .ok()
1630 .into_iter()
1631 .flatten()
1632 .filter(|dependant_id| {
1633 if dependant_id.is_transient() {
1640 return false;
1641 }
1642 let Some(dependent_id) = humanizer
1644 .try_get_item_by_global_id(dependant_id)
1645 .map(|item| item.id())
1646 else {
1647 return false;
1648 };
1649 !ids_set.contains(&ObjectId::Item(dependent_id))
1652 })
1653 .flat_map(|dependant_id| {
1654 humanizer.humanize_id(dependant_id)
1658 })
1659 .collect_vec();
1660 if !dependants.is_empty() {
1661 dropped_in_use_indexes.push(DroppedInUseIndex {
1662 index_name: humanizer
1663 .humanize_id(index.global_id())
1664 .unwrap_or_else(|| id.to_string()),
1665 dependant_objects: dependants,
1666 });
1667 }
1668 }
1669 }
1670 _ => {}
1671 }
1672 }
1673
1674 for id in &ids {
1675 match id {
1676 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1680 if !clusters_to_drop.contains(cluster_id) {
1681 let cluster = self.catalog.get_cluster(*cluster_id);
1682 if cluster.is_managed() {
1683 let replica =
1684 cluster.replica(*replica_id).expect("Catalog out of sync");
1685 if !replica.config.location.internal() {
1686 coord_bail!("cannot drop replica of managed cluster");
1687 }
1688 }
1689 }
1690 }
1691 _ => {}
1692 }
1693 }
1694
1695 for role_id in dropped_roles.keys() {
1696 self.catalog().ensure_not_reserved_role(role_id)?;
1697 }
1698 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1699 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1701 for role in self.catalog().user_roles() {
1702 for dropped_role_id in
1703 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1704 {
1705 role_revokes.insert((
1706 **dropped_role_id,
1707 role.id(),
1708 *role
1709 .membership
1710 .map
1711 .get(*dropped_role_id)
1712 .expect("included in keys above"),
1713 ));
1714 }
1715 }
1716
1717 for (default_privilege_object, default_privilege_acls) in
1718 self.catalog().default_privileges()
1719 {
1720 if matches!(
1721 &default_privilege_object.database_id,
1722 Some(database_id) if dropped_databases.contains(database_id),
1723 ) || matches!(
1724 &default_privilege_object.schema_id,
1725 Some(schema_id) if dropped_schemas.contains(schema_id),
1726 ) {
1727 for default_privilege_acl in default_privilege_acls {
1728 default_privilege_revokes.insert((
1729 default_privilege_object.clone(),
1730 default_privilege_acl.clone(),
1731 ));
1732 }
1733 }
1734 }
1735
1736 let ops = role_revokes
1737 .into_iter()
1738 .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1739 role_id,
1740 member_id,
1741 grantor_id,
1742 })
1743 .chain(default_privilege_revokes.into_iter().map(
1744 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1745 privilege_object,
1746 privilege_acl_item,
1747 variant: UpdatePrivilegeVariant::Revoke,
1748 },
1749 ))
1750 .chain(iter::once(catalog::Op::DropObjects(
1751 ids.into_iter()
1752 .map(DropObjectInfo::manual_drop_from_object_id)
1753 .collect(),
1754 )))
1755 .collect();
1756
1757 Ok(DropOps {
1758 ops,
1759 dropped_active_db,
1760 dropped_active_cluster,
1761 dropped_in_use_indexes,
1762 })
1763 }
1764
1765 pub(super) fn sequence_explain_schema(
1766 &self,
1767 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1768 ) -> Result<ExecuteResponse, AdapterError> {
1769 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1770 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1771 })?;
1772
1773 let json_string = json_string(&json_value);
1774 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1775 Ok(Self::send_immediate_rows(row))
1776 }
1777
1778 pub(super) fn sequence_show_all_variables(
1779 &self,
1780 session: &Session,
1781 ) -> Result<ExecuteResponse, AdapterError> {
1782 let mut rows = viewable_variables(self.catalog().state(), session)
1783 .map(|v| (v.name(), v.value(), v.description()))
1784 .collect::<Vec<_>>();
1785 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
1786
1787 let rows: Vec<_> = rows
1789 .into_iter()
1790 .map(|(name, val, desc)| {
1791 Row::pack_slice(&[
1792 Datum::String(name),
1793 Datum::String(&val),
1794 Datum::String(desc),
1795 ])
1796 })
1797 .collect();
1798 Ok(Self::send_immediate_rows(rows))
1799 }
1800
1801 pub(super) fn sequence_show_variable(
1802 &self,
1803 session: &Session,
1804 plan: plan::ShowVariablePlan,
1805 ) -> Result<ExecuteResponse, AdapterError> {
1806 if &plan.name == SCHEMA_ALIAS {
1807 let schemas = self.catalog.resolve_search_path(session);
1808 let schema = schemas.first();
1809 return match schema {
1810 Some((database_spec, schema_spec)) => {
1811 let schema_name = &self
1812 .catalog
1813 .get_schema(database_spec, schema_spec, session.conn_id())
1814 .name()
1815 .schema;
1816 let row = Row::pack_slice(&[Datum::String(schema_name)]);
1817 Ok(Self::send_immediate_rows(row))
1818 }
1819 None => {
1820 if session.vars().current_object_missing_warnings() {
1821 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
1822 search_path: session
1823 .vars()
1824 .search_path()
1825 .into_iter()
1826 .map(|schema| schema.to_string())
1827 .collect(),
1828 });
1829 }
1830 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
1831 }
1832 };
1833 }
1834
1835 let variable = session
1836 .vars()
1837 .get(self.catalog().system_config(), &plan.name)
1838 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
1839
1840 variable.visible(session.user(), self.catalog().system_config())?;
1843
1844 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
1845 if variable.name() == vars::DATABASE.name()
1846 && matches!(
1847 self.catalog().resolve_database(&variable.value()),
1848 Err(CatalogError::UnknownDatabase(_))
1849 )
1850 && session.vars().current_object_missing_warnings()
1851 {
1852 let name = variable.value();
1853 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1854 } else if variable.name() == vars::CLUSTER.name()
1855 && matches!(
1856 self.catalog().resolve_cluster(&variable.value()),
1857 Err(CatalogError::UnknownCluster(_))
1858 )
1859 && session.vars().current_object_missing_warnings()
1860 {
1861 let name = variable.value();
1862 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1863 }
1864 Ok(Self::send_immediate_rows(row))
1865 }
1866
1867 #[instrument]
1868 pub(super) async fn sequence_inspect_shard(
1869 &self,
1870 session: &Session,
1871 plan: plan::InspectShardPlan,
1872 ) -> Result<ExecuteResponse, AdapterError> {
1873 if !session.user().is_internal() {
1876 return Err(AdapterError::Unauthorized(
1877 rbac::UnauthorizedError::MzSystem {
1878 action: "inspect".into(),
1879 },
1880 ));
1881 }
1882 let state = self
1883 .controller
1884 .storage
1885 .inspect_persist_state(plan.id)
1886 .await?;
1887 let jsonb = Jsonb::from_serde_json(state)?;
1888 Ok(Self::send_immediate_rows(jsonb.into_row()))
1889 }
1890
1891 #[instrument]
1892 pub(super) fn sequence_set_variable(
1893 &self,
1894 session: &mut Session,
1895 plan: plan::SetVariablePlan,
1896 ) -> Result<ExecuteResponse, AdapterError> {
1897 let (name, local) = (plan.name, plan.local);
1898 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1899 self.validate_set_isolation_level(session)?;
1900 }
1901 if &name == vars::CLUSTER.name() {
1902 self.validate_set_cluster(session)?;
1903 }
1904
1905 let vars = session.vars_mut();
1906 let values = match plan.value {
1907 plan::VariableValue::Default => None,
1908 plan::VariableValue::Values(values) => Some(values),
1909 };
1910
1911 match values {
1912 Some(values) => {
1913 vars.set(
1914 self.catalog().system_config(),
1915 &name,
1916 VarInput::SqlSet(&values),
1917 local,
1918 )?;
1919
1920 let vars = session.vars();
1921
1922 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
1925 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
1926 } else if name == vars::CLUSTER.name()
1927 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
1928 {
1929 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
1930 }
1931
1932 if name.as_str() == vars::DATABASE.name()
1934 && matches!(
1935 self.catalog().resolve_database(vars.database()),
1936 Err(CatalogError::UnknownDatabase(_))
1937 )
1938 && session.vars().current_object_missing_warnings()
1939 {
1940 let name = vars.database().to_string();
1941 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
1942 } else if name.as_str() == vars::CLUSTER.name()
1943 && matches!(
1944 self.catalog().resolve_cluster(vars.cluster()),
1945 Err(CatalogError::UnknownCluster(_))
1946 )
1947 && session.vars().current_object_missing_warnings()
1948 {
1949 let name = vars.cluster().to_string();
1950 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
1951 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
1952 let v = values.into_first().to_lowercase();
1953 if v == IsolationLevel::ReadUncommitted.as_variant_str()
1954 || v == IsolationLevel::ReadCommitted.as_variant_str()
1955 || v == IsolationLevel::RepeatableRead.as_variant_str()
1956 {
1957 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
1958 isolation_level: v,
1959 });
1960 } else if v == IsolationLevel::StrongSessionSerializable.as_variant_str() {
1961 session.add_notice(AdapterNotice::StrongSessionSerializable);
1962 }
1963 }
1964
1965 if (name.as_str() == TRANSACTION_ISOLATION_VAR_NAME
1969 || name.as_str() == vars::REAL_TIME_RECENCY.name())
1970 && session
1971 .vars()
1972 .transaction_isolation()
1973 .is_bounded_staleness()
1974 && session.vars().real_time_recency()
1975 {
1976 return Err(AdapterError::BoundedStalenessRealTimeRecencyConflict);
1977 }
1978 }
1979 None => vars.reset(self.catalog().system_config(), &name, local)?,
1980 }
1981
1982 Ok(ExecuteResponse::SetVariable { name, reset: false })
1983 }
1984
1985 pub(super) fn sequence_reset_variable(
1986 &self,
1987 session: &mut Session,
1988 plan: plan::ResetVariablePlan,
1989 ) -> Result<ExecuteResponse, AdapterError> {
1990 let name = plan.name;
1991 if &name == TRANSACTION_ISOLATION_VAR_NAME {
1992 self.validate_set_isolation_level(session)?;
1993 }
1994 if &name == vars::CLUSTER.name() {
1995 self.validate_set_cluster(session)?;
1996 }
1997 session
1998 .vars_mut()
1999 .reset(self.catalog().system_config(), &name, false)?;
2000 Ok(ExecuteResponse::SetVariable { name, reset: true })
2001 }
2002
2003 pub(super) fn sequence_set_transaction(
2004 &self,
2005 session: &mut Session,
2006 plan: plan::SetTransactionPlan,
2007 ) -> Result<ExecuteResponse, AdapterError> {
2008 for mode in plan.modes {
2010 match mode {
2011 TransactionMode::AccessMode(_) => {
2012 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
2013 }
2014 TransactionMode::IsolationLevel(isolation_level) => {
2015 self.validate_set_isolation_level(session)?;
2016
2017 session.vars_mut().set(
2018 self.catalog().system_config(),
2019 TRANSACTION_ISOLATION_VAR_NAME,
2020 VarInput::Flat(&isolation_level.to_ast_string_stable()),
2021 plan.local,
2022 )?
2023 }
2024 }
2025 }
2026 Ok(ExecuteResponse::SetVariable {
2027 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2028 reset: false,
2029 })
2030 }
2031
2032 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2033 if session.transaction().contains_ops() {
2034 Err(AdapterError::InvalidSetIsolationLevel)
2035 } else {
2036 Ok(())
2037 }
2038 }
2039
2040 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2041 if session.transaction().contains_ops() {
2042 Err(AdapterError::InvalidSetCluster)
2043 } else {
2044 Ok(())
2045 }
2046 }
2047
2048 #[instrument]
2049 pub(super) async fn sequence_end_transaction(
2050 &mut self,
2051 mut ctx: ExecuteContext,
2052 mut action: EndTransactionAction,
2053 ) {
2054 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2056 (&action, ctx.session().transaction())
2057 {
2058 action = EndTransactionAction::Rollback;
2059 }
2060 let response = match action {
2061 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2062 params: BTreeMap::new(),
2063 }),
2064 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2065 params: BTreeMap::new(),
2066 }),
2067 };
2068
2069 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2070
2071 let (response, action) = match result {
2072 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2073 (response, action)
2074 }
2075 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2076 let validated_locks = match write_lock_guards {
2080 None => None,
2081 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2082 Ok(locks) => Some(locks),
2083 Err(missing) => {
2084 tracing::error!(?missing, "programming error, missing write locks");
2085 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2086 }
2087 },
2088 };
2089
2090 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2091 for WriteOp { id, rows } in writes {
2092 let total_rows = collected_writes.entry(id).or_default();
2093 total_rows.push(rows);
2094 }
2095
2096 self.submit_write(PendingWriteTxn::User {
2097 span: Span::current(),
2098 writes: collected_writes,
2099 write_locks: validated_locks,
2100 responder: UserWriteResponder::Session(PendingTxn {
2101 ctx,
2102 response,
2103 action,
2104 }),
2105 });
2106 return;
2107 }
2108 Ok((
2109 Some(TransactionOps::Peeks {
2110 determination,
2111 requires_linearization: RequireLinearization::Required,
2112 ..
2113 }),
2114 _,
2115 )) if ctx.session().vars().transaction_isolation()
2116 == &IsolationLevel::StrictSerializable =>
2117 {
2118 let conn_id = ctx.session().conn_id().clone();
2119 let pending_read_txn = PendingReadTxn {
2120 txn: PendingRead::Read {
2121 txn: PendingTxn {
2122 ctx,
2123 response,
2124 action,
2125 },
2126 },
2127 timestamp_context: determination.timestamp_context,
2128 created: Instant::now(),
2129 num_requeues: 0,
2130 otel_ctx: OpenTelemetryContext::obtain(),
2131 };
2132 self.strict_serializable_reads_tx
2133 .send((conn_id, pending_read_txn))
2134 .expect("sending to strict_serializable_reads_tx cannot fail");
2135 return;
2136 }
2137 Ok((
2138 Some(TransactionOps::Peeks {
2139 determination,
2140 requires_linearization: RequireLinearization::Required,
2141 ..
2142 }),
2143 _,
2144 )) if ctx.session().vars().transaction_isolation()
2145 == &IsolationLevel::StrongSessionSerializable =>
2146 {
2147 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2148 ctx.session_mut()
2149 .ensure_timestamp_oracle(timeline.clone())
2150 .apply_write(*ts);
2151 }
2152 (response, action)
2153 }
2154 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2155 self.internal_cmd_tx
2156 .send(Message::ExecuteSingleStatementTransaction {
2157 ctx,
2158 otel_ctx: OpenTelemetryContext::obtain(),
2159 stmt,
2160 params,
2161 })
2162 .expect("must send");
2163 return;
2164 }
2165 Ok((_, _)) => (response, action),
2166 Err(err) => (Err(err), EndTransactionAction::Rollback),
2167 };
2168 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2169 let response = response.map(|mut r| {
2171 r.extend_params(changed);
2172 ExecuteResponse::from(r)
2173 });
2174
2175 ctx.retire(response);
2176 }
2177
2178 #[instrument]
2179 async fn sequence_end_transaction_inner(
2180 &mut self,
2181 ctx: &mut ExecuteContext,
2182 action: EndTransactionAction,
2183 ) -> Result<(Option<TransactionOps>, Option<WriteLocks>), AdapterError> {
2184 let txn = self.clear_transaction(ctx.session_mut()).await;
2185
2186 if let EndTransactionAction::Commit = action {
2187 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2188 match &mut ops {
2189 TransactionOps::Writes(writes) => {
2190 for WriteOp { id, .. } in &mut writes.iter() {
2191 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2193 AdapterError::Catalog(mz_catalog::memory::error::Error {
2194 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2195 })
2196 })?;
2197 }
2198
2199 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2201 }
2202 TransactionOps::DDL {
2203 ops,
2204 state: _,
2205 side_effects,
2206 revision,
2207 snapshot: _,
2208 } => {
2209 if *revision != self.catalog().transient_revision() {
2211 return Err(AdapterError::DDLTransactionRace);
2212 }
2213 let ops = std::mem::take(ops);
2215 let side_effects = std::mem::take(side_effects);
2216 self.catalog_transact_with_side_effects(
2217 Some(ctx),
2218 ops,
2219 move |a, mut ctx| {
2220 Box::pin(async move {
2221 for side_effect in side_effects {
2222 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2223 }
2224 })
2225 },
2226 )
2227 .await?;
2228 }
2229 _ => (),
2230 }
2231 return Ok((Some(ops), write_lock_guards));
2232 }
2233 }
2234
2235 Ok((None, None))
2236 }
2237
2238 pub(super) async fn sequence_side_effecting_func(
2239 &mut self,
2240 ctx: ExecuteContext,
2241 plan: SideEffectingFunc,
2242 ) {
2243 match plan {
2244 SideEffectingFunc::PgCancelBackend { connection_id } => {
2245 let Some(connection_id) = connection_id else {
2246 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[
2249 Datum::Null,
2250 ]))));
2251 return;
2252 };
2253
2254 if ctx.session().conn_id().unhandled() == connection_id {
2255 ctx.retire(Err(AdapterError::Canceled));
2259 return;
2260 }
2261
2262 let res = if let Some((id_handle, _conn_meta)) =
2263 self.active_conns.get_key_value(&connection_id)
2264 {
2265 self.handle_privileged_cancel(id_handle.clone()).await;
2267 Datum::True
2268 } else {
2269 Datum::False
2270 };
2271 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2272 }
2273 }
2274 }
2275
2276 pub(crate) async fn execute_side_effecting_func(
2285 &mut self,
2286 plan: SideEffectingFunc,
2287 conn_id: ConnectionId,
2288 current_role: RoleId,
2289 ) -> Result<ExecuteResponse, AdapterError> {
2290 match plan {
2291 SideEffectingFunc::PgCancelBackend { connection_id } => {
2292 let Some(connection_id) = connection_id else {
2293 return Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])));
2296 };
2297
2298 if conn_id.unhandled() == connection_id {
2299 return Err(AdapterError::Canceled);
2303 }
2304
2305 if let Some((_id_handle, conn_meta)) =
2308 self.active_conns.get_key_value(&connection_id)
2309 {
2310 let target_role = *conn_meta.authenticated_role_id();
2311 let role_membership = self
2312 .catalog()
2313 .state()
2314 .collect_role_membership(¤t_role);
2315 if !role_membership.contains(&target_role) {
2316 let target_role_name = self
2317 .catalog()
2318 .try_get_role(&target_role)
2319 .map(|role| role.name().to_string())
2320 .unwrap_or_else(|| target_role.to_string());
2321 return Err(AdapterError::Unauthorized(
2322 rbac::UnauthorizedError::RoleMembership {
2323 role_names: vec![target_role_name],
2324 },
2325 ));
2326 }
2327
2328 let id_handle = self
2330 .active_conns
2331 .get_key_value(&connection_id)
2332 .map(|(id, _)| id.clone())
2333 .expect("checked above");
2334 self.handle_privileged_cancel(id_handle).await;
2335 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
2336 } else {
2337 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
2339 }
2340 }
2341 }
2342 }
2343
2344 pub(crate) async fn determine_real_time_recent_timestamp(
2348 &self,
2349 source_ids: impl Iterator<Item = GlobalId>,
2350 real_time_recency_timeout: Duration,
2351 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2352 let item_ids = source_ids
2353 .map(|gid| {
2354 self.catalog
2355 .try_resolve_item_id(&gid)
2356 .ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2357 })
2358 .collect::<Result<Vec<_>, _>>()?;
2359
2360 let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
2366 if to_visit.is_empty() {
2369 return Ok(None);
2370 }
2371
2372 let mut timestamp_objects = BTreeSet::new();
2373
2374 while let Some(id) = to_visit.pop_front() {
2375 timestamp_objects.insert(id);
2376 to_visit.extend(
2377 self.catalog()
2378 .get_entry(&id)
2379 .uses()
2380 .into_iter()
2381 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2382 );
2383 }
2384 let timestamp_objects = timestamp_objects
2385 .into_iter()
2386 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2387 .collect();
2388
2389 let r = self
2390 .controller
2391 .determine_real_time_recent_timestamp(timestamp_objects, real_time_recency_timeout)
2392 .await?;
2393
2394 Ok(Some(r))
2395 }
2396
2397 pub(crate) async fn await_real_time_recent_timestamp<F>(
2398 catalog: Arc<Catalog>,
2399 fut: F,
2400 ) -> Result<Timestamp, AdapterError>
2401 where
2402 F: Future<Output = Result<Timestamp, StorageError>>,
2403 {
2404 fut.await
2405 .map_err(|error| Self::real_time_recent_timestamp_error(&catalog, error))
2406 }
2407
2408 fn real_time_recent_timestamp_error(catalog: &Catalog, error: StorageError) -> AdapterError {
2409 let rtr_name = |id: &GlobalId| {
2410 catalog
2411 .try_get_entry_by_global_id(id)
2412 .map(|e| e.name().item.clone())
2413 .unwrap_or_else(|| id.to_string())
2414 };
2415
2416 match error {
2417 StorageError::RtrTimeout(id) => AdapterError::RtrTimeout(rtr_name(&id)),
2418 StorageError::RtrDropFailure(id) => AdapterError::RtrDropFailure(rtr_name(&id)),
2419 error => error.into(),
2420 }
2421 }
2422
2423 pub(crate) async fn determine_real_time_recent_timestamp_if_needed(
2426 &self,
2427 session: &Session,
2428 source_ids: impl Iterator<Item = GlobalId>,
2429 ) -> Result<Option<RtrTimestampFuture>, AdapterError> {
2430 let vars = session.vars();
2431
2432 if vars.real_time_recency()
2433 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2434 && !session.contains_read_timestamp()
2435 {
2436 self.determine_real_time_recent_timestamp(source_ids, *vars.real_time_recency_timeout())
2437 .await
2438 } else {
2439 Ok(None)
2440 }
2441 }
2442
2443 #[instrument]
2444 pub(super) async fn sequence_explain_plan(
2445 &mut self,
2446 ctx: ExecuteContext,
2447 plan: plan::ExplainPlanPlan,
2448 target_cluster: TargetCluster,
2449 ) {
2450 match &plan.explainee {
2451 plan::Explainee::Statement(stmt) => match stmt {
2452 plan::ExplaineeStatement::CreateView { .. } => {
2453 self.explain_create_view(ctx, plan).await;
2454 }
2455 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2456 self.explain_create_materialized_view(ctx, plan).await;
2457 }
2458 plan::ExplaineeStatement::CreateIndex { .. } => {
2459 self.explain_create_index(ctx, plan).await;
2460 }
2461 plan::ExplaineeStatement::Select { .. } => {
2462 self.explain_peek(ctx, plan, target_cluster).await;
2463 }
2464 plan::ExplaineeStatement::Subscribe { .. } => {
2465 self.explain_subscribe(ctx, plan, target_cluster).await;
2466 }
2467 },
2468 plan::Explainee::View(_) => {
2469 let result = self.explain_view(&ctx, plan);
2470 ctx.retire(result);
2471 }
2472 plan::Explainee::MaterializedView(_) => {
2473 let result = self.explain_materialized_view(&ctx, plan);
2474 ctx.retire(result);
2475 }
2476 plan::Explainee::Index(_) => {
2477 let result = self.explain_index(&ctx, plan);
2478 ctx.retire(result);
2479 }
2480 plan::Explainee::ReplanView(_) => {
2481 self.explain_replan_view(ctx, plan).await;
2482 }
2483 plan::Explainee::ReplanMaterializedView(_) => {
2484 self.explain_replan_materialized_view(ctx, plan).await;
2485 }
2486 plan::Explainee::ReplanIndex(_) => {
2487 self.explain_replan_index(ctx, plan).await;
2488 }
2489 };
2490 }
2491
2492 pub(super) async fn sequence_explain_pushdown(
2493 &mut self,
2494 ctx: ExecuteContext,
2495 plan: plan::ExplainPushdownPlan,
2496 target_cluster: TargetCluster,
2497 ) {
2498 match plan.explainee {
2499 Explainee::Statement(ExplaineeStatement::Select {
2500 broken: false,
2501 plan,
2502 desc: _,
2503 }) => {
2504 let stage = return_if_err!(
2505 self.peek_validate(
2506 ctx.session(),
2507 plan,
2508 target_cluster,
2509 None,
2510 ExplainContext::Pushdown,
2511 Some(ctx.session().vars().max_query_result_size()),
2512 ),
2513 ctx
2514 );
2515 self.sequence_staged(ctx, Span::current(), stage).await;
2516 }
2517 Explainee::MaterializedView(item_id) => {
2518 self.explain_pushdown_materialized_view(ctx, item_id).await;
2519 }
2520 _ => {
2521 ctx.retire(Err(AdapterError::Unsupported(
2522 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2523 )));
2524 }
2525 };
2526 }
2527
2528 async fn execute_explain_pushdown_with_read_holds(
2530 &self,
2531 ctx: ExecuteContext,
2532 as_of: Antichain<Timestamp>,
2533 mz_now: ResultSpec<'static>,
2534 read_holds: Option<ReadHolds>,
2535 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2536 ) {
2537 let fut = self
2538 .explain_pushdown_future(ctx.session(), as_of, mz_now, imports)
2539 .await;
2540 task::spawn(|| "render explain pushdown", async move {
2541 let _read_holds = read_holds;
2543 let res = fut.await;
2544 ctx.retire(res);
2545 });
2546 }
2547
2548 async fn explain_pushdown_future<I: IntoIterator<Item = (GlobalId, MapFilterProject)>>(
2550 &self,
2551 session: &Session,
2552 as_of: Antichain<Timestamp>,
2553 mz_now: ResultSpec<'static>,
2554 imports: I,
2555 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2556 super::explain_pushdown_future_inner(
2558 session,
2559 &self.catalog,
2560 &self.controller.storage_collections,
2561 as_of,
2562 mz_now,
2563 imports,
2564 )
2565 .await
2566 }
2567
2568 #[instrument]
2569 pub(super) async fn sequence_insert(
2570 &mut self,
2571 mut ctx: ExecuteContext,
2572 plan: plan::InsertPlan,
2573 ) {
2574 if !ctx.session_mut().transaction().allows_writes() {
2582 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2583 return;
2584 }
2585 if ctx
2586 .session()
2587 .vars()
2588 .transaction_isolation()
2589 .is_bounded_staleness()
2590 {
2591 ctx.retire(Err(AdapterError::BoundedStalenessReadOnly));
2592 return;
2593 }
2594
2595 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2609 let expr = return_if_err!(
2612 plan.values
2613 .clone()
2614 .lower(self.catalog().system_config(), None),
2615 ctx
2616 );
2617 OptimizedMirRelationExpr(expr)
2618 } else {
2619 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2621
2622 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2624
2625 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2627 };
2628
2629 match optimized_mir.into_inner() {
2630 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2631 let catalog = self.owned_catalog();
2632 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2633 let result =
2634 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2635 ctx.retire(result);
2636 });
2637 }
2638 _ => {
2640 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2641 Some(table) => {
2642 let desc = table.relation_desc_latest().expect("table has a desc");
2644 desc.arity()
2645 }
2646 None => {
2647 ctx.retire(Err(AdapterError::Catalog(
2648 mz_catalog::memory::error::Error {
2649 kind: ErrorKind::Sql(CatalogError::UnknownItem(
2650 plan.id.to_string(),
2651 )),
2652 },
2653 )));
2654 return;
2655 }
2656 };
2657
2658 let finishing = RowSetFinishing {
2659 order_by: vec![],
2660 limit: None,
2661 offset: 0,
2662 project: (0..desc_arity).collect(),
2663 };
2664
2665 let read_then_write_plan = plan::ReadThenWritePlan {
2666 id: plan.id,
2667 selection: plan.values,
2668 finishing,
2669 assignments: BTreeMap::new(),
2670 kind: MutationKind::Insert,
2671 returning: plan.returning,
2672 };
2673
2674 self.sequence_read_then_write(ctx, read_then_write_plan)
2675 .await;
2676 }
2677 }
2678 }
2679
2680 #[instrument]
2685 pub(super) async fn sequence_read_then_write(
2686 &mut self,
2687 mut ctx: ExecuteContext,
2688 plan: plan::ReadThenWritePlan,
2689 ) {
2690 if ctx
2691 .session()
2692 .vars()
2693 .transaction_isolation()
2694 .is_bounded_staleness()
2695 {
2696 ctx.retire(Err(AdapterError::BoundedStalenessReadOnly));
2697 return;
2698 }
2699
2700 let mut source_ids: BTreeSet<_> = plan
2701 .selection
2702 .depends_on()
2703 .into_iter()
2704 .map(|gid| self.catalog().resolve_item_id(&gid))
2705 .collect();
2706 source_ids.insert(plan.id);
2707
2708 if ctx.session().transaction().write_locks().is_none() {
2710 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2712
2713 for id in &source_ids {
2715 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2716 write_locks.insert_lock(*id, lock);
2717 }
2718 }
2719
2720 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2722 Ok(locks) => locks,
2723 Err(missing) => {
2724 let role_metadata = ctx.session().role_metadata().clone();
2726 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2727 let plan = DeferredPlan {
2728 ctx,
2729 plan: Plan::ReadThenWrite(plan),
2730 validity: PlanValidity::new(
2731 self.catalog.transient_revision(),
2732 source_ids.clone(),
2733 None,
2734 None,
2735 role_metadata,
2736 ),
2737 requires_locks: source_ids,
2738 resolved_ids: ResolvedIds::empty(),
2740 sql_impl_resolved_ids: ResolvedIds::empty(),
2741 };
2742 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2743 }
2744 };
2745
2746 ctx.session_mut()
2747 .try_grant_write_locks(write_locks)
2748 .expect("session has already been granted write locks");
2749 }
2750
2751 let plan::ReadThenWritePlan {
2752 id,
2753 kind,
2754 selection,
2755 mut assignments,
2756 finishing,
2757 mut returning,
2758 } = plan;
2759
2760 let desc = match self.catalog().try_get_entry(&id) {
2762 Some(table) => {
2763 table
2765 .relation_desc_latest()
2766 .expect("table has a desc")
2767 .into_owned()
2768 }
2769 None => {
2770 ctx.retire(Err(AdapterError::Catalog(
2771 mz_catalog::memory::error::Error {
2772 kind: ErrorKind::Sql(CatalogError::UnknownItem(id.to_string())),
2773 },
2774 )));
2775 return;
2776 }
2777 };
2778
2779 let contains_temporal = selection.contains_temporal()
2781 || assignments.values().any(|e| e.contains_temporal())
2782 || returning.iter().any(|e| e.contains_temporal());
2783 if contains_temporal {
2784 ctx.retire(Err(AdapterError::Unsupported(
2785 "calls to mz_now in write statements",
2786 )));
2787 return;
2788 }
2789
2790 for gid in selection.depends_on() {
2792 let item_id = self.catalog().resolve_item_id(&gid);
2793 if let Err(err) = validate_read_then_write_dependencies(self.catalog(), &item_id) {
2794 ctx.retire(Err(err));
2795 return;
2796 }
2797 }
2798
2799 let (peek_tx, peek_rx) = oneshot::channel();
2800 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
2801 let (tx, _, session, extra) = ctx.into_parts();
2802 let peek_ctx = ExecuteContext::from_parts(
2814 peek_client_tx,
2815 self.internal_cmd_tx.clone(),
2816 session,
2817 Default::default(),
2818 );
2819
2820 self.sequence_peek(
2821 peek_ctx,
2822 plan::SelectPlan {
2823 select: None,
2824 source: selection,
2825 when: QueryWhen::FreshestTableWrite,
2826 finishing,
2827 copy_to: None,
2828 },
2829 TargetCluster::Active,
2830 None,
2831 )
2832 .await;
2833
2834 let internal_cmd_tx = self.internal_cmd_tx.clone();
2835 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
2836 let catalog = self.owned_catalog();
2837 let max_result_size = self.catalog().system_config().max_result_size();
2838
2839 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
2840 let (peek_response, session) = match peek_rx.await {
2841 Ok(Response {
2842 result: Ok(resp),
2843 session,
2844 otel_ctx,
2845 }) => {
2846 otel_ctx.attach_as_parent();
2847 (resp, session)
2848 }
2849 Ok(Response {
2850 result: Err(e),
2851 session,
2852 otel_ctx,
2853 }) => {
2854 let ctx =
2855 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2856 otel_ctx.attach_as_parent();
2857 ctx.retire(Err(e));
2858 return;
2859 }
2860 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
2862 };
2863 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
2864 let mut timeout_dur = *ctx.session().vars().statement_timeout();
2865
2866 if timeout_dur == Duration::ZERO {
2868 timeout_dur = Duration::MAX;
2869 }
2870
2871 let style = ExprPrepOneShot {
2872 logical_time: EvalTime::NotAvailable, session: ctx.session(),
2874 catalog_state: catalog.state(),
2875 };
2876 for expr in assignments.values_mut().chain(returning.iter_mut()) {
2877 return_if_err!(style.prep_scalar_expr(expr), ctx);
2878 }
2879
2880 let make_diffs = move |mut rows: Box<dyn RowIterator>|
2881 -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
2882 let arena = RowArena::new();
2883 let mut diffs = Vec::new();
2884 let mut datum_vec = mz_repr::DatumVec::new();
2885
2886 while let Some(row) = rows.next() {
2887 if !assignments.is_empty() {
2888 assert!(
2889 matches!(kind, MutationKind::Update),
2890 "only updates support assignments"
2891 );
2892 let mut datums = datum_vec.borrow_with(row);
2893 let mut updates = vec![];
2894 for (idx, expr) in &assignments {
2895 let updated = match expr.eval(&datums, &arena) {
2896 Ok(updated) => updated,
2897 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
2898 };
2899 updates.push((*idx, updated));
2900 }
2901 for (idx, new_value) in updates {
2902 datums[idx] = new_value;
2903 }
2904 let updated = Row::pack_slice(&datums);
2905 diffs.push((updated, Diff::ONE));
2906 }
2907 match kind {
2908 MutationKind::Update | MutationKind::Delete => {
2912 diffs.push((row.to_owned(), Diff::MINUS_ONE))
2913 }
2914 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
2915 }
2916 }
2917
2918 let mut byte_size: u64 = 0;
2921 for (row, diff) in &diffs {
2922 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
2923 if diff.is_positive() {
2924 for (idx, datum) in row.iter().enumerate() {
2925 desc.constraints_met(idx, &datum)?;
2926 }
2927 }
2928 }
2929 Ok((diffs, byte_size))
2930 };
2931
2932 let diffs = match peek_response {
2933 ExecuteResponse::SendingRowsStreaming {
2934 rows: mut rows_stream,
2935 ..
2936 } => {
2937 let mut byte_size: u64 = 0;
2938 let mut diffs = Vec::new();
2939 let result = loop {
2940 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
2941 Ok(Some(res)) => match res {
2942 PeekResponseUnary::Rows(new_rows) => {
2943 match make_diffs(new_rows) {
2944 Ok((mut new_diffs, new_byte_size)) => {
2945 byte_size = byte_size.saturating_add(new_byte_size);
2946 if byte_size > max_result_size {
2947 break Err(AdapterError::ResultSize(format!(
2948 "result exceeds max size of {max_result_size}"
2949 )));
2950 }
2951 diffs.append(&mut new_diffs)
2952 }
2953 Err(e) => break Err(e),
2954 };
2955 }
2956 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
2957 PeekResponseUnary::Error(e) => {
2958 break Err(AdapterError::Unstructured(anyhow!(e)));
2959 }
2960 PeekResponseUnary::DependencyDropped(dep) => {
2961 break Err(dep.to_concurrent_dependency_drop());
2962 }
2963 },
2964 Ok(None) => break Ok(diffs),
2965 Err(_) => {
2966 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
2971 conn_id: ctx.session().conn_id().clone(),
2972 });
2973 if let Err(e) = result {
2974 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
2975 }
2976 break Err(AdapterError::StatementTimeout);
2977 }
2978 }
2979 };
2980
2981 result
2982 }
2983 ExecuteResponse::SendingRowsImmediate { rows } => {
2984 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
2985 }
2986 resp => Err(AdapterError::Unstructured(anyhow!(
2987 "unexpected peek response: {resp:?}"
2988 ))),
2989 };
2990
2991 let mut returning_rows = Vec::new();
2992 let mut diff_err: Option<AdapterError> = None;
2993 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
2994 let arena = RowArena::new();
2995 for (row, diff) in diffs {
2996 if !diff.is_positive() {
2997 continue;
2998 }
2999 let mut returning_row = Row::with_capacity(returning.len());
3000 let mut packer = returning_row.packer();
3001 for expr in &returning {
3002 let datums: Vec<_> = row.iter().collect();
3003 match expr.eval(&datums, &arena) {
3004 Ok(datum) => {
3005 packer.push(datum);
3006 }
3007 Err(err) => {
3008 diff_err = Some(err.into());
3009 break;
3010 }
3011 }
3012 }
3013 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3014 let diff = match NonZeroUsize::try_from(diff) {
3015 Ok(diff) => diff,
3016 Err(err) => {
3017 diff_err = Some(err.into());
3018 break;
3019 }
3020 };
3021 returning_rows.push((returning_row, diff));
3022 if diff_err.is_some() {
3023 break;
3024 }
3025 }
3026 }
3027 let diffs = if let Some(err) = diff_err {
3028 Err(err)
3029 } else {
3030 diffs
3031 };
3032
3033 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3036 if let Some(timestamp_context) = timestamp_context {
3045 let (tx, rx) = tokio::sync::oneshot::channel();
3046 let conn_id = ctx.session().conn_id().clone();
3047 let pending_read_txn = PendingReadTxn {
3048 txn: PendingRead::ReadThenWrite { ctx, tx },
3049 timestamp_context,
3050 created: Instant::now(),
3051 num_requeues: 0,
3052 otel_ctx: OpenTelemetryContext::obtain(),
3053 };
3054 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3055 if let Err(e) = result {
3057 warn!(
3058 "strict_serializable_reads_tx dropped before we could send: {:?}",
3059 e
3060 );
3061 return;
3062 }
3063 let result = rx.await;
3064 ctx = match result {
3066 Ok(Some(ctx)) => ctx,
3067 Ok(None) => {
3068 return;
3071 }
3072 Err(e) => {
3073 warn!(
3074 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3075 e
3076 );
3077 return;
3078 }
3079 };
3080 }
3081
3082 match diffs {
3083 Ok(diffs) => {
3084 let result = Self::send_diffs(
3085 ctx.session_mut(),
3086 plan::SendDiffsPlan {
3087 id,
3088 updates: diffs,
3089 kind,
3090 returning: returning_rows,
3091 max_result_size,
3092 },
3093 );
3094 ctx.retire(result);
3095 }
3096 Err(e) => {
3097 ctx.retire(Err(e));
3098 }
3099 }
3100 });
3101 }
3102
3103 #[instrument]
3104 pub(super) async fn sequence_alter_item_rename(
3105 &mut self,
3106 ctx: &mut ExecuteContext,
3107 plan: plan::AlterItemRenamePlan,
3108 ) -> Result<ExecuteResponse, AdapterError> {
3109 let op = catalog::Op::RenameItem {
3110 id: plan.id,
3111 current_full_name: plan.current_full_name,
3112 to_name: plan.to_name,
3113 };
3114 match self
3115 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3116 .await
3117 {
3118 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3119 Err(err) => Err(err),
3120 }
3121 }
3122
3123 #[instrument]
3124 pub(super) async fn sequence_alter_retain_history(
3125 &mut self,
3126 ctx: &mut ExecuteContext,
3127 plan: plan::AlterRetainHistoryPlan,
3128 ) -> Result<ExecuteResponse, AdapterError> {
3129 let ops = vec![catalog::Op::AlterRetainHistory {
3130 id: plan.id,
3131 value: plan.value,
3132 window: plan.window,
3133 }];
3134 self.catalog_transact_with_context(None, Some(ctx), ops)
3135 .await?;
3136 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3137 }
3138
3139 #[instrument]
3140 pub(super) async fn sequence_alter_source_timestamp_interval(
3141 &mut self,
3142 ctx: &mut ExecuteContext,
3143 plan: plan::AlterSourceTimestampIntervalPlan,
3144 ) -> Result<ExecuteResponse, AdapterError> {
3145 let ops = vec![catalog::Op::AlterSourceTimestampInterval {
3146 id: plan.id,
3147 value: plan.value,
3148 interval: plan.interval,
3149 }];
3150 self.catalog_transact_with_context(None, Some(ctx), ops)
3151 .await?;
3152 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
3153 }
3154
3155 #[instrument]
3156 pub(super) async fn sequence_alter_schema_rename(
3157 &mut self,
3158 ctx: &mut ExecuteContext,
3159 plan: plan::AlterSchemaRenamePlan,
3160 ) -> Result<ExecuteResponse, AdapterError> {
3161 let (database_spec, schema_spec) = plan.cur_schema_spec;
3162 let op = catalog::Op::RenameSchema {
3163 database_spec,
3164 schema_spec,
3165 new_name: plan.new_schema_name,
3166 check_reserved_names: true,
3167 };
3168 match self
3169 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3170 .await
3171 {
3172 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3173 Err(err) => Err(err),
3174 }
3175 }
3176
3177 #[instrument]
3178 pub(super) async fn sequence_alter_schema_swap(
3179 &mut self,
3180 ctx: &mut ExecuteContext,
3181 plan: plan::AlterSchemaSwapPlan,
3182 ) -> Result<ExecuteResponse, AdapterError> {
3183 let plan::AlterSchemaSwapPlan {
3184 schema_a_spec: (schema_a_db, schema_a),
3185 schema_a_name,
3186 schema_b_spec: (schema_b_db, schema_b),
3187 schema_b_name,
3188 name_temp,
3189 } = plan;
3190
3191 let op_a = catalog::Op::RenameSchema {
3192 database_spec: schema_a_db,
3193 schema_spec: schema_a,
3194 new_name: name_temp,
3195 check_reserved_names: false,
3196 };
3197 let op_b = catalog::Op::RenameSchema {
3198 database_spec: schema_b_db,
3199 schema_spec: schema_b,
3200 new_name: schema_a_name,
3201 check_reserved_names: false,
3202 };
3203 let op_c = catalog::Op::RenameSchema {
3204 database_spec: schema_a_db,
3205 schema_spec: schema_a,
3206 new_name: schema_b_name,
3207 check_reserved_names: false,
3208 };
3209
3210 match self
3211 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3212 Box::pin(async {})
3213 })
3214 .await
3215 {
3216 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3217 Err(err) => Err(err),
3218 }
3219 }
3220
3221 #[instrument]
3222 pub(super) async fn sequence_alter_role(
3223 &mut self,
3224 session: &Session,
3225 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3226 ) -> Result<ExecuteResponse, AdapterError> {
3227 let catalog = self.catalog().for_session(session);
3228 let role = catalog.get_role(&id);
3229
3230 let mut notices = vec![];
3232
3233 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3235 let mut vars = role.vars().clone();
3236
3237 let mut nopassword = false;
3240
3241 match option {
3243 PlannedAlterRoleOption::Attributes(attrs) => {
3244 self.validate_role_attributes(&attrs.clone().into())?;
3245
3246 if let Some(inherit) = attrs.inherit {
3247 attributes.inherit = inherit;
3248 }
3249
3250 if let Some(password) = attrs.password {
3251 attributes.password = Some(password);
3252 attributes.scram_iterations =
3253 Some(self.catalog().system_config().scram_iterations())
3254 }
3255
3256 if let Some(superuser) = attrs.superuser {
3257 attributes.superuser = Some(superuser);
3258 }
3259
3260 if let Some(login) = attrs.login {
3261 attributes.login = Some(login);
3262 }
3263
3264 if attrs.nopassword.unwrap_or(false) {
3265 nopassword = true;
3266 }
3267
3268 if let Some(notice) = self.should_emit_rbac_notice(session) {
3269 notices.push(notice);
3270 }
3271 }
3272 PlannedAlterRoleOption::Variable(variable) => {
3273 let session_var = session.vars().inspect(variable.name())?;
3275 session_var.visible(session.user(), catalog.system_vars())?;
3277
3278 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3281 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3282 } else if let PlannedRoleVariable::Set {
3283 name,
3284 value: VariableValue::Values(vals),
3285 } = &variable
3286 {
3287 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3288 notices.push(AdapterNotice::IntrospectionClusterUsage);
3289 }
3290 }
3291
3292 let var_name = match variable {
3293 PlannedRoleVariable::Set { name, value } => {
3294 match &value {
3296 VariableValue::Default => {
3297 vars.remove(&name);
3298 }
3299 VariableValue::Values(vals) => {
3300 let var = match &vals[..] {
3301 [val] => OwnedVarInput::Flat(val.clone()),
3302 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3303 };
3304 session_var.check(var.borrow())?;
3306
3307 vars.insert(name.clone(), var);
3308 }
3309 };
3310 name
3311 }
3312 PlannedRoleVariable::Reset { name } => {
3313 vars.remove(&name);
3315 name
3316 }
3317 };
3318
3319 notices.push(AdapterNotice::VarDefaultUpdated {
3321 role: Some(name.clone()),
3322 var_name: Some(var_name),
3323 });
3324 }
3325 }
3326
3327 let op = catalog::Op::AlterRole {
3328 id,
3329 name,
3330 attributes,
3331 nopassword,
3332 vars: RoleVars { map: vars },
3333 };
3334 let response = self
3335 .catalog_transact(Some(session), vec![op])
3336 .await
3337 .map(|_| ExecuteResponse::AlteredRole)?;
3338
3339 session.add_notices(notices);
3341
3342 Ok(response)
3343 }
3344
3345 #[instrument]
3346 pub(super) async fn sequence_alter_sink_prepare(
3347 &mut self,
3348 ctx: ExecuteContext,
3349 plan: plan::AlterSinkPlan,
3350 ) {
3351 let id_bundle = crate::CollectionIdBundle {
3353 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3354 compute_ids: BTreeMap::new(),
3355 };
3356 let read_hold = self.acquire_read_holds(&id_bundle);
3357
3358 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3359 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3360 return;
3361 };
3362
3363 let otel_ctx = OpenTelemetryContext::obtain();
3364 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3365
3366 let plan_validity = PlanValidity::new(
3367 self.catalog().transient_revision(),
3368 BTreeSet::from_iter([plan.item_id, from_item_id]),
3369 Some(plan.in_cluster),
3370 None,
3371 ctx.session().role_metadata().clone(),
3372 );
3373
3374 info!(
3375 "preparing alter sink for {}: frontiers={:?} export={:?}",
3376 plan.global_id,
3377 self.controller
3378 .storage_collections
3379 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3380 self.controller.storage.export(plan.global_id)
3381 );
3382
3383 self.install_storage_watch_set(
3389 ctx.session().conn_id().clone(),
3390 BTreeSet::from_iter([plan.global_id]),
3391 read_ts,
3392 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3393 ctx: Some(ctx),
3394 otel_ctx,
3395 plan,
3396 plan_validity,
3397 read_hold,
3398 }),
3399 ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
3400 }
3401
3402 #[instrument]
3403 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3404 ctx.otel_ctx.attach_as_parent();
3405
3406 let plan::AlterSinkPlan {
3407 item_id,
3408 global_id,
3409 sink: sink_plan,
3410 with_snapshot,
3411 in_cluster,
3412 } = ctx.plan.clone();
3413
3414 match ctx.plan_validity.check(self.catalog()) {
3423 Ok(()) => {}
3424 Err(err) => {
3425 ctx.retire(Err(err));
3426 return;
3427 }
3428 }
3429
3430 let entry = self.catalog().get_entry(&item_id);
3431 let CatalogItem::Sink(old_sink) = entry.item() else {
3432 panic!("invalid item kind for `AlterSinkPlan`");
3433 };
3434
3435 if sink_plan.version != old_sink.version + 1 {
3436 ctx.retire(Err(AdapterError::ChangedPlan(
3437 "sink was altered concurrently".into(),
3438 )));
3439 return;
3440 }
3441
3442 info!(
3443 "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
3444 self.controller
3445 .storage_collections
3446 .collections_frontiers(vec![global_id, sink_plan.from]),
3447 self.controller.storage.export(global_id),
3448 );
3449
3450 let write_frontier = &self
3453 .controller
3454 .storage
3455 .export(global_id)
3456 .expect("sink known to exist")
3457 .write_frontier;
3458 let as_of = ctx.read_hold.least_valid_read();
3459 assert!(
3460 write_frontier.iter().all(|t| as_of.less_than(t)),
3461 "{:?} should be strictly less than {:?}",
3462 &*as_of,
3463 &**write_frontier
3464 );
3465
3466 let create_sql = &old_sink.create_sql;
3472 let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
3473 let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
3474 unreachable!("invalid statement kind for sink");
3475 };
3476
3477 stmt.with_options
3479 .retain(|o| o.name != CreateSinkOptionName::Version);
3480 stmt.with_options.push(CreateSinkOption {
3481 name: CreateSinkOptionName::Version,
3482 value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
3483 sink_plan.version.to_string(),
3484 ))),
3485 });
3486
3487 let conn_catalog = self.catalog().for_system_session();
3488 let (mut stmt, resolved_ids) =
3489 mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");
3490
3491 let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
3493 let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
3494 stmt.from = ResolvedItemName::Item {
3495 id: from_entry.id(),
3496 qualifiers: from_entry.name.qualifiers.clone(),
3497 full_name,
3498 print_id: true,
3499 version: from_entry.version,
3500 };
3501
3502 let new_sink = Sink {
3503 create_sql: stmt.to_ast_string_stable(),
3504 global_id,
3505 from: sink_plan.from,
3506 connection: sink_plan.connection.clone(),
3507 envelope: sink_plan.envelope,
3508 version: sink_plan.version,
3509 with_snapshot,
3510 resolved_ids: resolved_ids.clone(),
3511 cluster_id: in_cluster,
3512 commit_interval: sink_plan.commit_interval,
3513 };
3514
3515 let ops = vec![catalog::Op::UpdateItem {
3516 id: item_id,
3517 name: entry.name().clone(),
3518 to_item: CatalogItem::Sink(new_sink),
3519 }];
3520
3521 match self
3522 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3523 .await
3524 {
3525 Ok(()) => {}
3526 Err(err) => {
3527 ctx.retire(Err(err));
3528 return;
3529 }
3530 }
3531
3532 let storage_sink_desc = StorageSinkDesc {
3533 from: sink_plan.from,
3534 from_desc: from_entry
3535 .relation_desc()
3536 .expect("sinks can only be built on items with descs")
3537 .into_owned(),
3538 connection: sink_plan
3539 .connection
3540 .clone()
3541 .into_inline_connection(self.catalog().state()),
3542 envelope: sink_plan.envelope,
3543 as_of,
3544 with_snapshot,
3545 version: sink_plan.version,
3546 from_storage_metadata: (),
3547 to_storage_metadata: (),
3548 commit_interval: sink_plan.commit_interval,
3549 };
3550
3551 self.controller
3552 .storage
3553 .alter_export(
3554 global_id,
3555 ExportDescription {
3556 sink: storage_sink_desc,
3557 instance_id: in_cluster,
3558 },
3559 )
3560 .await
3561 .unwrap_or_terminate("cannot fail to alter source desc");
3562
3563 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3564 }
3565
3566 #[instrument]
3567 pub(super) async fn sequence_alter_connection(
3568 &mut self,
3569 ctx: ExecuteContext,
3570 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3571 ) {
3572 match action {
3573 AlterConnectionAction::RotateKeys => {
3574 self.sequence_rotate_keys(ctx, id).await;
3575 }
3576 AlterConnectionAction::AlterOptions {
3577 set_options,
3578 drop_options,
3579 validate,
3580 } => {
3581 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3582 .await
3583 }
3584 }
3585 }
3586
3587 #[instrument]
3588 async fn sequence_alter_connection_options(
3589 &mut self,
3590 mut ctx: ExecuteContext,
3591 id: CatalogItemId,
3592 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3593 drop_options: BTreeSet<ConnectionOptionName>,
3594 validate: bool,
3595 ) {
3596 let cur_entry = self.catalog().get_entry(&id);
3597 let cur_conn = cur_entry.connection().expect("known to be connection");
3598 let connection_gid = cur_conn.global_id();
3599
3600 let inner = || -> Result<Connection, AdapterError> {
3601 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3603 .expect("invalid create sql persisted to catalog")
3604 .into_element()
3605 .ast
3606 {
3607 Statement::CreateConnection(stmt) => stmt,
3608 _ => unreachable!("proved type is source"),
3609 };
3610
3611 let catalog = self.catalog().for_system_session();
3612
3613 let (mut create_conn_stmt, resolved_ids) =
3615 mz_sql::names::resolve(&catalog, create_conn_stmt)
3616 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3617
3618 create_conn_stmt
3620 .values
3621 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3622
3623 create_conn_stmt.values.extend(
3625 set_options
3626 .into_iter()
3627 .map(|(name, value)| ConnectionOption { name, value }),
3628 );
3629
3630 let mut catalog = self.catalog().for_system_session();
3633 catalog.mark_id_unresolvable_for_replanning(id);
3634
3635 let plan = match mz_sql::plan::plan(
3637 None,
3638 &catalog,
3639 Statement::CreateConnection(create_conn_stmt),
3640 &Params::empty(),
3641 &resolved_ids,
3642 )
3643 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3644 {
3645 (Plan::CreateConnection(plan), _sql_impl_ids) => plan,
3646 (p, _) => {
3647 unreachable!("create connection plan is only valid response, got {:?}", p)
3648 }
3649 };
3650
3651 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3653 .expect("invalid create sql persisted to catalog")
3654 .into_element()
3655 .ast
3656 {
3657 Statement::CreateConnection(stmt) => stmt,
3658 _ => unreachable!("proved type is source"),
3659 };
3660
3661 let catalog = self.catalog().for_system_session();
3662
3663 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3665 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3666
3667 Ok(Connection {
3668 create_sql: plan.connection.create_sql,
3669 global_id: cur_conn.global_id,
3670 details: plan.connection.details,
3671 resolved_ids: new_deps,
3672 })
3673 };
3674
3675 let conn = match inner() {
3676 Ok(conn) => conn,
3677 Err(e) => {
3678 return ctx.retire(Err(e));
3679 }
3680 };
3681
3682 if let Err(err) = self
3685 .check_connection_secret_content_guards(&conn.details)
3686 .await
3687 {
3688 return ctx.retire(Err(err));
3689 }
3690
3691 if validate {
3692 let connection = conn
3693 .details
3694 .to_connection()
3695 .into_inline_connection(self.catalog().state());
3696
3697 let internal_cmd_tx = self.internal_cmd_tx.clone();
3698 let transient_revision = self.catalog().transient_revision();
3699 let conn_id = ctx.session().conn_id().clone();
3700 let otel_ctx = OpenTelemetryContext::obtain();
3701 let role_metadata = ctx.session().role_metadata().clone();
3702 let current_storage_parameters = self.controller.storage.config().clone();
3703
3704 task::spawn(
3705 || format!("validate_alter_connection:{conn_id}"),
3706 async move {
3707 let resolved_ids = conn.resolved_ids.clone();
3708 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3709 let result = match std::panic::AssertUnwindSafe(
3710 connection.validate(id, ¤t_storage_parameters),
3711 )
3712 .ore_catch_unwind()
3713 .await
3714 {
3715 Ok(Ok(())) => Ok(conn),
3716 Ok(Err(err)) => Err(err.into()),
3717 Err(_panic) => {
3718 tracing::error!("alter connection validation panicked");
3719 Err(AdapterError::Internal(
3720 "connection validation panicked".into(),
3721 ))
3722 }
3723 };
3724
3725 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3727 AlterConnectionValidationReady {
3728 ctx,
3729 result,
3730 connection_id: id,
3731 connection_gid,
3732 plan_validity: PlanValidity::new(
3733 transient_revision,
3734 dependency_ids.clone(),
3735 None,
3736 None,
3737 role_metadata,
3738 ),
3739 otel_ctx,
3740 resolved_ids,
3741 },
3742 ));
3743 if let Err(e) = result {
3744 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3745 }
3746 },
3747 );
3748 } else {
3749 let result = self
3750 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3751 .await;
3752 ctx.retire(result);
3753 }
3754 }
3755
3756 #[instrument]
3757 pub(crate) async fn sequence_alter_connection_stage_finish(
3758 &mut self,
3759 session: &Session,
3760 id: CatalogItemId,
3761 connection: Connection,
3762 ) -> Result<ExecuteResponse, AdapterError> {
3763 match self.catalog.get_entry(&id).item() {
3764 CatalogItem::Connection(curr_conn) => {
3765 curr_conn
3766 .details
3767 .to_connection()
3768 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3769 .map_err(StorageError::from)?;
3770 }
3771 _ => unreachable!("known to be a connection"),
3772 };
3773
3774 let ops = vec![catalog::Op::UpdateItem {
3775 id,
3776 name: self.catalog.get_entry(&id).name().clone(),
3777 to_item: CatalogItem::Connection(connection.clone()),
3778 }];
3779
3780 self.catalog_transact(Some(session), ops).await?;
3781
3782 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3789 }
3790
3791 #[instrument]
3792 pub(super) async fn sequence_alter_source(
3793 &mut self,
3794 session: &Session,
3795 plan::AlterSourcePlan {
3796 item_id,
3797 ingestion_id,
3798 action,
3799 }: plan::AlterSourcePlan,
3800 ) -> Result<ExecuteResponse, AdapterError> {
3801 let cur_entry = self.catalog().get_entry(&item_id);
3802 let cur_source = cur_entry.source().expect("known to be source");
3803
3804 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3805 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
3807 .expect("invalid create sql persisted to catalog")
3808 .into_element()
3809 .ast
3810 {
3811 Statement::CreateSource(stmt) => stmt,
3812 _ => unreachable!("proved type is source"),
3813 };
3814
3815 let catalog = coord.catalog().for_system_session();
3816
3817 mz_sql::names::resolve(&catalog, create_source_stmt)
3819 .map_err(|e| AdapterError::internal(err_cx, e))
3820 };
3821
3822 match action {
3823 plan::AlterSourceAction::AddSubsourceExports {
3824 subsources,
3825 options,
3826 } => {
3827 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
3828
3829 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
3830 text_columns: mut new_text_columns,
3831 exclude_columns: mut new_exclude_columns,
3832 ..
3833 } = options.try_into()?;
3834
3835 let (mut create_source_stmt, resolved_ids) =
3837 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
3838
3839 let catalog = self.catalog();
3841 let curr_references: BTreeSet<_> = catalog
3842 .get_entry(&item_id)
3843 .used_by()
3844 .into_iter()
3845 .filter_map(|subsource| {
3846 catalog
3847 .get_entry(subsource)
3848 .subsource_details()
3849 .map(|(_id, reference, _details)| reference)
3850 })
3851 .collect();
3852
3853 let purification_err =
3856 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
3857
3858 match &mut create_source_stmt.connection {
3862 CreateSourceConnection::Postgres {
3863 options: curr_options,
3864 ..
3865 } => {
3866 let mz_sql::plan::PgConfigOptionExtracted {
3867 mut text_columns, ..
3868 } = curr_options.clone().try_into()?;
3869
3870 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
3873
3874 text_columns.retain(|column_qualified_reference| {
3876 mz_ore::soft_assert_eq_or_log!(
3877 column_qualified_reference.0.len(),
3878 4,
3879 "all TEXT COLUMNS values must be column-qualified references"
3880 );
3881 let mut table = column_qualified_reference.clone();
3882 table.0.truncate(3);
3883 curr_references.contains(&table)
3884 });
3885
3886 new_text_columns.extend(text_columns);
3888
3889 if !new_text_columns.is_empty() {
3891 new_text_columns.sort();
3892 let new_text_columns = new_text_columns
3893 .into_iter()
3894 .map(WithOptionValue::UnresolvedItemName)
3895 .collect();
3896
3897 curr_options.push(PgConfigOption {
3898 name: PgConfigOptionName::TextColumns,
3899 value: Some(WithOptionValue::Sequence(new_text_columns)),
3900 });
3901 }
3902 }
3903 CreateSourceConnection::MySql {
3904 options: curr_options,
3905 ..
3906 } => {
3907 let mz_sql::plan::MySqlConfigOptionExtracted {
3908 mut text_columns,
3909 mut exclude_columns,
3910 ..
3911 } = curr_options.clone().try_into()?;
3912
3913 curr_options.retain(|o| {
3916 !matches!(
3917 o.name,
3918 MySqlConfigOptionName::TextColumns
3919 | MySqlConfigOptionName::ExcludeColumns
3920 )
3921 });
3922
3923 let column_referenced =
3925 |column_qualified_reference: &UnresolvedItemName| {
3926 mz_ore::soft_assert_eq_or_log!(
3927 column_qualified_reference.0.len(),
3928 3,
3929 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
3930 );
3931 let mut table = column_qualified_reference.clone();
3932 table.0.truncate(2);
3933 curr_references.contains(&table)
3934 };
3935 text_columns.retain(column_referenced);
3936 exclude_columns.retain(column_referenced);
3937
3938 new_text_columns.extend(text_columns);
3940 new_exclude_columns.extend(exclude_columns);
3941
3942 if !new_text_columns.is_empty() {
3944 new_text_columns.sort();
3945 let new_text_columns = new_text_columns
3946 .into_iter()
3947 .map(WithOptionValue::UnresolvedItemName)
3948 .collect();
3949
3950 curr_options.push(MySqlConfigOption {
3951 name: MySqlConfigOptionName::TextColumns,
3952 value: Some(WithOptionValue::Sequence(new_text_columns)),
3953 });
3954 }
3955 if !new_exclude_columns.is_empty() {
3957 new_exclude_columns.sort();
3958 let new_exclude_columns = new_exclude_columns
3959 .into_iter()
3960 .map(WithOptionValue::UnresolvedItemName)
3961 .collect();
3962
3963 curr_options.push(MySqlConfigOption {
3964 name: MySqlConfigOptionName::ExcludeColumns,
3965 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
3966 });
3967 }
3968 }
3969 CreateSourceConnection::SqlServer {
3970 options: curr_options,
3971 ..
3972 } => {
3973 let mz_sql::plan::SqlServerConfigOptionExtracted {
3974 mut text_columns,
3975 mut exclude_columns,
3976 ..
3977 } = curr_options.clone().try_into()?;
3978
3979 curr_options.retain(|o| {
3982 !matches!(
3983 o.name,
3984 SqlServerConfigOptionName::TextColumns
3985 | SqlServerConfigOptionName::ExcludeColumns
3986 )
3987 });
3988
3989 let column_referenced =
3995 |column_qualified_reference: &UnresolvedItemName| {
3996 mz_ore::soft_assert_eq_or_log!(
3997 column_qualified_reference.0.len(),
3998 3,
3999 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4000 );
4001 let mut table = column_qualified_reference.clone();
4002 table.0.truncate(2);
4003 curr_references.iter().any(|r| r.0.ends_with(&table.0))
4004 };
4005 text_columns.retain(column_referenced);
4006 exclude_columns.retain(column_referenced);
4007
4008 new_text_columns.extend(text_columns);
4010 new_exclude_columns.extend(exclude_columns);
4011
4012 if !new_text_columns.is_empty() {
4014 new_text_columns.sort();
4015 let new_text_columns = new_text_columns
4016 .into_iter()
4017 .map(WithOptionValue::UnresolvedItemName)
4018 .collect();
4019
4020 curr_options.push(SqlServerConfigOption {
4021 name: SqlServerConfigOptionName::TextColumns,
4022 value: Some(WithOptionValue::Sequence(new_text_columns)),
4023 });
4024 }
4025 if !new_exclude_columns.is_empty() {
4027 new_exclude_columns.sort();
4028 let new_exclude_columns = new_exclude_columns
4029 .into_iter()
4030 .map(WithOptionValue::UnresolvedItemName)
4031 .collect();
4032
4033 curr_options.push(SqlServerConfigOption {
4034 name: SqlServerConfigOptionName::ExcludeColumns,
4035 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4036 });
4037 }
4038 }
4039 _ => return Err(purification_err()),
4040 };
4041
4042 let mut catalog = self.catalog().for_system_session();
4043 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4044
4045 let planned = mz_sql::plan::plan(
4047 None,
4048 &catalog,
4049 Statement::CreateSource(create_source_stmt),
4050 &Params::empty(),
4051 &resolved_ids,
4052 )
4053 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4054 let plan = match planned {
4055 (Plan::CreateSource(plan), _sql_impl_ids) => plan,
4056 (p, _) => {
4057 unreachable!("create source plan is only valid response, got {:?}", p)
4058 }
4059 };
4060
4061 let source = Source::new(
4065 plan,
4066 cur_source.global_id,
4067 resolved_ids,
4068 cur_source.custom_logical_compaction_window,
4069 cur_source.is_retained_metrics_object,
4070 );
4071
4072 let desc = match &source.data_source {
4074 DataSourceDesc::Ingestion { desc, .. }
4075 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4076 desc.clone().into_inline_connection(self.catalog().state())
4077 }
4078 _ => unreachable!("already verified of type ingestion"),
4079 };
4080
4081 self.controller
4082 .storage
4083 .check_alter_ingestion_source_desc(ingestion_id, &desc)
4084 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4085
4086 let mut ops = vec![catalog::Op::UpdateItem {
4089 id: item_id,
4090 name: self.catalog.get_entry(&item_id).name().clone(),
4093 to_item: CatalogItem::Source(source),
4094 }];
4095
4096 let CreateSourceInner {
4097 ops: new_ops,
4098 sources: _,
4099 if_not_exists_ids,
4100 } = self.create_source_inner(session, subsources).await?;
4101
4102 ops.extend(new_ops.into_iter());
4103
4104 assert!(
4105 if_not_exists_ids.is_empty(),
4106 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4107 );
4108
4109 self.catalog_transact(Some(session), ops).await?;
4110 }
4111 plan::AlterSourceAction::RefreshReferences { references } => {
4112 self.catalog_transact(
4113 Some(session),
4114 vec![catalog::Op::UpdateSourceReferences {
4115 source_id: item_id,
4116 references: references.into(),
4117 }],
4118 )
4119 .await?;
4120 }
4121 }
4122
4123 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4124 }
4125
4126 #[instrument]
4127 pub(super) async fn sequence_alter_system_set(
4128 &mut self,
4129 session: &Session,
4130 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4131 ) -> Result<ExecuteResponse, AdapterError> {
4132 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4133 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4135 self.validate_alter_system_network_policy(session, &value)?;
4136 }
4137
4138 let op = match value {
4139 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4140 name: name.clone(),
4141 value: OwnedVarInput::SqlSet(values),
4142 },
4143 plan::VariableValue::Default => {
4144 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4145 }
4146 };
4147 self.catalog_transact(Some(session), vec![op]).await?;
4148
4149 session.add_notice(AdapterNotice::VarDefaultUpdated {
4150 role: None,
4151 var_name: Some(name),
4152 });
4153 Ok(ExecuteResponse::AlteredSystemConfiguration)
4154 }
4155
4156 #[instrument]
4157 pub(super) async fn sequence_alter_system_reset(
4158 &mut self,
4159 session: &Session,
4160 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4161 ) -> Result<ExecuteResponse, AdapterError> {
4162 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4163 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4164 self.catalog_transact(Some(session), vec![op]).await?;
4165 session.add_notice(AdapterNotice::VarDefaultUpdated {
4166 role: None,
4167 var_name: Some(name),
4168 });
4169 Ok(ExecuteResponse::AlteredSystemConfiguration)
4170 }
4171
4172 #[instrument]
4173 pub(super) async fn sequence_alter_system_reset_all(
4174 &mut self,
4175 session: &Session,
4176 _: plan::AlterSystemResetAllPlan,
4177 ) -> Result<ExecuteResponse, AdapterError> {
4178 self.is_user_allowed_to_alter_system(session, None)?;
4179 let op = catalog::Op::ResetAllSystemConfiguration;
4180 self.catalog_transact(Some(session), vec![op]).await?;
4181 session.add_notice(AdapterNotice::VarDefaultUpdated {
4182 role: None,
4183 var_name: None,
4184 });
4185 Ok(ExecuteResponse::AlteredSystemConfiguration)
4186 }
4187
4188 fn is_user_allowed_to_alter_system(
4190 &self,
4191 session: &Session,
4192 var_name: Option<&str>,
4193 ) -> Result<(), AdapterError> {
4194 match (session.user().kind(), var_name) {
4195 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4197 (UserKind::Superuser, Some(name))
4199 if session.user().is_internal()
4200 || self.catalog().system_config().user_modifiable(name) =>
4201 {
4202 let var = self.catalog().system_config().get(name)?;
4205 var.visible(session.user(), self.catalog().system_config())?;
4206 Ok(())
4207 }
4208 (UserKind::Regular, Some(name))
4211 if self.catalog().system_config().user_modifiable(name) =>
4212 {
4213 Err(AdapterError::Unauthorized(
4214 rbac::UnauthorizedError::Superuser {
4215 action: format!("toggle the '{name}' system configuration parameter"),
4216 },
4217 ))
4218 }
4219 _ => Err(AdapterError::Unauthorized(
4220 rbac::UnauthorizedError::MzSystem {
4221 action: "alter system".into(),
4222 },
4223 )),
4224 }
4225 }
4226
4227 fn validate_alter_system_network_policy(
4228 &self,
4229 session: &Session,
4230 policy_value: &plan::VariableValue,
4231 ) -> Result<(), AdapterError> {
4232 let policy_name = match &policy_value {
4233 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4235 plan::VariableValue::Values(values) if values.len() == 1 => {
4236 values.iter().next().cloned()
4237 }
4238 plan::VariableValue::Values(values) => {
4239 tracing::warn!(?values, "can't set multiple network policies at once");
4240 None
4241 }
4242 };
4243 let maybe_network_policy = policy_name
4244 .as_ref()
4245 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4246 let Some(network_policy) = maybe_network_policy else {
4247 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4248 VarError::InvalidParameterValue {
4249 name: NETWORK_POLICY.name(),
4250 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4251 reason: "no network policy with such name exists".to_string(),
4252 },
4253 )));
4254 };
4255 self.validate_alter_network_policy(session, &network_policy.rules)
4256 }
4257
4258 fn validate_alter_network_policy(
4263 &self,
4264 session: &Session,
4265 policy_rules: &Vec<NetworkPolicyRule>,
4266 ) -> Result<(), AdapterError> {
4267 if session.user().is_internal() {
4270 return Ok(());
4271 }
4272 if let Some(ip) = session.meta().client_ip() {
4273 validate_ip_with_policy_rules(ip, policy_rules)
4274 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4275 } else {
4276 return Err(AdapterError::NetworkPolicyDenied(
4279 NetworkPolicyError::MissingIp,
4280 ));
4281 }
4282 Ok(())
4283 }
4284
4285 #[instrument]
4287 pub(super) fn sequence_execute(
4288 &self,
4289 session: &mut Session,
4290 plan: plan::ExecutePlan,
4291 ) -> Result<String, AdapterError> {
4292 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4294 let ps = session
4295 .get_prepared_statement_unverified(&plan.name)
4296 .expect("known to exist");
4297 let stmt = ps.stmt().cloned();
4298 let desc = ps.desc().clone();
4299 let state_revision = ps.state_revision;
4300 let logging = Arc::clone(ps.logging());
4301 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4302 }
4303
4304 #[instrument]
4305 pub(super) async fn sequence_grant_privileges(
4306 &mut self,
4307 session: &Session,
4308 plan::GrantPrivilegesPlan {
4309 update_privileges,
4310 grantees,
4311 }: plan::GrantPrivilegesPlan,
4312 ) -> Result<ExecuteResponse, AdapterError> {
4313 self.sequence_update_privileges(
4314 session,
4315 update_privileges,
4316 grantees,
4317 UpdatePrivilegeVariant::Grant,
4318 )
4319 .await
4320 }
4321
4322 #[instrument]
4323 pub(super) async fn sequence_revoke_privileges(
4324 &mut self,
4325 session: &Session,
4326 plan::RevokePrivilegesPlan {
4327 update_privileges,
4328 revokees,
4329 }: plan::RevokePrivilegesPlan,
4330 ) -> Result<ExecuteResponse, AdapterError> {
4331 self.sequence_update_privileges(
4332 session,
4333 update_privileges,
4334 revokees,
4335 UpdatePrivilegeVariant::Revoke,
4336 )
4337 .await
4338 }
4339
4340 #[instrument]
4341 async fn sequence_update_privileges(
4342 &mut self,
4343 session: &Session,
4344 update_privileges: Vec<UpdatePrivilege>,
4345 grantees: Vec<RoleId>,
4346 variant: UpdatePrivilegeVariant,
4347 ) -> Result<ExecuteResponse, AdapterError> {
4348 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4349 let mut warnings = Vec::new();
4350 let catalog = self.catalog().for_session(session);
4351
4352 for UpdatePrivilege {
4353 acl_mode,
4354 target_id,
4355 grantor,
4356 acl_from_all,
4357 } in update_privileges
4358 {
4359 let actual_object_type = catalog.get_system_object_type(&target_id);
4360 if actual_object_type.is_relation() && !acl_from_all {
4366 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4367 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4368 if !non_applicable_privileges.is_empty() {
4369 let object_description =
4370 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4371 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4372 non_applicable_privileges,
4373 object_description,
4374 })
4375 }
4376 }
4377
4378 if let SystemObjectId::Object(object_id) = &target_id {
4379 self.catalog()
4380 .ensure_not_reserved_object(object_id, session.conn_id())?;
4381 }
4382
4383 let privileges = self
4384 .catalog()
4385 .get_privileges(&target_id, session.conn_id())
4386 .ok_or(AdapterError::Unsupported(
4389 "GRANTs/REVOKEs on an object type with no privileges",
4390 ))?;
4391
4392 for grantee in &grantees {
4393 self.catalog().ensure_not_system_role(grantee)?;
4394 self.catalog().ensure_not_predefined_role(grantee)?;
4395 let existing_privilege = privileges
4396 .get_acl_item(grantee, &grantor)
4397 .map(Cow::Borrowed)
4398 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4399
4400 match variant {
4401 UpdatePrivilegeVariant::Grant
4402 if !existing_privilege.acl_mode.contains(acl_mode) =>
4403 {
4404 ops.push(catalog::Op::UpdatePrivilege {
4405 target_id: target_id.clone(),
4406 privilege: MzAclItem {
4407 grantee: *grantee,
4408 grantor,
4409 acl_mode,
4410 },
4411 variant,
4412 });
4413 }
4414 UpdatePrivilegeVariant::Revoke
4415 if !existing_privilege
4416 .acl_mode
4417 .intersection(acl_mode)
4418 .is_empty() =>
4419 {
4420 ops.push(catalog::Op::UpdatePrivilege {
4421 target_id: target_id.clone(),
4422 privilege: MzAclItem {
4423 grantee: *grantee,
4424 grantor,
4425 acl_mode,
4426 },
4427 variant,
4428 });
4429 }
4430 _ => {}
4432 }
4433 }
4434 }
4435
4436 if ops.is_empty() {
4437 session.add_notices(warnings);
4438 return Ok(variant.into());
4439 }
4440
4441 let res = self
4442 .catalog_transact(Some(session), ops)
4443 .await
4444 .map(|_| match variant {
4445 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4446 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4447 });
4448 if res.is_ok() {
4449 session.add_notices(warnings);
4450 }
4451 res
4452 }
4453
4454 #[instrument]
4455 pub(super) async fn sequence_alter_default_privileges(
4456 &mut self,
4457 session: &Session,
4458 plan::AlterDefaultPrivilegesPlan {
4459 privilege_objects,
4460 privilege_acl_items,
4461 is_grant,
4462 }: plan::AlterDefaultPrivilegesPlan,
4463 ) -> Result<ExecuteResponse, AdapterError> {
4464 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4465 let variant = if is_grant {
4466 UpdatePrivilegeVariant::Grant
4467 } else {
4468 UpdatePrivilegeVariant::Revoke
4469 };
4470 for privilege_object in &privilege_objects {
4471 self.catalog()
4472 .ensure_not_system_role(&privilege_object.role_id)?;
4473 self.catalog()
4474 .ensure_not_predefined_role(&privilege_object.role_id)?;
4475 if let Some(database_id) = privilege_object.database_id {
4476 self.catalog()
4477 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4478 }
4479 if let Some(schema_id) = privilege_object.schema_id {
4480 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4481 let schema_spec: SchemaSpecifier = schema_id.into();
4482
4483 self.catalog().ensure_not_reserved_object(
4484 &(database_spec, schema_spec).into(),
4485 session.conn_id(),
4486 )?;
4487 }
4488 for privilege_acl_item in &privilege_acl_items {
4489 self.catalog()
4490 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4491 self.catalog()
4492 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4493 ops.push(catalog::Op::UpdateDefaultPrivilege {
4494 privilege_object: privilege_object.clone(),
4495 privilege_acl_item: privilege_acl_item.clone(),
4496 variant,
4497 })
4498 }
4499 }
4500
4501 self.catalog_transact(Some(session), ops).await?;
4502 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4503 }
4504
4505 #[instrument]
4506 pub(super) async fn sequence_grant_role(
4507 &mut self,
4508 session: &Session,
4509 plan::GrantRolePlan {
4510 role_ids,
4511 member_ids,
4512 grantor_id,
4513 }: plan::GrantRolePlan,
4514 ) -> Result<ExecuteResponse, AdapterError> {
4515 let catalog = self.catalog();
4516 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4517 for role_id in role_ids {
4518 for member_id in &member_ids {
4519 let member_membership: BTreeSet<_> =
4520 catalog.get_role(member_id).membership().keys().collect();
4521 if member_membership.contains(&role_id) {
4522 let role_name = catalog.get_role(&role_id).name().to_string();
4523 let member_name = catalog.get_role(member_id).name().to_string();
4524 catalog.ensure_not_reserved_role(member_id)?;
4526 catalog.ensure_grantable_role(&role_id)?;
4527 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4528 role_name,
4529 member_name,
4530 });
4531 } else {
4532 ops.push(catalog::Op::GrantRole {
4533 role_id,
4534 member_id: *member_id,
4535 grantor_id,
4536 });
4537 }
4538 }
4539 }
4540
4541 if ops.is_empty() {
4542 return Ok(ExecuteResponse::GrantedRole);
4543 }
4544
4545 self.catalog_transact(Some(session), ops)
4546 .await
4547 .map(|_| ExecuteResponse::GrantedRole)
4548 }
4549
4550 #[instrument]
4551 pub(super) async fn sequence_revoke_role(
4552 &mut self,
4553 session: &Session,
4554 plan::RevokeRolePlan {
4555 role_ids,
4556 member_ids,
4557 grantor_id,
4558 }: plan::RevokeRolePlan,
4559 ) -> Result<ExecuteResponse, AdapterError> {
4560 let catalog = self.catalog();
4561 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4562 for role_id in role_ids {
4563 for member_id in &member_ids {
4564 let member_membership: BTreeSet<_> =
4565 catalog.get_role(member_id).membership().keys().collect();
4566 if !member_membership.contains(&role_id) {
4567 let role_name = catalog.get_role(&role_id).name().to_string();
4568 let member_name = catalog.get_role(member_id).name().to_string();
4569 catalog.ensure_not_reserved_role(member_id)?;
4571 catalog.ensure_grantable_role(&role_id)?;
4572 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4573 role_name,
4574 member_name,
4575 });
4576 } else {
4577 ops.push(catalog::Op::RevokeRole {
4578 role_id,
4579 member_id: *member_id,
4580 grantor_id,
4581 });
4582 }
4583 }
4584 }
4585
4586 if ops.is_empty() {
4587 return Ok(ExecuteResponse::RevokedRole);
4588 }
4589
4590 self.catalog_transact(Some(session), ops)
4591 .await
4592 .map(|_| ExecuteResponse::RevokedRole)
4593 }
4594
4595 #[instrument]
4596 pub(super) async fn sequence_alter_owner(
4597 &mut self,
4598 session: &Session,
4599 plan::AlterOwnerPlan {
4600 id,
4601 object_type,
4602 new_owner,
4603 }: plan::AlterOwnerPlan,
4604 ) -> Result<ExecuteResponse, AdapterError> {
4605 let mut ops = vec![catalog::Op::UpdateOwner {
4606 id: id.clone(),
4607 new_owner,
4608 }];
4609
4610 match &id {
4611 ObjectId::Item(global_id) => {
4612 let entry = self.catalog().get_entry(global_id);
4613
4614 if entry.is_index() {
4616 let name = self
4617 .catalog()
4618 .resolve_full_name(entry.name(), Some(session.conn_id()))
4619 .to_string();
4620 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4621 return Ok(ExecuteResponse::AlteredObject(object_type));
4622 }
4623
4624 let dependent_index_ops = entry
4626 .used_by()
4627 .into_iter()
4628 .filter(|id| self.catalog().get_entry(id).is_index())
4629 .map(|id| catalog::Op::UpdateOwner {
4630 id: ObjectId::Item(*id),
4631 new_owner,
4632 });
4633 ops.extend(dependent_index_ops);
4634
4635 let dependent_subsources =
4637 entry
4638 .progress_id()
4639 .into_iter()
4640 .map(|item_id| catalog::Op::UpdateOwner {
4641 id: ObjectId::Item(item_id),
4642 new_owner,
4643 });
4644 ops.extend(dependent_subsources);
4645 }
4646 ObjectId::Cluster(cluster_id) => {
4647 let cluster = self.catalog().get_cluster(*cluster_id);
4648 let managed_cluster_replica_ops =
4650 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4651 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4652 new_owner,
4653 });
4654 ops.extend(managed_cluster_replica_ops);
4655 }
4656 _ => {}
4657 }
4658
4659 self.catalog_transact(Some(session), ops)
4660 .await
4661 .map(|_| ExecuteResponse::AlteredObject(object_type))
4662 }
4663
4664 #[instrument]
4665 pub(super) async fn sequence_reassign_owned(
4666 &mut self,
4667 session: &Session,
4668 plan::ReassignOwnedPlan {
4669 old_roles,
4670 new_role,
4671 reassign_ids,
4672 }: plan::ReassignOwnedPlan,
4673 ) -> Result<ExecuteResponse, AdapterError> {
4674 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4675 self.catalog().ensure_not_reserved_role(role_id)?;
4676 }
4677
4678 let ops = reassign_ids
4679 .into_iter()
4680 .map(|id| catalog::Op::UpdateOwner {
4681 id,
4682 new_owner: new_role,
4683 })
4684 .collect();
4685
4686 self.catalog_transact(Some(session), ops)
4687 .await
4688 .map(|_| ExecuteResponse::ReassignOwned)
4689 }
4690
4691 #[instrument]
4692 pub(crate) async fn handle_deferred_statement(&mut self) {
4693 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4697 return;
4698 };
4699 match ps {
4700 crate::coord::PlanStatement::Statement { stmt, params } => {
4701 self.handle_execute_inner(stmt, params, ctx).await;
4702 }
4703 crate::coord::PlanStatement::Plan {
4704 plan,
4705 resolved_ids,
4706 sql_impl_resolved_ids,
4707 } => {
4708 self.sequence_plan(ctx, plan, resolved_ids, sql_impl_resolved_ids)
4709 .await;
4710 }
4711 }
4712 }
4713
4714 #[instrument]
4715 #[allow(clippy::unused_async)]
4717 pub(super) async fn sequence_alter_table(
4718 &mut self,
4719 ctx: &mut ExecuteContext,
4720 plan: plan::AlterTablePlan,
4721 ) -> Result<ExecuteResponse, AdapterError> {
4722 let plan::AlterTablePlan {
4723 relation_id,
4724 column_name,
4725 column_type,
4726 raw_sql_type,
4727 } = plan;
4728
4729 let (_, new_global_id) = self.allocate_user_id().await?;
4731 let ops = vec![catalog::Op::AlterAddColumn {
4732 id: relation_id,
4733 new_global_id,
4734 name: column_name,
4735 typ: column_type,
4736 sql: raw_sql_type,
4737 }];
4738
4739 self.catalog_transact_with_context(None, Some(ctx), ops)
4740 .await?;
4741
4742 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
4743 }
4744
4745 #[instrument]
4747 pub(super) async fn sequence_alter_materialized_view_apply_replacement_prepare(
4748 &mut self,
4749 ctx: ExecuteContext,
4750 plan: AlterMaterializedViewApplyReplacementPlan,
4751 ) {
4752 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan.clone();
4762
4763 let plan_validity = PlanValidity::new(
4764 self.catalog().transient_revision(),
4765 BTreeSet::from_iter([id, replacement_id]),
4766 None,
4767 None,
4768 ctx.session().role_metadata().clone(),
4769 );
4770
4771 let target = self.catalog.get_entry(&id);
4772 let target_gid = target.latest_global_id();
4773
4774 let replacement = self.catalog.get_entry(&replacement_id);
4775 let replacement_gid = replacement.latest_global_id();
4776
4777 let target_upper = self
4778 .controller
4779 .storage_collections
4780 .collection_frontiers(target_gid)
4781 .expect("target MV exists")
4782 .write_frontier;
4783 let replacement_upper = self
4784 .controller
4785 .compute
4786 .collection_frontiers(replacement_gid, replacement.cluster_id())
4787 .expect("replacement MV exists")
4788 .write_frontier;
4789
4790 info!(
4791 %id, %replacement_id, ?target_upper, ?replacement_upper,
4792 "preparing materialized view replacement application",
4793 );
4794
4795 let Some(replacement_upper_ts) = replacement_upper.into_option() else {
4796 ctx.retire(Err(AdapterError::ReplaceMaterializedViewSealed {
4805 name: target.name().item.clone(),
4806 }));
4807 return;
4808 };
4809
4810 let replacement_upper_ts = replacement_upper_ts.step_back().unwrap_or(Timestamp::MIN);
4814
4815 self.install_storage_watch_set(
4819 ctx.session().conn_id().clone(),
4820 BTreeSet::from_iter([target_gid]),
4821 replacement_upper_ts,
4822 WatchSetResponse::AlterMaterializedViewReady(AlterMaterializedViewReadyContext {
4823 ctx: Some(ctx),
4824 otel_ctx: OpenTelemetryContext::obtain(),
4825 plan,
4826 plan_validity,
4827 }),
4828 )
4829 .expect("target collection exists");
4830 }
4831
4832 #[instrument]
4834 pub async fn sequence_alter_materialized_view_apply_replacement_finish(
4835 &mut self,
4836 mut ctx: AlterMaterializedViewReadyContext,
4837 ) {
4838 ctx.otel_ctx.attach_as_parent();
4839
4840 let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = ctx.plan;
4841
4842 if let Err(err) = ctx.plan_validity.check(self.catalog()) {
4847 ctx.retire(Err(err));
4848 return;
4849 }
4850
4851 info!(
4852 %id, %replacement_id,
4853 "finishing materialized view replacement application",
4854 );
4855
4856 let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4857 match self
4858 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
4859 .await
4860 {
4861 Ok(()) => ctx.retire(Ok(ExecuteResponse::AlteredObject(
4862 ObjectType::MaterializedView,
4863 ))),
4864 Err(err) => ctx.retire(Err(err)),
4865 }
4866 }
4867
4868 pub(super) async fn statistics_oracle(
4869 &self,
4870 session: &Session,
4871 source_ids: &BTreeSet<GlobalId>,
4872 query_as_of: &Antichain<Timestamp>,
4873 is_oneshot: bool,
4874 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
4875 super::statistics_oracle(
4876 session,
4877 source_ids,
4878 query_as_of,
4879 is_oneshot,
4880 self.catalog().system_config(),
4881 self.controller.storage_collections.as_ref(),
4882 )
4883 .await
4884 }
4885}
4886
4887impl Coordinator {
4888 pub(crate) fn emit_raw_optimizer_notices_to_user(
4896 &self,
4897 ctx: &ExecuteContext,
4898 notices: &[RawOptimizerNotice],
4899 ) {
4900 emit_optimizer_notices(&*self.catalog, ctx.session(), notices);
4901 }
4902
4903 async fn persist_dataflow_metainfo(
4913 &mut self,
4914 df_meta: DataflowMetainfo<Arc<OptimizerNotice>>,
4915 export_id: GlobalId,
4916 ) -> Option<BuiltinTableAppendNotify> {
4917 if self.catalog().state().system_config().enable_mz_notices()
4920 && !df_meta.optimizer_notices.is_empty()
4921 {
4922 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
4923 self.catalog().state().pack_optimizer_notices(
4924 &mut builtin_table_updates,
4925 df_meta.optimizer_notices.iter(),
4926 Diff::ONE,
4927 );
4928
4929 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4931
4932 Some(
4933 self.builtin_table_update()
4934 .execute(builtin_table_updates)
4935 .await
4936 .0,
4937 )
4938 } else {
4939 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
4941
4942 None
4943 }
4944 }
4945}