1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::stream::FuturesOrdered;
20use futures::{Future, future};
21use itertools::Itertools;
22use maplit::btreeset;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_SELF_MANAGED_AUTH};
26use mz_catalog::memory::objects::{
27 CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
28};
29use mz_cloud_resources::VpcEndpointConfig;
30use mz_controller_types::ReplicaId;
31use mz_expr::{
32 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
33};
34use mz_ore::cast::CastFrom;
35use mz_ore::collections::{CollectionExt, HashSet};
36use mz_ore::task::{self, JoinHandle, spawn};
37use mz_ore::tracing::OpenTelemetryContext;
38use mz_ore::vec::VecExt;
39use mz_ore::{assert_none, instrument};
40use mz_persist_client::stats::SnapshotPartStats;
41use mz_repr::adt::jsonb::Jsonb;
42use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
43use mz_repr::explain::ExprHumanizer;
44use mz_repr::explain::json::json_string;
45use mz_repr::role_id::RoleId;
46use mz_repr::{
47 CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
48 RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
49};
50use mz_sql::ast::{
51 AlterSourceAddSubsourceOption, SqlServerConfigOption, SqlServerConfigOptionName,
52};
53use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
54use mz_sql::catalog::{
55 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
56 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
57 ErrorMessageObjectDescription, ObjectType, RoleAttributes, RoleVars, SessionCatalog,
58};
59use mz_sql::names::{
60 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
61 SchemaSpecifier, SystemObjectId,
62};
63use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS, SessionVars,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_storage_types::stats::RelationPartStats;
92use mz_transform::EmptyStatisticsOracle;
93use mz_transform::dataflow::DataflowMetainfo;
94use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
95use smallvec::SmallVec;
96use timely::progress::Antichain;
97use timely::progress::Timestamp as TimelyTimestamp;
98use tokio::sync::{oneshot, watch};
99use tracing::{Instrument, Span, info, warn};
100
101use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
102use crate::command::{ExecuteResponse, Response};
103use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
104use crate::coord::{
105 AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
106 CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
107 Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
108 PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
109 validate_ip_with_policy_rules,
110};
111use crate::error::AdapterError;
112use crate::notice::{AdapterNotice, DroppedInUseIndex};
113use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
114use crate::optimize::{self, Optimize};
115use crate::session::{
116 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
117 WriteLocks, WriteOp,
118};
119use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
120use crate::{PeekResponseUnary, ReadHolds};
121
122mod cluster;
123mod copy_from;
124mod create_continual_task;
125mod create_index;
126mod create_materialized_view;
127mod create_view;
128mod explain_timestamp;
129mod peek;
130mod secret;
131mod subscribe;
132
133macro_rules! return_if_err {
136 ($expr:expr, $ctx:expr) => {
137 match $expr {
138 Ok(v) => v,
139 Err(e) => return $ctx.retire(Err(e.into())),
140 }
141 };
142}
143
144pub(super) use return_if_err;
145
146struct DropOps {
147 ops: Vec<catalog::Op>,
148 dropped_active_db: bool,
149 dropped_active_cluster: bool,
150 dropped_in_use_indexes: Vec<DroppedInUseIndex>,
151}
152
153struct CreateSourceInner {
155 ops: Vec<catalog::Op>,
156 sources: Vec<(CatalogItemId, Source)>,
157 if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
158}
159
160impl Coordinator {
161 pub(crate) async fn sequence_staged<S>(
166 &mut self,
167 mut ctx: S::Ctx,
168 parent_span: Span,
169 mut stage: S,
170 ) where
171 S: Staged + 'static,
172 S::Ctx: Send + 'static,
173 {
174 return_if_err!(stage.validity().check(self.catalog()), ctx);
175 loop {
176 let mut cancel_enabled = stage.cancel_enabled();
177 if let Some(session) = ctx.session() {
178 if cancel_enabled {
179 if let Some((_prev_tx, prev_rx)) = self
182 .staged_cancellation
183 .insert(session.conn_id().clone(), watch::channel(false))
184 {
185 let was_canceled = *prev_rx.borrow();
186 if was_canceled {
187 ctx.retire(Err(AdapterError::Canceled));
188 return;
189 }
190 }
191 } else {
192 self.staged_cancellation.remove(session.conn_id());
195 }
196 } else {
197 cancel_enabled = false
198 };
199 let next = stage
200 .stage(self, &mut ctx)
201 .instrument(parent_span.clone())
202 .await;
203 let res = return_if_err!(next, ctx);
204 stage = match res {
205 StageResult::Handle(handle) => {
206 let internal_cmd_tx = self.internal_cmd_tx.clone();
207 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
208 let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
209 });
210 return;
211 }
212 StageResult::HandleRetire(handle) => {
213 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
214 ctx.retire(Ok(resp));
215 });
216 return;
217 }
218 StageResult::Response(resp) => {
219 ctx.retire(Ok(resp));
220 return;
221 }
222 StageResult::Immediate(stage) => *stage,
223 }
224 }
225 }
226
227 fn handle_spawn<C, T, F>(
228 &self,
229 ctx: C,
230 handle: JoinHandle<Result<T, AdapterError>>,
231 cancel_enabled: bool,
232 f: F,
233 ) where
234 C: StagedContext + Send + 'static,
235 T: Send + 'static,
236 F: FnOnce(C, T) + Send + 'static,
237 {
238 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
239 .session()
240 .and_then(|session| self.staged_cancellation.get(session.conn_id()))
241 {
242 let mut rx = rx.clone();
243 Box::pin(async move {
244 let _ = rx.wait_for(|v| *v).await;
246 ()
247 })
248 } else {
249 Box::pin(future::pending())
250 };
251 spawn(|| "sequence_staged", async move {
252 tokio::select! {
253 res = handle => {
254 let next = match res {
255 Ok(next) => return_if_err!(next, ctx),
256 Err(err) => {
257 tracing::error!("sequence_staged join error {err}");
258 ctx.retire(Err(AdapterError::Internal(
259 "sequence_staged join error".into(),
260 )));
261 return;
262 }
263 };
264 f(ctx, next);
265 }
266 _ = rx, if cancel_enabled => {
267 ctx.retire(Err(AdapterError::Canceled));
268 }
269 }
270 });
271 }
272
273 async fn create_source_inner(
274 &self,
275 session: &Session,
276 plans: Vec<plan::CreateSourcePlanBundle>,
277 ) -> Result<CreateSourceInner, AdapterError> {
278 let mut ops = vec![];
279 let mut sources = vec![];
280
281 let if_not_exists_ids = plans
282 .iter()
283 .filter_map(
284 |plan::CreateSourcePlanBundle {
285 item_id,
286 global_id: _,
287 plan,
288 resolved_ids: _,
289 available_source_references: _,
290 }| {
291 if plan.if_not_exists {
292 Some((*item_id, plan.name.clone()))
293 } else {
294 None
295 }
296 },
297 )
298 .collect::<BTreeMap<_, _>>();
299
300 for plan::CreateSourcePlanBundle {
301 item_id,
302 global_id,
303 mut plan,
304 resolved_ids,
305 available_source_references,
306 } in plans
307 {
308 let name = plan.name.clone();
309
310 match plan.source.data_source {
311 plan::DataSourceDesc::Ingestion(ref ingestion) => {
312 let cluster_id = plan
313 .in_cluster
314 .expect("ingestion plans must specify cluster");
315 match ingestion.desc.connection {
316 GenericSourceConnection::Postgres(_)
317 | GenericSourceConnection::MySql(_)
318 | GenericSourceConnection::SqlServer(_)
319 | GenericSourceConnection::Kafka(_)
320 | GenericSourceConnection::LoadGenerator(_) => {
321 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
322 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
323 .get(self.catalog().system_config().dyncfgs());
324
325 if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
326 {
327 return Err(AdapterError::Unsupported(
328 "sources in clusters with >1 replicas",
329 ));
330 }
331 }
332 }
333 }
334 }
335 plan::DataSourceDesc::Webhook { .. } => {
336 let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
337 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
338 if cluster.replica_ids().len() > 1 {
339 return Err(AdapterError::Unsupported(
340 "webhook sources in clusters with >1 replicas",
341 ));
342 }
343 }
344 }
345 plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
346 }
347
348 if let mz_sql::plan::DataSourceDesc::Webhook {
350 validate_using: Some(validate),
351 ..
352 } = &mut plan.source.data_source
353 {
354 if let Err(reason) = validate.reduce_expression().await {
355 self.metrics
356 .webhook_validation_reduce_failures
357 .with_label_values(&[reason])
358 .inc();
359 return Err(AdapterError::Internal(format!(
360 "failed to reduce check expression, {reason}"
361 )));
362 }
363 }
364
365 let mut reference_ops = vec![];
368 if let Some(references) = &available_source_references {
369 reference_ops.push(catalog::Op::UpdateSourceReferences {
370 source_id: item_id,
371 references: references.clone().into(),
372 });
373 }
374
375 let source = Source::new(plan, global_id, resolved_ids, None, false);
376 ops.push(catalog::Op::CreateItem {
377 id: item_id,
378 name,
379 item: CatalogItem::Source(source.clone()),
380 owner_id: *session.current_role_id(),
381 });
382 sources.push((item_id, source));
383 ops.extend(reference_ops);
385 }
386
387 Ok(CreateSourceInner {
388 ops,
389 sources,
390 if_not_exists_ids,
391 })
392 }
393
394 pub(crate) fn plan_subsource(
402 &self,
403 session: &Session,
404 params: &mz_sql::plan::Params,
405 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
406 item_id: CatalogItemId,
407 global_id: GlobalId,
408 ) -> Result<CreateSourcePlanBundle, AdapterError> {
409 let catalog = self.catalog().for_session(session);
410 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
411
412 let plan = self.plan_statement(
413 session,
414 Statement::CreateSubsource(subsource_stmt),
415 params,
416 &resolved_ids,
417 )?;
418 let plan = match plan {
419 Plan::CreateSource(plan) => plan,
420 _ => unreachable!(),
421 };
422 Ok(CreateSourcePlanBundle {
423 item_id,
424 global_id,
425 plan,
426 resolved_ids,
427 available_source_references: None,
428 })
429 }
430
431 pub(crate) async fn plan_purified_alter_source_add_subsource(
433 &mut self,
434 session: &Session,
435 params: Params,
436 source_name: ResolvedItemName,
437 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
438 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
439 ) -> Result<(Plan, ResolvedIds), AdapterError> {
440 let mut subsource_plans = Vec::with_capacity(subsources.len());
441
442 let conn_catalog = self.catalog().for_system_session();
444 let pcx = plan::PlanContext::zero();
445 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
446
447 let entry = self.catalog().get_entry(source_name.item_id());
448 let source = entry.source().ok_or_else(|| {
449 AdapterError::internal(
450 "plan alter source",
451 format!("expected Source found {entry:?}"),
452 )
453 })?;
454
455 let item_id = entry.id();
456 let ingestion_id = source.global_id();
457 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
458
459 let id_ts = self.get_catalog_write_ts().await;
460 let ids = self
461 .catalog_mut()
462 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
463 .await?;
464 for (subsource_stmt, (item_id, global_id)) in
465 subsource_stmts.into_iter().zip(ids.into_iter())
466 {
467 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
468 subsource_plans.push(s);
469 }
470
471 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
472 subsources: subsource_plans,
473 options,
474 };
475
476 Ok((
477 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
478 item_id,
479 ingestion_id,
480 action,
481 }),
482 ResolvedIds::empty(),
483 ))
484 }
485
486 pub(crate) fn plan_purified_alter_source_refresh_references(
488 &self,
489 _session: &Session,
490 _params: Params,
491 source_name: ResolvedItemName,
492 available_source_references: plan::SourceReferences,
493 ) -> Result<(Plan, ResolvedIds), AdapterError> {
494 let entry = self.catalog().get_entry(source_name.item_id());
495 let source = entry.source().ok_or_else(|| {
496 AdapterError::internal(
497 "plan alter source",
498 format!("expected Source found {entry:?}"),
499 )
500 })?;
501 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
502 references: available_source_references,
503 };
504
505 Ok((
506 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
507 item_id: entry.id(),
508 ingestion_id: source.global_id(),
509 action,
510 }),
511 ResolvedIds::empty(),
512 ))
513 }
514
515 pub(crate) async fn plan_purified_create_source(
519 &mut self,
520 ctx: &ExecuteContext,
521 params: Params,
522 progress_stmt: CreateSubsourceStatement<Aug>,
523 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
524 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
525 available_source_references: plan::SourceReferences,
526 ) -> Result<(Plan, ResolvedIds), AdapterError> {
527 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
528
529 assert_none!(progress_stmt.of_source);
536 let id_ts = self.get_catalog_write_ts().await;
537 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
538 let progress_plan =
539 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
540 let progress_full_name = self
541 .catalog()
542 .resolve_full_name(&progress_plan.plan.name, None);
543 let progress_subsource = ResolvedItemName::Item {
544 id: progress_plan.item_id,
545 qualifiers: progress_plan.plan.name.qualifiers.clone(),
546 full_name: progress_full_name,
547 print_id: true,
548 version: RelationVersionSelector::Latest,
549 };
550
551 create_source_plans.push(progress_plan);
552
553 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
554
555 let catalog = self.catalog().for_session(ctx.session());
556 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
557
558 let source_plan = match self.plan_statement(
560 ctx.session(),
561 Statement::CreateSource(source_stmt),
562 ¶ms,
563 &resolved_ids,
564 )? {
565 Plan::CreateSource(plan) => plan,
566 p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
567 };
568
569 let id_ts = self.get_catalog_write_ts().await;
570 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
571
572 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
573 let of_source = ResolvedItemName::Item {
574 id: item_id,
575 qualifiers: source_plan.name.qualifiers.clone(),
576 full_name: source_full_name,
577 print_id: true,
578 version: RelationVersionSelector::Latest,
579 };
580
581 let conn_catalog = self.catalog().for_system_session();
583 let pcx = plan::PlanContext::zero();
584 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
585
586 let subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
587
588 create_source_plans.push(CreateSourcePlanBundle {
589 item_id,
590 global_id,
591 plan: source_plan,
592 resolved_ids: resolved_ids.clone(),
593 available_source_references: Some(available_source_references),
594 });
595
596 let id_ts = self.get_catalog_write_ts().await;
598 let ids = self
599 .catalog_mut()
600 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
601 .await?;
602 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip(ids.into_iter()) {
603 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
604 create_source_plans.push(plan);
605 }
606
607 Ok((
608 Plan::CreateSources(create_source_plans),
609 ResolvedIds::empty(),
610 ))
611 }
612
613 #[instrument]
614 pub(super) async fn sequence_create_source(
615 &mut self,
616 session: &mut Session,
617 plans: Vec<plan::CreateSourcePlanBundle>,
618 ) -> Result<ExecuteResponse, AdapterError> {
619 let item_ids: Vec<_> = plans.iter().map(|plan| plan.item_id).collect();
620 let CreateSourceInner {
621 ops,
622 sources,
623 if_not_exists_ids,
624 } = self.create_source_inner(session, plans).await?;
625
626 let transact_result = self
627 .catalog_transact_with_side_effects(Some(session), ops, |coord| async {
628 let mut collections = Vec::with_capacity(sources.len());
640
641 for (item_id, source) in sources {
642 let source_status_item_id = coord.catalog().resolve_builtin_storage_collection(
643 &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
644 );
645 let source_status_collection_id = Some(
646 coord
647 .catalog()
648 .get_entry(&source_status_item_id)
649 .latest_global_id(),
650 );
651
652 let (data_source, status_collection_id) = match source.data_source {
653 DataSourceDesc::Ingestion {
654 ingestion_desc:
655 mz_sql::plan::Ingestion {
656 desc,
657 progress_subsource,
658 },
659 cluster_id,
660 } => {
661 let desc = desc.into_inline_connection(coord.catalog().state());
662 let progress_subsource = coord
665 .catalog()
666 .get_entry(&progress_subsource)
667 .latest_global_id();
668
669 let ingestion = mz_storage_types::sources::IngestionDescription::new(
670 desc,
671 cluster_id,
672 progress_subsource,
673 );
674
675 (
676 DataSource::Ingestion(ingestion),
677 source_status_collection_id,
678 )
679 }
680 DataSourceDesc::IngestionExport {
681 ingestion_id,
682 external_reference: _,
683 details,
684 data_config,
685 } => {
686 let ingestion_id =
689 coord.catalog().get_entry(&ingestion_id).latest_global_id();
690 (
691 DataSource::IngestionExport {
692 ingestion_id,
693 details,
694 data_config: data_config
695 .into_inline_connection(coord.catalog().state()),
696 },
697 source_status_collection_id,
698 )
699 }
700 DataSourceDesc::Progress => (DataSource::Progress, None),
701 DataSourceDesc::Webhook { .. } => {
702 if let Some(url) = coord.catalog().state().try_get_webhook_url(&item_id)
703 {
704 session.add_notice(AdapterNotice::WebhookSourceCreated { url })
705 }
706
707 (DataSource::Webhook, None)
708 }
709 DataSourceDesc::Introspection(_) => {
710 unreachable!("cannot create sources with introspection data sources")
711 }
712 };
713
714 collections.push((
715 source.global_id,
716 CollectionDescription::<Timestamp> {
717 desc: source.desc.clone(),
718 data_source,
719 timeline: Some(source.timeline),
720 since: None,
721 status_collection_id,
722 },
723 ));
724 }
725
726 let storage_metadata = coord.catalog.state().storage_metadata();
727
728 coord
729 .controller
730 .storage
731 .create_collections(storage_metadata, None, collections)
732 .await
733 .unwrap_or_terminate("cannot fail to create collections");
734
735 let read_policies = coord.catalog().state().source_compaction_windows(item_ids);
751 for (compaction_window, storage_policies) in read_policies {
752 coord
753 .initialize_storage_read_policies(storage_policies, compaction_window)
754 .await;
755 }
756 })
757 .await;
758
759 match transact_result {
760 Ok(()) => Ok(ExecuteResponse::CreatedSource),
761 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
762 kind:
763 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
764 })) if if_not_exists_ids.contains_key(&id) => {
765 session.add_notice(AdapterNotice::ObjectAlreadyExists {
766 name: if_not_exists_ids[&id].item.clone(),
767 ty: "source",
768 });
769 Ok(ExecuteResponse::CreatedSource)
770 }
771 Err(err) => Err(err),
772 }
773 }
774
775 #[instrument]
776 pub(super) async fn sequence_create_connection(
777 &mut self,
778 mut ctx: ExecuteContext,
779 plan: plan::CreateConnectionPlan,
780 resolved_ids: ResolvedIds,
781 ) {
782 let id_ts = self.get_catalog_write_ts().await;
783 let (connection_id, connection_gid) = match self.catalog_mut().allocate_user_id(id_ts).await
784 {
785 Ok(item_id) => item_id,
786 Err(err) => return ctx.retire(Err(err.into())),
787 };
788
789 match &plan.connection.details {
790 ConnectionDetails::Ssh { key_1, key_2, .. } => {
791 let key_1 = match key_1.as_key_pair() {
792 Some(key_1) => key_1.clone(),
793 None => {
794 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
795 "the PUBLIC KEY 1 option cannot be explicitly specified"
796 ))));
797 }
798 };
799
800 let key_2 = match key_2.as_key_pair() {
801 Some(key_2) => key_2.clone(),
802 None => {
803 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
804 "the PUBLIC KEY 2 option cannot be explicitly specified"
805 ))));
806 }
807 };
808
809 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
810 let secret = key_set.to_bytes();
811 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
812 return ctx.retire(Err(err.into()));
813 }
814 }
815 _ => (),
816 };
817
818 if plan.validate {
819 let internal_cmd_tx = self.internal_cmd_tx.clone();
820 let transient_revision = self.catalog().transient_revision();
821 let conn_id = ctx.session().conn_id().clone();
822 let otel_ctx = OpenTelemetryContext::obtain();
823 let role_metadata = ctx.session().role_metadata().clone();
824
825 let connection = plan
826 .connection
827 .details
828 .to_connection()
829 .into_inline_connection(self.catalog().state());
830
831 let current_storage_parameters = self.controller.storage.config().clone();
832 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
833 let result = match connection
834 .validate(connection_id, ¤t_storage_parameters)
835 .await
836 {
837 Ok(()) => Ok(plan),
838 Err(err) => Err(err.into()),
839 };
840
841 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
843 CreateConnectionValidationReady {
844 ctx,
845 result,
846 connection_id,
847 connection_gid,
848 plan_validity: PlanValidity::new(
849 transient_revision,
850 resolved_ids.items().copied().collect(),
851 None,
852 None,
853 role_metadata,
854 ),
855 otel_ctx,
856 resolved_ids: resolved_ids.clone(),
857 },
858 ));
859 if let Err(e) = result {
860 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
861 }
862 });
863 } else {
864 let result = self
865 .sequence_create_connection_stage_finish(
866 ctx.session_mut(),
867 connection_id,
868 connection_gid,
869 plan,
870 resolved_ids,
871 )
872 .await;
873 ctx.retire(result);
874 }
875 }
876
877 #[instrument]
878 pub(crate) async fn sequence_create_connection_stage_finish(
879 &mut self,
880 session: &mut Session,
881 connection_id: CatalogItemId,
882 connection_gid: GlobalId,
883 plan: plan::CreateConnectionPlan,
884 resolved_ids: ResolvedIds,
885 ) -> Result<ExecuteResponse, AdapterError> {
886 let ops = vec![catalog::Op::CreateItem {
887 id: connection_id,
888 name: plan.name.clone(),
889 item: CatalogItem::Connection(Connection {
890 create_sql: plan.connection.create_sql,
891 global_id: connection_gid,
892 details: plan.connection.details.clone(),
893 resolved_ids,
894 }),
895 owner_id: *session.current_role_id(),
896 }];
897
898 let transact_result = self
899 .catalog_transact_with_side_effects(Some(session), ops, |coord| async {
900 match plan.connection.details {
901 ConnectionDetails::AwsPrivatelink(ref privatelink) => {
902 let spec = VpcEndpointConfig {
903 aws_service_name: privatelink.service_name.to_owned(),
904 availability_zone_ids: privatelink.availability_zones.to_owned(),
905 };
906 let cloud_resource_controller =
907 match coord.cloud_resource_controller.as_ref().cloned() {
908 Some(controller) => controller,
909 None => {
910 tracing::warn!("AWS PrivateLink connections unsupported");
911 return;
912 }
913 };
914 if let Err(err) = cloud_resource_controller
915 .ensure_vpc_endpoint(connection_id, spec)
916 .await
917 {
918 tracing::warn!(?err, "failed to ensure vpc endpoint!");
919 }
920 }
921 _ => {}
922 }
923 })
924 .await;
925
926 match transact_result {
927 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
928 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
929 kind:
930 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
931 })) if plan.if_not_exists => {
932 session.add_notice(AdapterNotice::ObjectAlreadyExists {
933 name: plan.name.item,
934 ty: "connection",
935 });
936 Ok(ExecuteResponse::CreatedConnection)
937 }
938 Err(err) => Err(err),
939 }
940 }
941
942 #[instrument]
943 pub(super) async fn sequence_create_database(
944 &mut self,
945 session: &mut Session,
946 plan: plan::CreateDatabasePlan,
947 ) -> Result<ExecuteResponse, AdapterError> {
948 let ops = vec![catalog::Op::CreateDatabase {
949 name: plan.name.clone(),
950 owner_id: *session.current_role_id(),
951 }];
952 match self.catalog_transact(Some(session), ops).await {
953 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
954 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
955 kind:
956 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
957 })) if plan.if_not_exists => {
958 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
959 Ok(ExecuteResponse::CreatedDatabase)
960 }
961 Err(err) => Err(err),
962 }
963 }
964
965 #[instrument]
966 pub(super) async fn sequence_create_schema(
967 &mut self,
968 session: &mut Session,
969 plan: plan::CreateSchemaPlan,
970 ) -> Result<ExecuteResponse, AdapterError> {
971 let op = catalog::Op::CreateSchema {
972 database_id: plan.database_spec,
973 schema_name: plan.schema_name.clone(),
974 owner_id: *session.current_role_id(),
975 };
976 match self.catalog_transact(Some(session), vec![op]).await {
977 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
978 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
979 kind:
980 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
981 })) if plan.if_not_exists => {
982 session.add_notice(AdapterNotice::SchemaAlreadyExists {
983 name: plan.schema_name,
984 });
985 Ok(ExecuteResponse::CreatedSchema)
986 }
987 Err(err) => Err(err),
988 }
989 }
990
991 fn validate_role_attributes(&self, attributes: &RoleAttributes) -> Result<(), AdapterError> {
993 if !ENABLE_SELF_MANAGED_AUTH.get(self.catalog().system_config().dyncfgs()) {
994 if attributes.superuser.is_some()
995 || attributes.password.is_some()
996 || attributes.login.is_some()
997 {
998 return Err(AdapterError::UnavailableFeature {
999 feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
1000 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
1001 });
1002 }
1003 }
1004 Ok(())
1005 }
1006
1007 #[instrument]
1008 pub(super) async fn sequence_create_role(
1009 &mut self,
1010 conn_id: Option<&ConnectionId>,
1011 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
1012 ) -> Result<ExecuteResponse, AdapterError> {
1013 self.validate_role_attributes(&attributes.clone())?;
1014 let op = catalog::Op::CreateRole { name, attributes };
1015 self.catalog_transact_conn(conn_id, vec![op])
1016 .await
1017 .map(|_| ExecuteResponse::CreatedRole)
1018 }
1019
1020 #[instrument]
1021 pub(super) async fn sequence_create_network_policy(
1022 &mut self,
1023 session: &Session,
1024 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
1025 ) -> Result<ExecuteResponse, AdapterError> {
1026 let op = catalog::Op::CreateNetworkPolicy {
1027 rules,
1028 name,
1029 owner_id: *session.current_role_id(),
1030 };
1031 self.catalog_transact_conn(Some(session.conn_id()), vec![op])
1032 .await
1033 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
1034 }
1035
1036 #[instrument]
1037 pub(super) async fn sequence_alter_network_policy(
1038 &mut self,
1039 session: &Session,
1040 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
1041 ) -> Result<ExecuteResponse, AdapterError> {
1042 let current_network_policy_name = self
1044 .owned_catalog()
1045 .system_config()
1046 .default_network_policy_name();
1047 if current_network_policy_name == name {
1049 self.validate_alter_network_policy(session, &rules)?;
1050 }
1051
1052 let op = catalog::Op::AlterNetworkPolicy {
1053 id,
1054 rules,
1055 name,
1056 owner_id: *session.current_role_id(),
1057 };
1058 self.catalog_transact_conn(Some(session.conn_id()), vec![op])
1059 .await
1060 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
1061 }
1062
1063 #[instrument]
1064 pub(super) async fn sequence_create_table(
1065 &mut self,
1066 ctx: &mut ExecuteContext,
1067 plan: plan::CreateTablePlan,
1068 resolved_ids: ResolvedIds,
1069 ) -> Result<ExecuteResponse, AdapterError> {
1070 let plan::CreateTablePlan {
1071 name,
1072 table,
1073 if_not_exists,
1074 } = plan;
1075
1076 let conn_id = if table.temporary {
1077 Some(ctx.session().conn_id())
1078 } else {
1079 None
1080 };
1081 let id_ts = self.get_catalog_write_ts().await;
1082 let (table_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
1083 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
1084
1085 let data_source = match table.data_source {
1086 plan::TableDataSource::TableWrites { defaults } => {
1087 TableDataSource::TableWrites { defaults }
1088 }
1089 plan::TableDataSource::DataSource {
1090 desc: data_source_plan,
1091 timeline,
1092 } => match data_source_plan {
1093 plan::DataSourceDesc::IngestionExport {
1094 ingestion_id,
1095 external_reference,
1096 details,
1097 data_config,
1098 } => TableDataSource::DataSource {
1099 desc: DataSourceDesc::IngestionExport {
1100 ingestion_id,
1101 external_reference,
1102 details,
1103 data_config,
1104 },
1105 timeline,
1106 },
1107 plan::DataSourceDesc::Webhook {
1108 validate_using,
1109 body_format,
1110 headers,
1111 cluster_id,
1112 } => TableDataSource::DataSource {
1113 desc: DataSourceDesc::Webhook {
1114 validate_using,
1115 body_format,
1116 headers,
1117 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
1118 },
1119 timeline,
1120 },
1121 o => {
1122 unreachable!("CREATE TABLE data source got {:?}", o)
1123 }
1124 },
1125 };
1126 let table = Table {
1127 create_sql: Some(table.create_sql),
1128 desc: table.desc,
1129 collections,
1130 conn_id: conn_id.cloned(),
1131 resolved_ids,
1132 custom_logical_compaction_window: table.compaction_window,
1133 is_retained_metrics_object: false,
1134 data_source,
1135 };
1136 let ops = vec![catalog::Op::CreateItem {
1137 id: table_id,
1138 name: name.clone(),
1139 item: CatalogItem::Table(table.clone()),
1140 owner_id: *ctx.session().current_role_id(),
1141 }];
1142
1143 let catalog_result = self
1144 .catalog_transact_with_side_effects(Some(ctx.session()), ops, |coord| async {
1145 let (collections, register_ts, read_policies) = match table.data_source {
1149 TableDataSource::TableWrites { defaults: _ } => {
1150 let register_ts = coord.get_local_write_ts().await.timestamp;
1152
1153 coord
1160 .catalog
1161 .confirm_leadership()
1162 .await
1163 .unwrap_or_terminate("unable to confirm leadership");
1164
1165 if let Some(id) = ctx.extra().contents() {
1166 coord.set_statement_execution_timestamp(id, register_ts);
1167 }
1168
1169 let relation_version = RelationVersion::root();
1171 assert_eq!(table.desc.latest_version(), relation_version);
1172 let relation_desc = table
1173 .desc
1174 .at_version(RelationVersionSelector::Specific(relation_version));
1175 let collection_desc = CollectionDescription::for_table(relation_desc, None);
1177 let collections = vec![(global_id, collection_desc)];
1178
1179 let compaction_window = table
1180 .custom_logical_compaction_window
1181 .unwrap_or(CompactionWindow::Default);
1182 let read_policies =
1183 BTreeMap::from([(compaction_window, btreeset! { table_id })]);
1184
1185 (collections, Some(register_ts), read_policies)
1186 }
1187 TableDataSource::DataSource {
1188 desc: data_source,
1189 timeline,
1190 } => {
1191 match data_source {
1192 DataSourceDesc::IngestionExport {
1193 ingestion_id,
1194 external_reference: _,
1195 details,
1196 data_config,
1197 } => {
1198 let source_status_item_id =
1202 coord.catalog().resolve_builtin_storage_collection(
1203 &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
1204 );
1205 let status_collection_id = Some(
1206 coord
1207 .catalog()
1208 .get_entry(&source_status_item_id)
1209 .latest_global_id(),
1210 );
1211 let ingestion_id =
1214 coord.catalog().get_entry(&ingestion_id).latest_global_id();
1215 let collection_desc = CollectionDescription::<Timestamp> {
1217 desc: table.desc.at_version(RelationVersionSelector::Latest),
1218 data_source: DataSource::IngestionExport {
1219 ingestion_id,
1220 details,
1221 data_config: data_config
1222 .into_inline_connection(coord.catalog.state()),
1223 },
1224 since: None,
1225 status_collection_id,
1226 timeline: Some(timeline.clone()),
1227 };
1228
1229 let collections = vec![(global_id, collection_desc)];
1230 let read_policies = coord
1231 .catalog()
1232 .state()
1233 .source_compaction_windows(vec![table_id]);
1234
1235 (collections, None, read_policies)
1236 }
1237 DataSourceDesc::Webhook { .. } => {
1238 if let Some(url) =
1239 coord.catalog().state().try_get_webhook_url(&table_id)
1240 {
1241 ctx.session()
1242 .add_notice(AdapterNotice::WebhookSourceCreated { url })
1243 }
1244
1245 assert_eq!(
1247 table.desc.latest_version(),
1248 RelationVersion::root(),
1249 "found webhook with more than 1 relation version, {:?}",
1250 table.desc
1251 );
1252 let desc = table.desc.latest();
1253
1254 let collection_desc = CollectionDescription {
1255 desc,
1256 data_source: DataSource::Webhook,
1257 since: None,
1258 status_collection_id: None,
1259 timeline: Some(timeline.clone()),
1260 };
1261 let collections = vec![(global_id, collection_desc)];
1262 let read_policies = coord
1263 .catalog()
1264 .state()
1265 .source_compaction_windows(vec![table_id]);
1266
1267 (collections, None, read_policies)
1268 }
1269 _ => unreachable!("CREATE TABLE data source got {:?}", data_source),
1270 }
1271 }
1272 };
1273
1274 let storage_metadata = coord.catalog.state().storage_metadata();
1276 coord
1277 .controller
1278 .storage
1279 .create_collections(storage_metadata, register_ts, collections)
1280 .await
1281 .unwrap_or_terminate("cannot fail to create collections");
1282
1283 if let Some(register_ts) = register_ts {
1285 coord.apply_local_write(register_ts).await;
1286 }
1287
1288 for (compaction_window, storage_policies) in read_policies {
1290 coord
1291 .initialize_storage_read_policies(storage_policies, compaction_window)
1292 .await;
1293 }
1294 })
1295 .await;
1296
1297 match catalog_result {
1298 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1299 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1300 kind:
1301 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1302 })) if if_not_exists => {
1303 ctx.session_mut()
1304 .add_notice(AdapterNotice::ObjectAlreadyExists {
1305 name: name.item,
1306 ty: "table",
1307 });
1308 Ok(ExecuteResponse::CreatedTable)
1309 }
1310 Err(err) => Err(err),
1311 }
1312 }
1313
1314 #[instrument]
1315 pub(super) async fn sequence_create_sink(
1316 &mut self,
1317 ctx: ExecuteContext,
1318 plan: plan::CreateSinkPlan,
1319 resolved_ids: ResolvedIds,
1320 ) {
1321 let plan::CreateSinkPlan {
1322 name,
1323 sink,
1324 with_snapshot,
1325 if_not_exists,
1326 in_cluster,
1327 } = plan;
1328
1329 let id_ts = self.get_catalog_write_ts().await;
1331 let (item_id, global_id) =
1332 return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
1333
1334 let catalog_sink = Sink {
1335 create_sql: sink.create_sql,
1336 global_id,
1337 from: sink.from,
1338 connection: sink.connection,
1339 envelope: sink.envelope,
1340 version: sink.version,
1341 with_snapshot,
1342 resolved_ids,
1343 cluster_id: in_cluster,
1344 };
1345
1346 let ops = vec![catalog::Op::CreateItem {
1347 id: item_id,
1348 name: name.clone(),
1349 item: CatalogItem::Sink(catalog_sink.clone()),
1350 owner_id: *ctx.session().current_role_id(),
1351 }];
1352
1353 let from = self.catalog().get_entry_by_global_id(&catalog_sink.from);
1354 if let Err(e) = self
1355 .controller
1356 .storage
1357 .check_exists(sink.from)
1358 .map_err(|e| match e {
1359 StorageError::IdentifierMissing(_) => AdapterError::Unstructured(anyhow!(
1360 "{} is a {}, which cannot be exported as a sink",
1361 from.name().item.clone(),
1362 from.item().typ().to_string(),
1363 )),
1364 e => AdapterError::Storage(e),
1365 })
1366 {
1367 ctx.retire(Err(e));
1368 return;
1369 }
1370
1371 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1372
1373 match result {
1374 Ok(()) => {}
1375 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1376 kind:
1377 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1378 })) if if_not_exists => {
1379 ctx.session()
1380 .add_notice(AdapterNotice::ObjectAlreadyExists {
1381 name: name.item,
1382 ty: "sink",
1383 });
1384 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1385 return;
1386 }
1387 Err(e) => {
1388 ctx.retire(Err(e));
1389 return;
1390 }
1391 };
1392
1393 self.create_storage_export(global_id, &catalog_sink)
1394 .await
1395 .unwrap_or_terminate("cannot fail to create exports");
1396
1397 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1398 .await;
1399
1400 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1401 }
1402
1403 pub(super) fn validate_system_column_references(
1426 &self,
1427 uses_ambiguous_columns: bool,
1428 depends_on: &BTreeSet<GlobalId>,
1429 ) -> Result<(), AdapterError> {
1430 if uses_ambiguous_columns
1431 && depends_on
1432 .iter()
1433 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1434 {
1435 Err(AdapterError::AmbiguousSystemColumnReference)
1436 } else {
1437 Ok(())
1438 }
1439 }
1440
1441 #[instrument]
1442 pub(super) async fn sequence_create_type(
1443 &mut self,
1444 session: &Session,
1445 plan: plan::CreateTypePlan,
1446 resolved_ids: ResolvedIds,
1447 ) -> Result<ExecuteResponse, AdapterError> {
1448 let id_ts = self.get_catalog_write_ts().await;
1449 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
1450 let typ = Type {
1451 create_sql: Some(plan.typ.create_sql),
1452 global_id,
1453 desc: plan.typ.inner.desc(&self.catalog().for_session(session))?,
1454 details: CatalogTypeDetails {
1455 array_id: None,
1456 typ: plan.typ.inner,
1457 pg_metadata: None,
1458 },
1459 resolved_ids,
1460 };
1461 let op = catalog::Op::CreateItem {
1462 id: item_id,
1463 name: plan.name,
1464 item: CatalogItem::Type(typ),
1465 owner_id: *session.current_role_id(),
1466 };
1467 match self.catalog_transact(Some(session), vec![op]).await {
1468 Ok(()) => Ok(ExecuteResponse::CreatedType),
1469 Err(err) => Err(err),
1470 }
1471 }
1472
1473 #[instrument]
1474 pub(super) async fn sequence_comment_on(
1475 &mut self,
1476 session: &Session,
1477 plan: plan::CommentPlan,
1478 ) -> Result<ExecuteResponse, AdapterError> {
1479 let op = catalog::Op::Comment {
1480 object_id: plan.object_id,
1481 sub_component: plan.sub_component,
1482 comment: plan.comment,
1483 };
1484 self.catalog_transact(Some(session), vec![op]).await?;
1485 Ok(ExecuteResponse::Comment)
1486 }
1487
1488 #[instrument]
1489 pub(super) async fn sequence_drop_objects(
1490 &mut self,
1491 session: &Session,
1492 plan::DropObjectsPlan {
1493 drop_ids,
1494 object_type,
1495 referenced_ids,
1496 }: plan::DropObjectsPlan,
1497 ) -> Result<ExecuteResponse, AdapterError> {
1498 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1499 let mut objects = Vec::new();
1500 for obj_id in &drop_ids {
1501 if !referenced_ids_hashset.contains(obj_id) {
1502 let object_info = ErrorMessageObjectDescription::from_id(
1503 obj_id,
1504 &self.catalog().for_session(session),
1505 )
1506 .to_string();
1507 objects.push(object_info);
1508 }
1509 }
1510
1511 if !objects.is_empty() {
1512 session.add_notice(AdapterNotice::CascadeDroppedObject { objects });
1513 }
1514
1515 let DropOps {
1516 ops,
1517 dropped_active_db,
1518 dropped_active_cluster,
1519 dropped_in_use_indexes,
1520 } = self.sequence_drop_common(session, drop_ids)?;
1521
1522 self.catalog_transact(Some(session), ops).await?;
1523
1524 fail::fail_point!("after_sequencer_drop_replica");
1525
1526 if dropped_active_db {
1527 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1528 name: session.vars().database().to_string(),
1529 });
1530 }
1531 if dropped_active_cluster {
1532 session.add_notice(AdapterNotice::DroppedActiveCluster {
1533 name: session.vars().cluster().to_string(),
1534 });
1535 }
1536 for dropped_in_use_index in dropped_in_use_indexes {
1537 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1538 self.metrics
1539 .optimization_notices
1540 .with_label_values(&["DroppedInUseIndex"])
1541 .inc_by(1);
1542 }
1543 Ok(ExecuteResponse::DroppedObject(object_type))
1544 }
1545
1546 fn validate_dropped_role_ownership(
1547 &self,
1548 session: &Session,
1549 dropped_roles: &BTreeMap<RoleId, &str>,
1550 ) -> Result<(), AdapterError> {
1551 fn privilege_check(
1552 privileges: &PrivilegeMap,
1553 dropped_roles: &BTreeMap<RoleId, &str>,
1554 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1555 object_id: &SystemObjectId,
1556 catalog: &ConnCatalog,
1557 ) {
1558 for privilege in privileges.all_values() {
1559 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1560 let grantor_name = catalog.get_role(&privilege.grantor).name();
1561 let object_description =
1562 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1563 dependent_objects
1564 .entry(role_name.to_string())
1565 .or_default()
1566 .push(format!(
1567 "privileges on {object_description} granted by {grantor_name}",
1568 ));
1569 }
1570 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1571 let grantee_name = catalog.get_role(&privilege.grantee).name();
1572 let object_description =
1573 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1574 dependent_objects
1575 .entry(role_name.to_string())
1576 .or_default()
1577 .push(format!(
1578 "privileges granted on {object_description} to {grantee_name}"
1579 ));
1580 }
1581 }
1582 }
1583
1584 let catalog = self.catalog().for_session(session);
1585 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1586 for entry in self.catalog.entries() {
1587 let id = SystemObjectId::Object(entry.id().into());
1588 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1589 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1590 dependent_objects
1591 .entry(role_name.to_string())
1592 .or_default()
1593 .push(format!("owner of {object_description}"));
1594 }
1595 privilege_check(
1596 entry.privileges(),
1597 dropped_roles,
1598 &mut dependent_objects,
1599 &id,
1600 &catalog,
1601 );
1602 }
1603 for database in self.catalog.databases() {
1604 let database_id = SystemObjectId::Object(database.id().into());
1605 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1606 let object_description =
1607 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1608 dependent_objects
1609 .entry(role_name.to_string())
1610 .or_default()
1611 .push(format!("owner of {object_description}"));
1612 }
1613 privilege_check(
1614 &database.privileges,
1615 dropped_roles,
1616 &mut dependent_objects,
1617 &database_id,
1618 &catalog,
1619 );
1620 for schema in database.schemas_by_id.values() {
1621 let schema_id = SystemObjectId::Object(
1622 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1623 );
1624 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1625 let object_description =
1626 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1627 dependent_objects
1628 .entry(role_name.to_string())
1629 .or_default()
1630 .push(format!("owner of {object_description}"));
1631 }
1632 privilege_check(
1633 &schema.privileges,
1634 dropped_roles,
1635 &mut dependent_objects,
1636 &schema_id,
1637 &catalog,
1638 );
1639 }
1640 }
1641 for cluster in self.catalog.clusters() {
1642 let cluster_id = SystemObjectId::Object(cluster.id().into());
1643 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1644 let object_description =
1645 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1646 dependent_objects
1647 .entry(role_name.to_string())
1648 .or_default()
1649 .push(format!("owner of {object_description}"));
1650 }
1651 privilege_check(
1652 &cluster.privileges,
1653 dropped_roles,
1654 &mut dependent_objects,
1655 &cluster_id,
1656 &catalog,
1657 );
1658 for replica in cluster.replicas() {
1659 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1660 let replica_id =
1661 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1662 let object_description =
1663 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1664 dependent_objects
1665 .entry(role_name.to_string())
1666 .or_default()
1667 .push(format!("owner of {object_description}"));
1668 }
1669 }
1670 }
1671 privilege_check(
1672 self.catalog().system_privileges(),
1673 dropped_roles,
1674 &mut dependent_objects,
1675 &SystemObjectId::System,
1676 &catalog,
1677 );
1678 for (default_privilege_object, default_privilege_acl_items) in
1679 self.catalog.default_privileges()
1680 {
1681 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1682 dependent_objects
1683 .entry(role_name.to_string())
1684 .or_default()
1685 .push(format!(
1686 "default privileges on {}S created by {}",
1687 default_privilege_object.object_type, role_name
1688 ));
1689 }
1690 for default_privilege_acl_item in default_privilege_acl_items {
1691 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1692 dependent_objects
1693 .entry(role_name.to_string())
1694 .or_default()
1695 .push(format!(
1696 "default privileges on {}S granted to {}",
1697 default_privilege_object.object_type, role_name
1698 ));
1699 }
1700 }
1701 }
1702
1703 if !dependent_objects.is_empty() {
1704 Err(AdapterError::DependentObject(dependent_objects))
1705 } else {
1706 Ok(())
1707 }
1708 }
1709
1710 #[instrument]
1711 pub(super) async fn sequence_drop_owned(
1712 &mut self,
1713 session: &Session,
1714 plan: plan::DropOwnedPlan,
1715 ) -> Result<ExecuteResponse, AdapterError> {
1716 for role_id in &plan.role_ids {
1717 self.catalog().ensure_not_reserved_role(role_id)?;
1718 }
1719
1720 let mut privilege_revokes = plan.privilege_revokes;
1721
1722 let session_catalog = self.catalog().for_session(session);
1724 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1725 && !session.is_superuser()
1726 {
1727 let role_membership =
1729 session_catalog.collect_role_membership(session.current_role_id());
1730 let invalid_revokes: BTreeSet<_> = privilege_revokes
1731 .drain_filter_swapping(|(_, privilege)| {
1732 !role_membership.contains(&privilege.grantor)
1733 })
1734 .map(|(object_id, _)| object_id)
1735 .collect();
1736 for invalid_revoke in invalid_revokes {
1737 let object_description =
1738 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1739 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1740 }
1741 }
1742
1743 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1744 catalog::Op::UpdatePrivilege {
1745 target_id: object_id,
1746 privilege,
1747 variant: UpdatePrivilegeVariant::Revoke,
1748 }
1749 });
1750 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1751 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1752 privilege_object,
1753 privilege_acl_item,
1754 variant: UpdatePrivilegeVariant::Revoke,
1755 },
1756 );
1757 let DropOps {
1758 ops: drop_ops,
1759 dropped_active_db,
1760 dropped_active_cluster,
1761 dropped_in_use_indexes,
1762 } = self.sequence_drop_common(session, plan.drop_ids)?;
1763
1764 let ops = privilege_revoke_ops
1765 .chain(default_privilege_revoke_ops)
1766 .chain(drop_ops.into_iter())
1767 .collect();
1768
1769 self.catalog_transact(Some(session), ops).await?;
1770
1771 if dropped_active_db {
1772 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1773 name: session.vars().database().to_string(),
1774 });
1775 }
1776 if dropped_active_cluster {
1777 session.add_notice(AdapterNotice::DroppedActiveCluster {
1778 name: session.vars().cluster().to_string(),
1779 });
1780 }
1781 for dropped_in_use_index in dropped_in_use_indexes {
1782 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1783 }
1784 Ok(ExecuteResponse::DroppedOwned)
1785 }
1786
1787 fn sequence_drop_common(
1788 &self,
1789 session: &Session,
1790 ids: Vec<ObjectId>,
1791 ) -> Result<DropOps, AdapterError> {
1792 let mut dropped_active_db = false;
1793 let mut dropped_active_cluster = false;
1794 let mut dropped_in_use_indexes = Vec::new();
1795 let mut dropped_roles = BTreeMap::new();
1796 let mut dropped_databases = BTreeSet::new();
1797 let mut dropped_schemas = BTreeSet::new();
1798 let mut role_revokes = BTreeSet::new();
1802 let mut default_privilege_revokes = BTreeSet::new();
1805
1806 let mut clusters_to_drop = BTreeSet::new();
1808
1809 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1810 for id in &ids {
1811 match id {
1812 ObjectId::Database(id) => {
1813 let name = self.catalog().get_database(id).name();
1814 if name == session.vars().database() {
1815 dropped_active_db = true;
1816 }
1817 dropped_databases.insert(id);
1818 }
1819 ObjectId::Schema((_, spec)) => {
1820 if let SchemaSpecifier::Id(id) = spec {
1821 dropped_schemas.insert(id);
1822 }
1823 }
1824 ObjectId::Cluster(id) => {
1825 clusters_to_drop.insert(*id);
1826 if let Some(active_id) = self
1827 .catalog()
1828 .active_cluster(session)
1829 .ok()
1830 .map(|cluster| cluster.id())
1831 {
1832 if id == &active_id {
1833 dropped_active_cluster = true;
1834 }
1835 }
1836 }
1837 ObjectId::Role(id) => {
1838 let role = self.catalog().get_role(id);
1839 let name = role.name();
1840 dropped_roles.insert(*id, name);
1841 for (group_id, grantor_id) in &role.membership.map {
1843 role_revokes.insert((*group_id, *id, *grantor_id));
1844 }
1845 }
1846 ObjectId::Item(id) => {
1847 if let Some(index) = self.catalog().get_entry(id).index() {
1848 let humanizer = self.catalog().for_session(session);
1849 let dependants = self
1850 .controller
1851 .compute
1852 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1853 .ok()
1854 .into_iter()
1855 .flatten()
1856 .filter(|dependant_id| {
1857 if dependant_id.is_transient() {
1864 return false;
1865 }
1866 let Some(dependent_id) = humanizer
1868 .try_get_item_by_global_id(dependant_id)
1869 .map(|item| item.id())
1870 else {
1871 return false;
1872 };
1873 !ids_set.contains(&ObjectId::Item(dependent_id))
1876 })
1877 .flat_map(|dependant_id| {
1878 humanizer.humanize_id(dependant_id)
1882 })
1883 .collect_vec();
1884 if !dependants.is_empty() {
1885 dropped_in_use_indexes.push(DroppedInUseIndex {
1886 index_name: humanizer
1887 .humanize_id(index.global_id())
1888 .unwrap_or_else(|| id.to_string()),
1889 dependant_objects: dependants,
1890 });
1891 }
1892 }
1893 }
1894 _ => {}
1895 }
1896 }
1897
1898 for id in &ids {
1899 match id {
1900 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1904 if !clusters_to_drop.contains(cluster_id) {
1905 let cluster = self.catalog.get_cluster(*cluster_id);
1906 if cluster.is_managed() {
1907 let replica =
1908 cluster.replica(*replica_id).expect("Catalog out of sync");
1909 if !replica.config.location.internal() {
1910 coord_bail!("cannot drop replica of managed cluster");
1911 }
1912 }
1913 }
1914 }
1915 _ => {}
1916 }
1917 }
1918
1919 for role_id in dropped_roles.keys() {
1920 self.catalog().ensure_not_reserved_role(role_id)?;
1921 }
1922 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1923 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1925 for role in self.catalog().user_roles() {
1926 for dropped_role_id in
1927 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1928 {
1929 role_revokes.insert((
1930 **dropped_role_id,
1931 role.id(),
1932 *role
1933 .membership
1934 .map
1935 .get(*dropped_role_id)
1936 .expect("included in keys above"),
1937 ));
1938 }
1939 }
1940
1941 for (default_privilege_object, default_privilege_acls) in
1942 self.catalog().default_privileges()
1943 {
1944 if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
1945 || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
1946 {
1947 for default_privilege_acl in default_privilege_acls {
1948 default_privilege_revokes.insert((
1949 default_privilege_object.clone(),
1950 default_privilege_acl.clone(),
1951 ));
1952 }
1953 }
1954 }
1955
1956 let ops = role_revokes
1957 .into_iter()
1958 .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
1959 role_id,
1960 member_id,
1961 grantor_id,
1962 })
1963 .chain(default_privilege_revokes.into_iter().map(
1964 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1965 privilege_object,
1966 privilege_acl_item,
1967 variant: UpdatePrivilegeVariant::Revoke,
1968 },
1969 ))
1970 .chain(iter::once(catalog::Op::DropObjects(
1971 ids.into_iter()
1972 .map(DropObjectInfo::manual_drop_from_object_id)
1973 .collect(),
1974 )))
1975 .collect();
1976
1977 Ok(DropOps {
1978 ops,
1979 dropped_active_db,
1980 dropped_active_cluster,
1981 dropped_in_use_indexes,
1982 })
1983 }
1984
1985 pub(super) fn sequence_explain_schema(
1986 &self,
1987 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
1988 ) -> Result<ExecuteResponse, AdapterError> {
1989 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
1990 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
1991 })?;
1992
1993 let json_string = json_string(&json_value);
1994 let row = Row::pack_slice(&[Datum::String(&json_string)]);
1995 Ok(Self::send_immediate_rows(row))
1996 }
1997
1998 pub(super) fn sequence_show_all_variables(
1999 &self,
2000 session: &Session,
2001 ) -> Result<ExecuteResponse, AdapterError> {
2002 let mut rows = viewable_variables(self.catalog().state(), session)
2003 .map(|v| (v.name(), v.value(), v.description()))
2004 .collect::<Vec<_>>();
2005 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
2006
2007 let rows: Vec<_> = rows
2009 .into_iter()
2010 .map(|(name, val, desc)| {
2011 Row::pack_slice(&[
2012 Datum::String(name),
2013 Datum::String(&val),
2014 Datum::String(desc),
2015 ])
2016 })
2017 .collect();
2018 Ok(Self::send_immediate_rows(rows))
2019 }
2020
2021 pub(super) fn sequence_show_variable(
2022 &self,
2023 session: &Session,
2024 plan: plan::ShowVariablePlan,
2025 ) -> Result<ExecuteResponse, AdapterError> {
2026 if &plan.name == SCHEMA_ALIAS {
2027 let schemas = self.catalog.resolve_search_path(session);
2028 let schema = schemas.first();
2029 return match schema {
2030 Some((database_spec, schema_spec)) => {
2031 let schema_name = &self
2032 .catalog
2033 .get_schema(database_spec, schema_spec, session.conn_id())
2034 .name()
2035 .schema;
2036 let row = Row::pack_slice(&[Datum::String(schema_name)]);
2037 Ok(Self::send_immediate_rows(row))
2038 }
2039 None => {
2040 if session.vars().current_object_missing_warnings() {
2041 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
2042 search_path: session
2043 .vars()
2044 .search_path()
2045 .into_iter()
2046 .map(|schema| schema.to_string())
2047 .collect(),
2048 });
2049 }
2050 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
2051 }
2052 };
2053 }
2054
2055 let variable = session
2056 .vars()
2057 .get(self.catalog().system_config(), &plan.name)
2058 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
2059
2060 variable.visible(session.user(), self.catalog().system_config())?;
2063
2064 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
2065 if variable.name() == vars::DATABASE.name()
2066 && matches!(
2067 self.catalog().resolve_database(&variable.value()),
2068 Err(CatalogError::UnknownDatabase(_))
2069 )
2070 && session.vars().current_object_missing_warnings()
2071 {
2072 let name = variable.value();
2073 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2074 } else if variable.name() == vars::CLUSTER.name()
2075 && matches!(
2076 self.catalog().resolve_cluster(&variable.value()),
2077 Err(CatalogError::UnknownCluster(_))
2078 )
2079 && session.vars().current_object_missing_warnings()
2080 {
2081 let name = variable.value();
2082 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2083 }
2084 Ok(Self::send_immediate_rows(row))
2085 }
2086
2087 #[instrument]
2088 pub(super) async fn sequence_inspect_shard(
2089 &self,
2090 session: &Session,
2091 plan: plan::InspectShardPlan,
2092 ) -> Result<ExecuteResponse, AdapterError> {
2093 if !session.user().is_internal() {
2096 return Err(AdapterError::Unauthorized(
2097 rbac::UnauthorizedError::MzSystem {
2098 action: "inspect".into(),
2099 },
2100 ));
2101 }
2102 let state = self
2103 .controller
2104 .storage
2105 .inspect_persist_state(plan.id)
2106 .await?;
2107 let jsonb = Jsonb::from_serde_json(state)?;
2108 Ok(Self::send_immediate_rows(jsonb.into_row()))
2109 }
2110
2111 #[instrument]
2112 pub(super) fn sequence_set_variable(
2113 &self,
2114 session: &mut Session,
2115 plan: plan::SetVariablePlan,
2116 ) -> Result<ExecuteResponse, AdapterError> {
2117 let (name, local) = (plan.name, plan.local);
2118 if &name == TRANSACTION_ISOLATION_VAR_NAME {
2119 self.validate_set_isolation_level(session)?;
2120 }
2121 if &name == vars::CLUSTER.name() {
2122 self.validate_set_cluster(session)?;
2123 }
2124
2125 let vars = session.vars_mut();
2126 let values = match plan.value {
2127 plan::VariableValue::Default => None,
2128 plan::VariableValue::Values(values) => Some(values),
2129 };
2130
2131 match values {
2132 Some(values) => {
2133 vars.set(
2134 self.catalog().system_config(),
2135 &name,
2136 VarInput::SqlSet(&values),
2137 local,
2138 )?;
2139
2140 let vars = session.vars();
2141
2142 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
2145 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
2146 } else if name == vars::CLUSTER.name()
2147 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
2148 {
2149 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
2150 }
2151
2152 if name.as_str() == vars::DATABASE.name()
2154 && matches!(
2155 self.catalog().resolve_database(vars.database()),
2156 Err(CatalogError::UnknownDatabase(_))
2157 )
2158 && session.vars().current_object_missing_warnings()
2159 {
2160 let name = vars.database().to_string();
2161 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2162 } else if name.as_str() == vars::CLUSTER.name()
2163 && matches!(
2164 self.catalog().resolve_cluster(vars.cluster()),
2165 Err(CatalogError::UnknownCluster(_))
2166 )
2167 && session.vars().current_object_missing_warnings()
2168 {
2169 let name = vars.cluster().to_string();
2170 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2171 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
2172 let v = values.into_first().to_lowercase();
2173 if v == IsolationLevel::ReadUncommitted.as_str()
2174 || v == IsolationLevel::ReadCommitted.as_str()
2175 || v == IsolationLevel::RepeatableRead.as_str()
2176 {
2177 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
2178 isolation_level: v,
2179 });
2180 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
2181 session.add_notice(AdapterNotice::StrongSessionSerializable);
2182 }
2183 }
2184 }
2185 None => vars.reset(self.catalog().system_config(), &name, local)?,
2186 }
2187
2188 Ok(ExecuteResponse::SetVariable { name, reset: false })
2189 }
2190
2191 pub(super) fn sequence_reset_variable(
2192 &self,
2193 session: &mut Session,
2194 plan: plan::ResetVariablePlan,
2195 ) -> Result<ExecuteResponse, AdapterError> {
2196 let name = plan.name;
2197 if &name == TRANSACTION_ISOLATION_VAR_NAME {
2198 self.validate_set_isolation_level(session)?;
2199 }
2200 if &name == vars::CLUSTER.name() {
2201 self.validate_set_cluster(session)?;
2202 }
2203 session
2204 .vars_mut()
2205 .reset(self.catalog().system_config(), &name, false)?;
2206 Ok(ExecuteResponse::SetVariable { name, reset: true })
2207 }
2208
2209 pub(super) fn sequence_set_transaction(
2210 &self,
2211 session: &mut Session,
2212 plan: plan::SetTransactionPlan,
2213 ) -> Result<ExecuteResponse, AdapterError> {
2214 for mode in plan.modes {
2216 match mode {
2217 TransactionMode::AccessMode(_) => {
2218 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
2219 }
2220 TransactionMode::IsolationLevel(isolation_level) => {
2221 self.validate_set_isolation_level(session)?;
2222
2223 session.vars_mut().set(
2224 self.catalog().system_config(),
2225 TRANSACTION_ISOLATION_VAR_NAME,
2226 VarInput::Flat(&isolation_level.to_ast_string_stable()),
2227 plan.local,
2228 )?
2229 }
2230 }
2231 }
2232 Ok(ExecuteResponse::SetVariable {
2233 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2234 reset: false,
2235 })
2236 }
2237
2238 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2239 if session.transaction().contains_ops() {
2240 Err(AdapterError::InvalidSetIsolationLevel)
2241 } else {
2242 Ok(())
2243 }
2244 }
2245
2246 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2247 if session.transaction().contains_ops() {
2248 Err(AdapterError::InvalidSetCluster)
2249 } else {
2250 Ok(())
2251 }
2252 }
2253
2254 #[instrument]
2255 pub(super) async fn sequence_end_transaction(
2256 &mut self,
2257 mut ctx: ExecuteContext,
2258 mut action: EndTransactionAction,
2259 ) {
2260 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2262 (&action, ctx.session().transaction())
2263 {
2264 action = EndTransactionAction::Rollback;
2265 }
2266 let response = match action {
2267 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2268 params: BTreeMap::new(),
2269 }),
2270 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2271 params: BTreeMap::new(),
2272 }),
2273 };
2274
2275 let result = self
2276 .sequence_end_transaction_inner(ctx.session_mut(), action)
2277 .await;
2278
2279 let (response, action) = match result {
2280 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2281 (response, action)
2282 }
2283 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2284 let validated_locks = match write_lock_guards {
2288 None => None,
2289 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2290 Ok(locks) => Some(locks),
2291 Err(missing) => {
2292 tracing::error!(?missing, "programming error, missing write locks");
2293 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2294 }
2295 },
2296 };
2297
2298 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2299 for WriteOp { id, rows } in writes {
2300 let total_rows = collected_writes.entry(id).or_default();
2301 total_rows.push(rows);
2302 }
2303
2304 self.submit_write(PendingWriteTxn::User {
2305 span: Span::current(),
2306 writes: collected_writes,
2307 write_locks: validated_locks,
2308 pending_txn: PendingTxn {
2309 ctx,
2310 response,
2311 action,
2312 },
2313 });
2314 return;
2315 }
2316 Ok((
2317 Some(TransactionOps::Peeks {
2318 determination,
2319 requires_linearization: RequireLinearization::Required,
2320 ..
2321 }),
2322 _,
2323 )) if ctx.session().vars().transaction_isolation()
2324 == &IsolationLevel::StrictSerializable =>
2325 {
2326 let conn_id = ctx.session().conn_id().clone();
2327 let pending_read_txn = PendingReadTxn {
2328 txn: PendingRead::Read {
2329 txn: PendingTxn {
2330 ctx,
2331 response,
2332 action,
2333 },
2334 },
2335 timestamp_context: determination.timestamp_context,
2336 created: Instant::now(),
2337 num_requeues: 0,
2338 otel_ctx: OpenTelemetryContext::obtain(),
2339 };
2340 self.strict_serializable_reads_tx
2341 .send((conn_id, pending_read_txn))
2342 .expect("sending to strict_serializable_reads_tx cannot fail");
2343 return;
2344 }
2345 Ok((
2346 Some(TransactionOps::Peeks {
2347 determination,
2348 requires_linearization: RequireLinearization::Required,
2349 ..
2350 }),
2351 _,
2352 )) if ctx.session().vars().transaction_isolation()
2353 == &IsolationLevel::StrongSessionSerializable =>
2354 {
2355 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2356 ctx.session_mut()
2357 .ensure_timestamp_oracle(timeline.clone())
2358 .apply_write(*ts);
2359 }
2360 (response, action)
2361 }
2362 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2363 self.internal_cmd_tx
2364 .send(Message::ExecuteSingleStatementTransaction {
2365 ctx,
2366 otel_ctx: OpenTelemetryContext::obtain(),
2367 stmt,
2368 params,
2369 })
2370 .expect("must send");
2371 return;
2372 }
2373 Ok((_, _)) => (response, action),
2374 Err(err) => (Err(err), EndTransactionAction::Rollback),
2375 };
2376 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2377 let response = response.map(|mut r| {
2379 r.extend_params(changed);
2380 ExecuteResponse::from(r)
2381 });
2382
2383 ctx.retire(response);
2384 }
2385
2386 #[instrument]
2387 async fn sequence_end_transaction_inner(
2388 &mut self,
2389 session: &mut Session,
2390 action: EndTransactionAction,
2391 ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2392 let txn = self.clear_transaction(session).await;
2393
2394 if let EndTransactionAction::Commit = action {
2395 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2396 match &mut ops {
2397 TransactionOps::Writes(writes) => {
2398 for WriteOp { id, .. } in &mut writes.iter() {
2399 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2401 AdapterError::Catalog(mz_catalog::memory::error::Error {
2402 kind: mz_catalog::memory::error::ErrorKind::Sql(
2403 CatalogError::UnknownItem(id.to_string()),
2404 ),
2405 })
2406 })?;
2407 }
2408
2409 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2411 }
2412 TransactionOps::DDL {
2413 ops,
2414 state: _,
2415 revision,
2416 } => {
2417 if *revision != self.catalog().transient_revision() {
2419 return Err(AdapterError::DDLTransactionRace);
2420 }
2421 self.catalog_transact(Some(session), std::mem::take(ops))
2423 .await?;
2424 }
2425 _ => (),
2426 }
2427 return Ok((Some(ops), write_lock_guards));
2428 }
2429 }
2430
2431 Ok((None, None))
2432 }
2433
2434 pub(super) async fn sequence_side_effecting_func(
2435 &mut self,
2436 ctx: ExecuteContext,
2437 plan: SideEffectingFunc,
2438 ) {
2439 match plan {
2440 SideEffectingFunc::PgCancelBackend { connection_id } => {
2441 if ctx.session().conn_id().unhandled() == connection_id {
2442 ctx.retire(Err(AdapterError::Canceled));
2446 return;
2447 }
2448
2449 let res = if let Some((id_handle, _conn_meta)) =
2450 self.active_conns.get_key_value(&connection_id)
2451 {
2452 self.handle_privileged_cancel(id_handle.clone()).await;
2454 Datum::True
2455 } else {
2456 Datum::False
2457 };
2458 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2459 }
2460 }
2461 }
2462
2463 pub(super) async fn determine_real_time_recent_timestamp(
2466 &self,
2467 session: &Session,
2468 source_ids: impl Iterator<Item = CatalogItemId>,
2469 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2470 {
2471 let vars = session.vars();
2472
2473 let r = if vars.real_time_recency()
2478 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2479 && !session.contains_read_timestamp()
2480 {
2481 let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
2487 if to_visit.is_empty() {
2490 return Ok(None);
2491 }
2492
2493 let mut timestamp_objects = BTreeSet::new();
2494
2495 while let Some(id) = to_visit.pop_front() {
2496 timestamp_objects.insert(id);
2497 to_visit.extend(
2498 self.catalog()
2499 .get_entry(&id)
2500 .uses()
2501 .into_iter()
2502 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2503 );
2504 }
2505 let timestamp_objects = timestamp_objects
2506 .into_iter()
2507 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2508 .collect();
2509
2510 let r = self
2511 .controller
2512 .determine_real_time_recent_timestamp(
2513 timestamp_objects,
2514 *vars.real_time_recency_timeout(),
2515 )
2516 .await?;
2517
2518 Some(r)
2519 } else {
2520 None
2521 };
2522
2523 Ok(r)
2524 }
2525
2526 #[instrument]
2527 pub(super) async fn sequence_explain_plan(
2528 &mut self,
2529 ctx: ExecuteContext,
2530 plan: plan::ExplainPlanPlan,
2531 target_cluster: TargetCluster,
2532 ) {
2533 match &plan.explainee {
2534 plan::Explainee::Statement(stmt) => match stmt {
2535 plan::ExplaineeStatement::CreateView { .. } => {
2536 self.explain_create_view(ctx, plan).await;
2537 }
2538 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2539 self.explain_create_materialized_view(ctx, plan).await;
2540 }
2541 plan::ExplaineeStatement::CreateIndex { .. } => {
2542 self.explain_create_index(ctx, plan).await;
2543 }
2544 plan::ExplaineeStatement::Select { .. } => {
2545 self.explain_peek(ctx, plan, target_cluster).await;
2546 }
2547 },
2548 plan::Explainee::View(_) => {
2549 let result = self.explain_view(&ctx, plan);
2550 ctx.retire(result);
2551 }
2552 plan::Explainee::MaterializedView(_) => {
2553 let result = self.explain_materialized_view(&ctx, plan);
2554 ctx.retire(result);
2555 }
2556 plan::Explainee::Index(_) => {
2557 let result = self.explain_index(&ctx, plan);
2558 ctx.retire(result);
2559 }
2560 plan::Explainee::ReplanView(_) => {
2561 self.explain_replan_view(ctx, plan).await;
2562 }
2563 plan::Explainee::ReplanMaterializedView(_) => {
2564 self.explain_replan_materialized_view(ctx, plan).await;
2565 }
2566 plan::Explainee::ReplanIndex(_) => {
2567 self.explain_replan_index(ctx, plan).await;
2568 }
2569 };
2570 }
2571
2572 pub(super) async fn sequence_explain_pushdown(
2573 &mut self,
2574 ctx: ExecuteContext,
2575 plan: plan::ExplainPushdownPlan,
2576 target_cluster: TargetCluster,
2577 ) {
2578 match plan.explainee {
2579 Explainee::Statement(ExplaineeStatement::Select {
2580 broken: false,
2581 plan,
2582 desc: _,
2583 }) => {
2584 let stage = return_if_err!(
2585 self.peek_validate(
2586 ctx.session(),
2587 plan,
2588 target_cluster,
2589 None,
2590 ExplainContext::Pushdown,
2591 Some(ctx.session().vars().max_query_result_size()),
2592 ),
2593 ctx
2594 );
2595 self.sequence_staged(ctx, Span::current(), stage).await;
2596 }
2597 Explainee::MaterializedView(item_id) => {
2598 self.explain_pushdown_materialized_view(ctx, item_id).await;
2599 }
2600 _ => {
2601 ctx.retire(Err(AdapterError::Unsupported(
2602 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2603 )));
2604 }
2605 };
2606 }
2607
2608 async fn render_explain_pushdown(
2609 &self,
2610 ctx: ExecuteContext,
2611 as_of: Antichain<Timestamp>,
2612 mz_now: ResultSpec<'static>,
2613 read_holds: Option<ReadHolds<Timestamp>>,
2614 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2615 ) {
2616 let fut = self
2617 .render_explain_pushdown_prepare(ctx.session(), as_of, mz_now, imports)
2618 .await;
2619 task::spawn(|| "render explain pushdown", async move {
2620 let _read_holds = read_holds;
2622 let res = fut.await;
2623 ctx.retire(res);
2624 });
2625 }
2626
2627 async fn render_explain_pushdown_prepare<
2628 I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
2629 >(
2630 &self,
2631 session: &Session,
2632 as_of: Antichain<Timestamp>,
2633 mz_now: ResultSpec<'static>,
2634 imports: I,
2635 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2636 let explain_timeout = *session.vars().statement_timeout();
2637 let mut futures = FuturesOrdered::new();
2638 for (id, mfp) in imports {
2639 let catalog_entry = self.catalog.get_entry_by_global_id(&id);
2640 let full_name = self
2641 .catalog
2642 .for_session(session)
2643 .resolve_full_name(&catalog_entry.name);
2644 let name = format!("{}", full_name);
2645 let relation_desc = catalog_entry
2646 .desc_opt()
2647 .expect("source should have a proper desc")
2648 .into_owned();
2649 let stats_future = self
2650 .controller
2651 .storage_collections
2652 .snapshot_parts_stats(id, as_of.clone())
2653 .await;
2654
2655 let mz_now = mz_now.clone();
2656 futures.push_back(async move {
2659 let snapshot_stats = match stats_future.await {
2660 Ok(stats) => stats,
2661 Err(e) => return Err(e),
2662 };
2663 let mut total_bytes = 0;
2664 let mut total_parts = 0;
2665 let mut selected_bytes = 0;
2666 let mut selected_parts = 0;
2667 for SnapshotPartStats {
2668 encoded_size_bytes: bytes,
2669 stats,
2670 } in &snapshot_stats.parts
2671 {
2672 let bytes = u64::cast_from(*bytes);
2673 total_bytes += bytes;
2674 total_parts += 1u64;
2675 let selected = match stats {
2676 None => true,
2677 Some(stats) => {
2678 let stats = stats.decode();
2679 let stats = RelationPartStats::new(
2680 name.as_str(),
2681 &snapshot_stats.metrics.pushdown.part_stats,
2682 &relation_desc,
2683 &stats,
2684 );
2685 stats.may_match_mfp(mz_now.clone(), &mfp)
2686 }
2687 };
2688
2689 if selected {
2690 selected_bytes += bytes;
2691 selected_parts += 1u64;
2692 }
2693 }
2694 Ok(Row::pack_slice(&[
2695 name.as_str().into(),
2696 total_bytes.into(),
2697 selected_bytes.into(),
2698 total_parts.into(),
2699 selected_parts.into(),
2700 ]))
2701 });
2702 }
2703
2704 let fut = async move {
2705 match tokio::time::timeout(
2706 explain_timeout,
2707 futures::TryStreamExt::try_collect::<Vec<_>>(futures),
2708 )
2709 .await
2710 {
2711 Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
2712 rows: Box::new(rows.into_row_iter()),
2713 }),
2714 Ok(Err(err)) => Err(err.into()),
2715 Err(_) => Err(AdapterError::StatementTimeout),
2716 }
2717 };
2718 fut
2719 }
2720
2721 #[instrument]
2722 pub(super) async fn sequence_insert(
2723 &mut self,
2724 mut ctx: ExecuteContext,
2725 plan: plan::InsertPlan,
2726 ) {
2727 if !ctx.session_mut().transaction().allows_writes() {
2735 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2736 return;
2737 }
2738
2739 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2753 let expr = return_if_err!(
2756 plan.values
2757 .clone()
2758 .lower(self.catalog().system_config(), None),
2759 ctx
2760 );
2761 OptimizedMirRelationExpr(expr)
2762 } else {
2763 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2765
2766 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2768
2769 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2771 };
2772
2773 match optimized_mir.into_inner() {
2774 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2775 let catalog = self.owned_catalog();
2776 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2777 let result =
2778 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2779 ctx.retire(result);
2780 });
2781 }
2782 _ => {
2784 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2785 Some(table) => {
2786 let fullname = self
2787 .catalog()
2788 .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2789 table
2791 .desc_latest(&fullname)
2792 .expect("desc called on table")
2793 .arity()
2794 }
2795 None => {
2796 ctx.retire(Err(AdapterError::Catalog(
2797 mz_catalog::memory::error::Error {
2798 kind: mz_catalog::memory::error::ErrorKind::Sql(
2799 CatalogError::UnknownItem(plan.id.to_string()),
2800 ),
2801 },
2802 )));
2803 return;
2804 }
2805 };
2806
2807 let finishing = RowSetFinishing {
2808 order_by: vec![],
2809 limit: None,
2810 offset: 0,
2811 project: (0..desc_arity).collect(),
2812 };
2813
2814 let read_then_write_plan = plan::ReadThenWritePlan {
2815 id: plan.id,
2816 selection: plan.values,
2817 finishing,
2818 assignments: BTreeMap::new(),
2819 kind: MutationKind::Insert,
2820 returning: plan.returning,
2821 };
2822
2823 self.sequence_read_then_write(ctx, read_then_write_plan)
2824 .await;
2825 }
2826 }
2827 }
2828
2829 #[instrument]
2834 pub(super) async fn sequence_read_then_write(
2835 &mut self,
2836 mut ctx: ExecuteContext,
2837 plan: plan::ReadThenWritePlan,
2838 ) {
2839 let mut source_ids: BTreeSet<_> = plan
2840 .selection
2841 .depends_on()
2842 .into_iter()
2843 .map(|gid| self.catalog().resolve_item_id(&gid))
2844 .collect();
2845 source_ids.insert(plan.id);
2846
2847 if ctx.session().transaction().write_locks().is_none() {
2849 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2851
2852 for id in &source_ids {
2854 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2855 write_locks.insert_lock(*id, lock);
2856 }
2857 }
2858
2859 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2861 Ok(locks) => locks,
2862 Err(missing) => {
2863 let role_metadata = ctx.session().role_metadata().clone();
2865 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2866 let plan = DeferredPlan {
2867 ctx,
2868 plan: Plan::ReadThenWrite(plan),
2869 validity: PlanValidity::new(
2870 self.catalog.transient_revision(),
2871 source_ids.clone(),
2872 None,
2873 None,
2874 role_metadata,
2875 ),
2876 requires_locks: source_ids,
2877 };
2878 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2879 }
2880 };
2881
2882 ctx.session_mut()
2883 .try_grant_write_locks(write_locks)
2884 .expect("session has already been granted write locks");
2885 }
2886
2887 let plan::ReadThenWritePlan {
2888 id,
2889 kind,
2890 selection,
2891 mut assignments,
2892 finishing,
2893 returning,
2894 } = plan;
2895
2896 let desc = match self.catalog().try_get_entry(&id) {
2898 Some(table) => {
2899 let full_name = self
2900 .catalog()
2901 .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2902 table
2904 .desc_latest(&full_name)
2905 .expect("desc called on table")
2906 .into_owned()
2907 }
2908 None => {
2909 ctx.retire(Err(AdapterError::Catalog(
2910 mz_catalog::memory::error::Error {
2911 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2912 id.to_string(),
2913 )),
2914 },
2915 )));
2916 return;
2917 }
2918 };
2919
2920 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2922 || assignments.values().any(|e| e.contains_temporal())
2923 || returning.iter().any(|e| e.contains_temporal());
2924 if contains_temporal {
2925 ctx.retire(Err(AdapterError::Unsupported(
2926 "calls to mz_now in write statements",
2927 )));
2928 return;
2929 }
2930
2931 fn validate_read_dependencies(
2939 catalog: &Catalog,
2940 id: &CatalogItemId,
2941 ) -> Result<(), AdapterError> {
2942 use CatalogItemType::*;
2943 use mz_catalog::memory::objects;
2944 let mut ids_to_check = Vec::new();
2945 let valid = match catalog.try_get_entry(id) {
2946 Some(entry) => {
2947 if let CatalogItem::View(objects::View { optimized_expr, .. })
2948 | CatalogItem::MaterializedView(objects::MaterializedView {
2949 optimized_expr,
2950 ..
2951 }) = entry.item()
2952 {
2953 if optimized_expr.contains_temporal() {
2954 return Err(AdapterError::Unsupported(
2955 "calls to mz_now in write statements",
2956 ));
2957 }
2958 }
2959 match entry.item().typ() {
2960 typ @ (Func | View | MaterializedView | ContinualTask) => {
2961 ids_to_check.extend(entry.uses());
2962 let valid_id = id.is_user() || matches!(typ, Func);
2963 valid_id
2964 }
2965 Source | Secret | Connection => false,
2966 Sink | Index => unreachable!(),
2968 Table => {
2969 if !id.is_user() {
2970 false
2972 } else {
2973 entry.source_export_details().is_none()
2975 }
2976 }
2977 Type => true,
2978 }
2979 }
2980 None => false,
2981 };
2982 if !valid {
2983 return Err(AdapterError::InvalidTableMutationSelection);
2984 }
2985 for id in ids_to_check {
2986 validate_read_dependencies(catalog, &id)?;
2987 }
2988 Ok(())
2989 }
2990
2991 for gid in selection.depends_on() {
2992 let item_id = self.catalog().resolve_item_id(&gid);
2993 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
2994 ctx.retire(Err(err));
2995 return;
2996 }
2997 }
2998
2999 let (peek_tx, peek_rx) = oneshot::channel();
3000 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
3001 let (tx, _, session, extra) = ctx.into_parts();
3002 let peek_ctx = ExecuteContext::from_parts(
3014 peek_client_tx,
3015 self.internal_cmd_tx.clone(),
3016 session,
3017 Default::default(),
3018 );
3019 self.sequence_peek(
3020 peek_ctx,
3021 plan::SelectPlan {
3022 select: None,
3023 source: selection,
3024 when: QueryWhen::FreshestTableWrite,
3025 finishing,
3026 copy_to: None,
3027 },
3028 TargetCluster::Active,
3029 None,
3030 )
3031 .await;
3032
3033 let internal_cmd_tx = self.internal_cmd_tx.clone();
3034 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
3035 let catalog = self.owned_catalog();
3036 let max_result_size = self.catalog().system_config().max_result_size();
3037 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
3038 let (peek_response, session) = match peek_rx.await {
3039 Ok(Response {
3040 result: Ok(resp),
3041 session,
3042 otel_ctx,
3043 }) => {
3044 otel_ctx.attach_as_parent();
3045 (resp, session)
3046 }
3047 Ok(Response {
3048 result: Err(e),
3049 session,
3050 otel_ctx,
3051 }) => {
3052 let ctx =
3053 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3054 otel_ctx.attach_as_parent();
3055 ctx.retire(Err(e));
3056 return;
3057 }
3058 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
3060 };
3061 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3062 let mut timeout_dur = *ctx.session().vars().statement_timeout();
3063
3064 if timeout_dur == Duration::ZERO {
3066 timeout_dur = Duration::MAX;
3067 }
3068
3069 let style = ExprPrepStyle::OneShot {
3070 logical_time: EvalTime::NotAvailable,
3071 session: ctx.session(),
3072 catalog_state: catalog.state(),
3073 };
3074 for expr in assignments.values_mut() {
3075 return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
3076 }
3077
3078 let make_diffs =
3079 move |mut rows: Box<dyn RowIterator>| -> Result<Vec<(Row, Diff)>, AdapterError> {
3080 let arena = RowArena::new();
3081 let mut diffs = Vec::new();
3082 let mut datum_vec = mz_repr::DatumVec::new();
3083
3084 while let Some(row) = rows.next() {
3085 if !assignments.is_empty() {
3086 assert!(
3087 matches!(kind, MutationKind::Update),
3088 "only updates support assignments"
3089 );
3090 let mut datums = datum_vec.borrow_with(row);
3091 let mut updates = vec![];
3092 for (idx, expr) in &assignments {
3093 let updated = match expr.eval(&datums, &arena) {
3094 Ok(updated) => updated,
3095 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
3096 };
3097 updates.push((*idx, updated));
3098 }
3099 for (idx, new_value) in updates {
3100 datums[idx] = new_value;
3101 }
3102 let updated = Row::pack_slice(&datums);
3103 diffs.push((updated, Diff::ONE));
3104 }
3105 match kind {
3106 MutationKind::Update | MutationKind::Delete => {
3110 diffs.push((row.to_owned(), Diff::MINUS_ONE))
3111 }
3112 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
3113 }
3114 }
3115 for (row, diff) in &diffs {
3116 if diff.is_positive() {
3117 for (idx, datum) in row.iter().enumerate() {
3118 desc.constraints_met(idx, &datum)?;
3119 }
3120 }
3121 }
3122 Ok(diffs)
3123 };
3124 let diffs = match peek_response {
3125 ExecuteResponse::SendingRows { future: batch, .. } => {
3126 match tokio::time::timeout(timeout_dur, batch).await {
3131 Ok(res) => match res {
3132 PeekResponseUnary::Rows(rows) => make_diffs(rows),
3133 PeekResponseUnary::Canceled => Err(AdapterError::Canceled),
3134 PeekResponseUnary::Error(e) => {
3135 Err(AdapterError::Unstructured(anyhow!(e)))
3136 }
3137 },
3138 Err(_) => {
3139 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
3144 conn_id: ctx.session().conn_id().clone(),
3145 });
3146 if let Err(e) = result {
3147 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3148 }
3149 Err(AdapterError::StatementTimeout)
3150 }
3151 }
3152 }
3153 ExecuteResponse::SendingRowsImmediate { rows } => make_diffs(rows),
3154 resp => Err(AdapterError::Unstructured(anyhow!(
3155 "unexpected peek response: {resp:?}"
3156 ))),
3157 };
3158 let mut returning_rows = Vec::new();
3159 let mut diff_err: Option<AdapterError> = None;
3160 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
3161 let arena = RowArena::new();
3162 for (row, diff) in diffs {
3163 if !diff.is_positive() {
3164 continue;
3165 }
3166 let mut returning_row = Row::with_capacity(returning.len());
3167 let mut packer = returning_row.packer();
3168 for expr in &returning {
3169 let datums: Vec<_> = row.iter().collect();
3170 match expr.eval(&datums, &arena) {
3171 Ok(datum) => {
3172 packer.push(datum);
3173 }
3174 Err(err) => {
3175 diff_err = Some(err.into());
3176 break;
3177 }
3178 }
3179 }
3180 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3181 let diff = match NonZeroUsize::try_from(diff) {
3182 Ok(diff) => diff,
3183 Err(err) => {
3184 diff_err = Some(err.into());
3185 break;
3186 }
3187 };
3188 returning_rows.push((returning_row, diff));
3189 if diff_err.is_some() {
3190 break;
3191 }
3192 }
3193 }
3194 let diffs = if let Some(err) = diff_err {
3195 Err(err)
3196 } else {
3197 diffs
3198 };
3199
3200 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3203 if let Some(timestamp_context) = timestamp_context {
3212 let (tx, rx) = tokio::sync::oneshot::channel();
3213 let conn_id = ctx.session().conn_id().clone();
3214 let pending_read_txn = PendingReadTxn {
3215 txn: PendingRead::ReadThenWrite { ctx, tx },
3216 timestamp_context,
3217 created: Instant::now(),
3218 num_requeues: 0,
3219 otel_ctx: OpenTelemetryContext::obtain(),
3220 };
3221 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3222 if let Err(e) = result {
3224 warn!(
3225 "strict_serializable_reads_tx dropped before we could send: {:?}",
3226 e
3227 );
3228 return;
3229 }
3230 let result = rx.await;
3231 ctx = match result {
3233 Ok(Some(ctx)) => ctx,
3234 Ok(None) => {
3235 return;
3238 }
3239 Err(e) => {
3240 warn!(
3241 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3242 e
3243 );
3244 return;
3245 }
3246 };
3247 }
3248
3249 match diffs {
3250 Ok(diffs) => {
3251 let result = Self::send_diffs(
3252 ctx.session_mut(),
3253 plan::SendDiffsPlan {
3254 id,
3255 updates: diffs,
3256 kind,
3257 returning: returning_rows,
3258 max_result_size,
3259 },
3260 );
3261 ctx.retire(result);
3262 }
3263 Err(e) => {
3264 ctx.retire(Err(e));
3265 }
3266 }
3267 });
3268 }
3269
3270 #[instrument]
3271 pub(super) async fn sequence_alter_item_rename(
3272 &mut self,
3273 session: &mut Session,
3274 plan: plan::AlterItemRenamePlan,
3275 ) -> Result<ExecuteResponse, AdapterError> {
3276 let op = catalog::Op::RenameItem {
3277 id: plan.id,
3278 current_full_name: plan.current_full_name,
3279 to_name: plan.to_name,
3280 };
3281 match self
3282 .catalog_transact_with_ddl_transaction(session, vec![op])
3283 .await
3284 {
3285 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3286 Err(err) => Err(err),
3287 }
3288 }
3289
3290 #[instrument]
3291 pub(super) async fn sequence_alter_retain_history(
3292 &mut self,
3293 session: &mut Session,
3294 plan: plan::AlterRetainHistoryPlan,
3295 ) -> Result<ExecuteResponse, AdapterError> {
3296 let ops = vec![catalog::Op::AlterRetainHistory {
3297 id: plan.id,
3298 value: plan.value,
3299 window: plan.window,
3300 }];
3301 self.catalog_transact_with_side_effects(Some(session), ops, |coord| async {
3302 let catalog_item = coord.catalog().get_entry(&plan.id).item();
3303 let cluster = match catalog_item {
3304 CatalogItem::Table(_)
3305 | CatalogItem::MaterializedView(_)
3306 | CatalogItem::ContinualTask(_) => None,
3307 CatalogItem::Index(index) => Some(index.cluster_id),
3308 CatalogItem::Source(_) => {
3309 let read_policies = coord.catalog().source_read_policies(plan.id);
3310 coord.update_storage_read_policies(read_policies);
3311 return;
3312 }
3313 CatalogItem::Log(_)
3314 | CatalogItem::View(_)
3315 | CatalogItem::Sink(_)
3316 | CatalogItem::Type(_)
3317 | CatalogItem::Func(_)
3318 | CatalogItem::Secret(_)
3319 | CatalogItem::Connection(_) => unreachable!(),
3320 };
3321 match cluster {
3322 Some(cluster) => {
3323 coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3324 }
3325 None => {
3326 coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3327 }
3328 }
3329 })
3330 .await?;
3331 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3332 }
3333
3334 #[instrument]
3335 pub(super) async fn sequence_alter_schema_rename(
3336 &mut self,
3337 session: &mut Session,
3338 plan: plan::AlterSchemaRenamePlan,
3339 ) -> Result<ExecuteResponse, AdapterError> {
3340 let (database_spec, schema_spec) = plan.cur_schema_spec;
3341 let op = catalog::Op::RenameSchema {
3342 database_spec,
3343 schema_spec,
3344 new_name: plan.new_schema_name,
3345 check_reserved_names: true,
3346 };
3347 match self
3348 .catalog_transact_with_ddl_transaction(session, vec![op])
3349 .await
3350 {
3351 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3352 Err(err) => Err(err),
3353 }
3354 }
3355
3356 #[instrument]
3357 pub(super) async fn sequence_alter_schema_swap(
3358 &mut self,
3359 session: &mut Session,
3360 plan: plan::AlterSchemaSwapPlan,
3361 ) -> Result<ExecuteResponse, AdapterError> {
3362 let plan::AlterSchemaSwapPlan {
3363 schema_a_spec: (schema_a_db, schema_a),
3364 schema_a_name,
3365 schema_b_spec: (schema_b_db, schema_b),
3366 schema_b_name,
3367 name_temp,
3368 } = plan;
3369
3370 let op_a = catalog::Op::RenameSchema {
3371 database_spec: schema_a_db,
3372 schema_spec: schema_a,
3373 new_name: name_temp,
3374 check_reserved_names: false,
3375 };
3376 let op_b = catalog::Op::RenameSchema {
3377 database_spec: schema_b_db,
3378 schema_spec: schema_b,
3379 new_name: schema_a_name,
3380 check_reserved_names: false,
3381 };
3382 let op_c = catalog::Op::RenameSchema {
3383 database_spec: schema_a_db,
3384 schema_spec: schema_a,
3385 new_name: schema_b_name,
3386 check_reserved_names: false,
3387 };
3388
3389 match self
3390 .catalog_transact_with_ddl_transaction(session, vec![op_a, op_b, op_c])
3391 .await
3392 {
3393 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3394 Err(err) => Err(err),
3395 }
3396 }
3397
3398 #[instrument]
3399 pub(super) async fn sequence_alter_role(
3400 &mut self,
3401 session: &Session,
3402 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3403 ) -> Result<ExecuteResponse, AdapterError> {
3404 let catalog = self.catalog().for_session(session);
3405 let role = catalog.get_role(&id);
3406
3407 let mut notices = vec![];
3409
3410 let mut attributes = role.attributes().clone();
3412 let mut vars = role.vars().clone();
3413
3414 match option {
3416 PlannedAlterRoleOption::Attributes(attrs) => {
3417 self.validate_role_attributes(&attrs.clone().into())?;
3418
3419 if let Some(inherit) = attrs.inherit {
3420 attributes.inherit = inherit;
3421 }
3422
3423 if let Some(password) = attrs.password {
3424 attributes.password = Some(password);
3425 }
3426
3427 if let Some(superuser) = attrs.superuser {
3428 attributes.superuser = Some(superuser);
3429 }
3430
3431 if let Some(login) = attrs.login {
3432 attributes.login = Some(login);
3433 }
3434
3435 if attrs.nopassword.unwrap_or(false) {
3436 attributes.password = None;
3437 }
3438
3439 if let Some(notice) = self.should_emit_rbac_notice(session) {
3440 notices.push(notice);
3441 }
3442 }
3443 PlannedAlterRoleOption::Variable(variable) => {
3444 let session_var = session.vars().inspect(variable.name())?;
3446 session_var.visible(session.user(), catalog.system_vars())?;
3448
3449 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3452 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3453 } else if let PlannedRoleVariable::Set {
3454 name,
3455 value: VariableValue::Values(vals),
3456 } = &variable
3457 {
3458 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3459 notices.push(AdapterNotice::IntrospectionClusterUsage);
3460 }
3461 }
3462
3463 let var_name = match variable {
3464 PlannedRoleVariable::Set { name, value } => {
3465 match &value {
3467 VariableValue::Default => {
3468 vars.remove(&name);
3469 }
3470 VariableValue::Values(vals) => {
3471 let var = match &vals[..] {
3472 [val] => OwnedVarInput::Flat(val.clone()),
3473 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3474 };
3475 session_var.check(var.borrow())?;
3477
3478 vars.insert(name.clone(), var);
3479 }
3480 };
3481 name
3482 }
3483 PlannedRoleVariable::Reset { name } => {
3484 vars.remove(&name);
3486 name
3487 }
3488 };
3489
3490 notices.push(AdapterNotice::VarDefaultUpdated {
3492 role: Some(name.clone()),
3493 var_name: Some(var_name),
3494 });
3495 }
3496 }
3497
3498 let op = catalog::Op::AlterRole {
3499 id,
3500 name,
3501 attributes,
3502 vars: RoleVars { map: vars },
3503 };
3504 let response = self
3505 .catalog_transact(Some(session), vec![op])
3506 .await
3507 .map(|_| ExecuteResponse::AlteredRole)?;
3508
3509 session.add_notices(notices);
3511
3512 Ok(response)
3513 }
3514
3515 #[instrument]
3516 pub(super) async fn sequence_alter_sink_prepare(
3517 &mut self,
3518 ctx: ExecuteContext,
3519 plan: plan::AlterSinkPlan,
3520 ) {
3521 let id_bundle = crate::CollectionIdBundle {
3523 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3524 compute_ids: BTreeMap::new(),
3525 };
3526 let read_hold = self.acquire_read_holds(&id_bundle);
3527
3528 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3529 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3530 return;
3531 };
3532
3533 let otel_ctx = OpenTelemetryContext::obtain();
3534 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3535
3536 let plan_validity = PlanValidity::new(
3537 self.catalog().transient_revision(),
3538 BTreeSet::from_iter([from_item_id]),
3539 Some(plan.in_cluster),
3540 None,
3541 ctx.session().role_metadata().clone(),
3542 );
3543
3544 let create_sink_stmt = match mz_sql::parse::parse(&plan.sink.create_sql)
3547 .expect("invalid create sink sql")
3548 .into_element()
3549 .ast
3550 {
3551 Statement::CreateSink(stmt) => stmt,
3552 _ => unreachable!("invalid statment kind for sink"),
3553 };
3554 let catalog = self.catalog().for_system_session();
3555 let (_, resolved_ids) = match mz_sql::names::resolve(&catalog, create_sink_stmt) {
3556 Ok(ok) => ok,
3557 Err(e) => {
3558 ctx.retire(Err(AdapterError::internal("ALTER SINK", e)));
3559 return;
3560 }
3561 };
3562
3563 info!(
3564 "preparing alter sink for {}: frontiers={:?} export={:?}",
3565 plan.global_id,
3566 self.controller
3567 .storage_collections
3568 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3569 self.controller.storage.export(plan.global_id)
3570 );
3571
3572 self.install_storage_watch_set(
3575 ctx.session().conn_id().clone(),
3576 BTreeSet::from_iter([plan.global_id]),
3577 read_ts,
3578 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3579 ctx: Some(ctx),
3580 otel_ctx,
3581 plan,
3582 plan_validity,
3583 resolved_ids,
3584 read_hold,
3585 }),
3586 );
3587 }
3588
3589 #[instrument]
3590 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3591 ctx.otel_ctx.attach_as_parent();
3592 match ctx.plan_validity.check(self.catalog()) {
3593 Ok(()) => {}
3594 Err(err) => {
3595 ctx.retire(Err(err));
3596 return;
3597 }
3598 }
3599 {
3600 let plan = &ctx.plan;
3601 info!(
3602 "finishing alter sink for {}: frontiers={:?} export={:?}",
3603 plan.global_id,
3604 self.controller
3605 .storage_collections
3606 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3607 self.controller.storage.export(plan.global_id)
3608 );
3609 }
3610
3611 let plan::AlterSinkPlan {
3612 item_id,
3613 global_id,
3614 sink,
3615 with_snapshot,
3616 in_cluster,
3617 } = ctx.plan.clone();
3618 let write_frontier = &self
3621 .controller
3622 .storage
3623 .export(global_id)
3624 .expect("sink known to exist")
3625 .write_frontier;
3626 let as_of = ctx.read_hold.least_valid_read();
3627 assert!(
3628 write_frontier.iter().all(|t| as_of.less_than(t)),
3629 "{:?} should be strictly less than {:?}",
3630 &*as_of,
3631 &**write_frontier
3632 );
3633
3634 let catalog_sink = Sink {
3635 create_sql: sink.create_sql,
3636 global_id,
3637 from: sink.from,
3638 connection: sink.connection.clone(),
3639 envelope: sink.envelope,
3640 version: sink.version,
3641 with_snapshot,
3642 resolved_ids: ctx.resolved_ids.clone(),
3643 cluster_id: in_cluster,
3644 };
3645
3646 let ops = vec![catalog::Op::UpdateItem {
3647 id: item_id,
3648 name: self.catalog.get_entry(&item_id).name().clone(),
3649 to_item: CatalogItem::Sink(catalog_sink),
3650 }];
3651
3652 match self
3653 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3654 .await
3655 {
3656 Ok(()) => {}
3657 Err(err) => {
3658 ctx.retire(Err(err));
3659 return;
3660 }
3661 }
3662
3663 let from_entry = self.catalog().get_entry_by_global_id(&sink.from);
3664 let storage_sink_desc = StorageSinkDesc {
3665 from: sink.from,
3666 from_desc: from_entry
3667 .desc_opt()
3668 .expect("sinks can only be built on items with descs")
3669 .into_owned(),
3670 connection: sink
3671 .connection
3672 .clone()
3673 .into_inline_connection(self.catalog().state()),
3674 envelope: sink.envelope,
3675 as_of,
3676 with_snapshot,
3677 version: sink.version,
3678 from_storage_metadata: (),
3679 to_storage_metadata: (),
3680 };
3681
3682 self.controller
3683 .storage
3684 .alter_export(
3685 global_id,
3686 ExportDescription {
3687 sink: storage_sink_desc,
3688 instance_id: in_cluster,
3689 },
3690 )
3691 .await
3692 .unwrap_or_terminate("cannot fail to alter source desc");
3693
3694 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3695 }
3696
3697 #[instrument]
3698 pub(super) async fn sequence_alter_connection(
3699 &mut self,
3700 ctx: ExecuteContext,
3701 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3702 ) {
3703 match action {
3704 AlterConnectionAction::RotateKeys => {
3705 self.sequence_rotate_keys(ctx, id).await;
3706 }
3707 AlterConnectionAction::AlterOptions {
3708 set_options,
3709 drop_options,
3710 validate,
3711 } => {
3712 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3713 .await
3714 }
3715 }
3716 }
3717
3718 #[instrument]
3719 async fn sequence_alter_connection_options(
3720 &mut self,
3721 mut ctx: ExecuteContext,
3722 id: CatalogItemId,
3723 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3724 drop_options: BTreeSet<ConnectionOptionName>,
3725 validate: bool,
3726 ) {
3727 let cur_entry = self.catalog().get_entry(&id);
3728 let cur_conn = cur_entry.connection().expect("known to be connection");
3729 let connection_gid = cur_conn.global_id();
3730
3731 let inner = || -> Result<Connection, AdapterError> {
3732 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3734 .expect("invalid create sql persisted to catalog")
3735 .into_element()
3736 .ast
3737 {
3738 Statement::CreateConnection(stmt) => stmt,
3739 _ => unreachable!("proved type is source"),
3740 };
3741
3742 let catalog = self.catalog().for_system_session();
3743
3744 let (mut create_conn_stmt, resolved_ids) =
3746 mz_sql::names::resolve(&catalog, create_conn_stmt)
3747 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3748
3749 create_conn_stmt
3751 .values
3752 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3753
3754 create_conn_stmt.values.extend(
3756 set_options
3757 .into_iter()
3758 .map(|(name, value)| ConnectionOption { name, value }),
3759 );
3760
3761 let mut catalog = self.catalog().for_system_session();
3764 catalog.mark_id_unresolvable_for_replanning(id);
3765
3766 let plan = match mz_sql::plan::plan(
3768 None,
3769 &catalog,
3770 Statement::CreateConnection(create_conn_stmt),
3771 &Params::empty(),
3772 &resolved_ids,
3773 )
3774 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3775 {
3776 Plan::CreateConnection(plan) => plan,
3777 _ => unreachable!("create source plan is only valid response"),
3778 };
3779
3780 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3782 .expect("invalid create sql persisted to catalog")
3783 .into_element()
3784 .ast
3785 {
3786 Statement::CreateConnection(stmt) => stmt,
3787 _ => unreachable!("proved type is source"),
3788 };
3789
3790 let catalog = self.catalog().for_system_session();
3791
3792 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3794 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3795
3796 Ok(Connection {
3797 create_sql: plan.connection.create_sql,
3798 global_id: cur_conn.global_id,
3799 details: plan.connection.details,
3800 resolved_ids: new_deps,
3801 })
3802 };
3803
3804 let conn = match inner() {
3805 Ok(conn) => conn,
3806 Err(e) => {
3807 return ctx.retire(Err(e));
3808 }
3809 };
3810
3811 if validate {
3812 let connection = conn
3813 .details
3814 .to_connection()
3815 .into_inline_connection(self.catalog().state());
3816
3817 let internal_cmd_tx = self.internal_cmd_tx.clone();
3818 let transient_revision = self.catalog().transient_revision();
3819 let conn_id = ctx.session().conn_id().clone();
3820 let otel_ctx = OpenTelemetryContext::obtain();
3821 let role_metadata = ctx.session().role_metadata().clone();
3822 let current_storage_parameters = self.controller.storage.config().clone();
3823
3824 task::spawn(
3825 || format!("validate_alter_connection:{conn_id}"),
3826 async move {
3827 let resolved_ids = conn.resolved_ids.clone();
3828 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3829 let result = match connection.validate(id, ¤t_storage_parameters).await {
3830 Ok(()) => Ok(conn),
3831 Err(err) => Err(err.into()),
3832 };
3833
3834 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3836 AlterConnectionValidationReady {
3837 ctx,
3838 result,
3839 connection_id: id,
3840 connection_gid,
3841 plan_validity: PlanValidity::new(
3842 transient_revision,
3843 dependency_ids.clone(),
3844 None,
3845 None,
3846 role_metadata,
3847 ),
3848 otel_ctx,
3849 resolved_ids,
3850 },
3851 ));
3852 if let Err(e) = result {
3853 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3854 }
3855 },
3856 );
3857 } else {
3858 let result = self
3859 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3860 .await;
3861 ctx.retire(result);
3862 }
3863 }
3864
3865 #[instrument]
3866 pub(crate) async fn sequence_alter_connection_stage_finish(
3867 &mut self,
3868 session: &mut Session,
3869 id: CatalogItemId,
3870 connection: Connection,
3871 ) -> Result<ExecuteResponse, AdapterError> {
3872 match self.catalog.get_entry(&id).item() {
3873 CatalogItem::Connection(curr_conn) => {
3874 curr_conn
3875 .details
3876 .to_connection()
3877 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3878 .map_err(StorageError::from)?;
3879 }
3880 _ => unreachable!("known to be a connection"),
3881 };
3882
3883 let ops = vec![catalog::Op::UpdateItem {
3884 id,
3885 name: self.catalog.get_entry(&id).name().clone(),
3886 to_item: CatalogItem::Connection(connection.clone()),
3887 }];
3888
3889 self.catalog_transact(Some(session), ops).await?;
3890
3891 match connection.details {
3892 ConnectionDetails::AwsPrivatelink(ref privatelink) => {
3893 let spec = VpcEndpointConfig {
3894 aws_service_name: privatelink.service_name.to_owned(),
3895 availability_zone_ids: privatelink.availability_zones.to_owned(),
3896 };
3897 self.cloud_resource_controller
3898 .as_ref()
3899 .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
3900 .ensure_vpc_endpoint(id, spec)
3901 .await?;
3902 }
3903 _ => {}
3904 };
3905
3906 let entry = self.catalog().get_entry(&id);
3907
3908 let mut connections = VecDeque::new();
3909 connections.push_front(entry.id());
3910
3911 let mut source_connections = BTreeMap::new();
3912 let mut sink_connections = BTreeMap::new();
3913 let mut source_export_data_configs = BTreeMap::new();
3914
3915 while let Some(id) = connections.pop_front() {
3916 for id in self.catalog.get_entry(&id).used_by() {
3917 let entry = self.catalog.get_entry(id);
3918 match entry.item() {
3919 CatalogItem::Connection(_) => connections.push_back(*id),
3920 CatalogItem::Source(source) => {
3921 let desc = match &entry.source().expect("known to be source").data_source {
3922 DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
3923 .desc
3924 .clone()
3925 .into_inline_connection(self.catalog().state()),
3926 _ => unreachable!("only ingestions reference connections"),
3927 };
3928
3929 source_connections.insert(source.global_id, desc.connection);
3930 }
3931 CatalogItem::Sink(sink) => {
3932 let export = entry.sink().expect("known to be sink");
3933 sink_connections.insert(
3934 sink.global_id,
3935 export
3936 .connection
3937 .clone()
3938 .into_inline_connection(self.catalog().state()),
3939 );
3940 }
3941 CatalogItem::Table(table) => {
3942 if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
3945 let data_config = export_data_config.clone();
3946 source_export_data_configs.insert(
3947 table.global_id_writes(),
3948 data_config.into_inline_connection(self.catalog().state()),
3949 );
3950 }
3951 }
3952 t => unreachable!("connection dependency not expected on {:?}", t),
3953 }
3954 }
3955 }
3956
3957 if !source_connections.is_empty() {
3958 self.controller
3959 .storage
3960 .alter_ingestion_connections(source_connections)
3961 .await
3962 .unwrap_or_terminate("cannot fail to alter ingestion connection");
3963 }
3964
3965 if !sink_connections.is_empty() {
3966 self.controller
3967 .storage
3968 .alter_export_connections(sink_connections)
3969 .await
3970 .unwrap_or_terminate("altering exports after txn must succeed");
3971 }
3972
3973 if !source_export_data_configs.is_empty() {
3974 self.controller
3975 .storage
3976 .alter_ingestion_export_data_configs(source_export_data_configs)
3977 .await
3978 .unwrap_or_terminate("altering source export data configs after txn must succeed");
3979 }
3980
3981 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
3982 }
3983
3984 #[instrument]
3985 pub(super) async fn sequence_alter_source(
3986 &mut self,
3987 session: &Session,
3988 plan::AlterSourcePlan {
3989 item_id,
3990 ingestion_id,
3991 action,
3992 }: plan::AlterSourcePlan,
3993 ) -> Result<ExecuteResponse, AdapterError> {
3994 let cur_entry = self.catalog().get_entry(&item_id);
3995 let cur_source = cur_entry.source().expect("known to be source");
3996
3997 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
3998 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
4000 .expect("invalid create sql persisted to catalog")
4001 .into_element()
4002 .ast
4003 {
4004 Statement::CreateSource(stmt) => stmt,
4005 _ => unreachable!("proved type is source"),
4006 };
4007
4008 let catalog = coord.catalog().for_system_session();
4009
4010 mz_sql::names::resolve(&catalog, create_source_stmt)
4012 .map_err(|e| AdapterError::internal(err_cx, e))
4013 };
4014
4015 match action {
4016 plan::AlterSourceAction::AddSubsourceExports {
4017 subsources,
4018 options,
4019 } => {
4020 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
4021
4022 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
4023 text_columns: mut new_text_columns,
4024 exclude_columns: mut new_exclude_columns,
4025 ..
4026 } = options.try_into()?;
4027
4028 let (mut create_source_stmt, resolved_ids) =
4030 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
4031
4032 let catalog = self.catalog();
4034 let curr_references: BTreeSet<_> = catalog
4035 .get_entry(&item_id)
4036 .used_by()
4037 .into_iter()
4038 .filter_map(|subsource| {
4039 catalog
4040 .get_entry(subsource)
4041 .subsource_details()
4042 .map(|(_id, reference, _details)| reference)
4043 })
4044 .collect();
4045
4046 let purification_err =
4049 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
4050
4051 match &mut create_source_stmt.connection {
4055 CreateSourceConnection::Postgres {
4056 options: curr_options,
4057 ..
4058 } => {
4059 let mz_sql::plan::PgConfigOptionExtracted {
4060 mut text_columns, ..
4061 } = curr_options.clone().try_into()?;
4062
4063 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
4066
4067 text_columns.retain(|column_qualified_reference| {
4069 mz_ore::soft_assert_eq_or_log!(
4070 column_qualified_reference.0.len(),
4071 4,
4072 "all TEXT COLUMNS values must be column-qualified references"
4073 );
4074 let mut table = column_qualified_reference.clone();
4075 table.0.truncate(3);
4076 curr_references.contains(&table)
4077 });
4078
4079 new_text_columns.extend(text_columns);
4081
4082 if !new_text_columns.is_empty() {
4084 new_text_columns.sort();
4085 let new_text_columns = new_text_columns
4086 .into_iter()
4087 .map(WithOptionValue::UnresolvedItemName)
4088 .collect();
4089
4090 curr_options.push(PgConfigOption {
4091 name: PgConfigOptionName::TextColumns,
4092 value: Some(WithOptionValue::Sequence(new_text_columns)),
4093 });
4094 }
4095 }
4096 CreateSourceConnection::MySql {
4097 options: curr_options,
4098 ..
4099 } => {
4100 let mz_sql::plan::MySqlConfigOptionExtracted {
4101 mut text_columns,
4102 mut exclude_columns,
4103 ..
4104 } = curr_options.clone().try_into()?;
4105
4106 curr_options.retain(|o| {
4109 !matches!(
4110 o.name,
4111 MySqlConfigOptionName::TextColumns
4112 | MySqlConfigOptionName::ExcludeColumns
4113 )
4114 });
4115
4116 let column_referenced =
4118 |column_qualified_reference: &UnresolvedItemName| {
4119 mz_ore::soft_assert_eq_or_log!(
4120 column_qualified_reference.0.len(),
4121 3,
4122 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4123 );
4124 let mut table = column_qualified_reference.clone();
4125 table.0.truncate(2);
4126 curr_references.contains(&table)
4127 };
4128 text_columns.retain(column_referenced);
4129 exclude_columns.retain(column_referenced);
4130
4131 new_text_columns.extend(text_columns);
4133 new_exclude_columns.extend(exclude_columns);
4134
4135 if !new_text_columns.is_empty() {
4137 new_text_columns.sort();
4138 let new_text_columns = new_text_columns
4139 .into_iter()
4140 .map(WithOptionValue::UnresolvedItemName)
4141 .collect();
4142
4143 curr_options.push(MySqlConfigOption {
4144 name: MySqlConfigOptionName::TextColumns,
4145 value: Some(WithOptionValue::Sequence(new_text_columns)),
4146 });
4147 }
4148 if !new_exclude_columns.is_empty() {
4150 new_exclude_columns.sort();
4151 let new_exclude_columns = new_exclude_columns
4152 .into_iter()
4153 .map(WithOptionValue::UnresolvedItemName)
4154 .collect();
4155
4156 curr_options.push(MySqlConfigOption {
4157 name: MySqlConfigOptionName::ExcludeColumns,
4158 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4159 });
4160 }
4161 }
4162 CreateSourceConnection::SqlServer {
4163 options: curr_options,
4164 ..
4165 } => {
4166 let mz_sql::plan::SqlServerConfigOptionExtracted {
4167 mut text_columns,
4168 mut exclude_columns,
4169 ..
4170 } = curr_options.clone().try_into()?;
4171
4172 curr_options.retain(|o| {
4175 !matches!(
4176 o.name,
4177 SqlServerConfigOptionName::TextColumns
4178 | SqlServerConfigOptionName::ExcludeColumns
4179 )
4180 });
4181
4182 let column_referenced =
4184 |column_qualified_reference: &UnresolvedItemName| {
4185 mz_ore::soft_assert_eq_or_log!(
4186 column_qualified_reference.0.len(),
4187 3,
4188 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4189 );
4190 let mut table = column_qualified_reference.clone();
4191 table.0.truncate(2);
4192 curr_references.contains(&table)
4193 };
4194 text_columns.retain(column_referenced);
4195 exclude_columns.retain(column_referenced);
4196
4197 new_text_columns.extend(text_columns);
4199 new_exclude_columns.extend(exclude_columns);
4200
4201 if !new_text_columns.is_empty() {
4203 new_text_columns.sort();
4204 let new_text_columns = new_text_columns
4205 .into_iter()
4206 .map(WithOptionValue::UnresolvedItemName)
4207 .collect();
4208
4209 curr_options.push(SqlServerConfigOption {
4210 name: SqlServerConfigOptionName::TextColumns,
4211 value: Some(WithOptionValue::Sequence(new_text_columns)),
4212 });
4213 }
4214 if !new_exclude_columns.is_empty() {
4216 new_exclude_columns.sort();
4217 let new_exclude_columns = new_exclude_columns
4218 .into_iter()
4219 .map(WithOptionValue::UnresolvedItemName)
4220 .collect();
4221
4222 curr_options.push(SqlServerConfigOption {
4223 name: SqlServerConfigOptionName::ExcludeColumns,
4224 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4225 });
4226 }
4227 }
4228 _ => return Err(purification_err()),
4229 };
4230
4231 let mut catalog = self.catalog().for_system_session();
4232 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4233
4234 let plan = match mz_sql::plan::plan(
4236 None,
4237 &catalog,
4238 Statement::CreateSource(create_source_stmt),
4239 &Params::empty(),
4240 &resolved_ids,
4241 )
4242 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4243 {
4244 Plan::CreateSource(plan) => plan,
4245 _ => unreachable!("create source plan is only valid response"),
4246 };
4247
4248 let source = Source::new(
4252 plan,
4253 cur_source.global_id,
4254 resolved_ids,
4255 cur_source.custom_logical_compaction_window,
4256 cur_source.is_retained_metrics_object,
4257 );
4258
4259 let source_compaction_window = source.custom_logical_compaction_window;
4260
4261 let desc = match &source.data_source {
4263 DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
4264 .desc
4265 .clone()
4266 .into_inline_connection(self.catalog().state()),
4267 _ => unreachable!("already verified of type ingestion"),
4268 };
4269
4270 self.controller
4271 .storage
4272 .check_alter_ingestion_source_desc(ingestion_id, &desc)
4273 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4274
4275 let mut ops = vec![catalog::Op::UpdateItem {
4278 id: item_id,
4279 name: self.catalog.get_entry(&item_id).name().clone(),
4282 to_item: CatalogItem::Source(source),
4283 }];
4284
4285 let CreateSourceInner {
4286 ops: new_ops,
4287 sources,
4288 if_not_exists_ids,
4289 } = self.create_source_inner(session, subsources).await?;
4290
4291 ops.extend(new_ops.into_iter());
4292
4293 assert!(
4294 if_not_exists_ids.is_empty(),
4295 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4296 );
4297
4298 self.catalog_transact(Some(session), ops).await?;
4299
4300 self.controller
4301 .storage
4302 .alter_ingestion_source_desc(ingestion_id, desc)
4303 .await
4304 .unwrap_or_terminate("cannot fail to alter source desc");
4305
4306 let mut item_ids = BTreeSet::new();
4307 let mut collections = Vec::with_capacity(sources.len());
4308 for (item_id, source) in sources {
4309 let status_id = self.catalog().resolve_builtin_storage_collection(
4310 &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
4311 );
4312 let source_status_collection_id =
4313 Some(self.catalog().get_entry(&status_id).latest_global_id());
4314
4315 let (data_source, status_collection_id) = match source.data_source {
4316 DataSourceDesc::IngestionExport {
4318 ingestion_id,
4319 external_reference: _,
4320 details,
4321 data_config,
4322 } => {
4323 let ingestion_id =
4326 self.catalog().get_entry(&ingestion_id).latest_global_id();
4327 (
4328 DataSource::IngestionExport {
4329 ingestion_id,
4330 details,
4331 data_config: data_config
4332 .into_inline_connection(self.catalog().state()),
4333 },
4334 source_status_collection_id,
4335 )
4336 }
4337 o => {
4338 unreachable!(
4339 "ALTER SOURCE...ADD SUBSOURCE only creates SourceExport but got {:?}",
4340 o
4341 )
4342 }
4343 };
4344
4345 collections.push((
4346 source.global_id,
4347 CollectionDescription {
4348 desc: source.desc.clone(),
4349 data_source,
4350 since: None,
4351 status_collection_id,
4352 timeline: Some(source.timeline.clone()),
4353 },
4354 ));
4355
4356 item_ids.insert(item_id);
4357 }
4358
4359 let storage_metadata = self.catalog.state().storage_metadata();
4360
4361 self.controller
4362 .storage
4363 .create_collections(storage_metadata, None, collections)
4364 .await
4365 .unwrap_or_terminate("cannot fail to create collections");
4366
4367 self.initialize_storage_read_policies(
4368 item_ids,
4369 source_compaction_window.unwrap_or(CompactionWindow::Default),
4370 )
4371 .await;
4372 }
4373 plan::AlterSourceAction::RefreshReferences { references } => {
4374 self.catalog_transact(
4375 Some(session),
4376 vec![catalog::Op::UpdateSourceReferences {
4377 source_id: item_id,
4378 references: references.into(),
4379 }],
4380 )
4381 .await?;
4382 }
4383 }
4384
4385 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4386 }
4387
4388 #[instrument]
4389 pub(super) async fn sequence_alter_system_set(
4390 &mut self,
4391 session: &Session,
4392 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4393 ) -> Result<ExecuteResponse, AdapterError> {
4394 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4395 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4397 self.validate_alter_system_network_policy(session, &value)?;
4398 }
4399
4400 let op = match value {
4401 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4402 name: name.clone(),
4403 value: OwnedVarInput::SqlSet(values),
4404 },
4405 plan::VariableValue::Default => {
4406 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4407 }
4408 };
4409 self.catalog_transact(Some(session), vec![op]).await?;
4410
4411 session.add_notice(AdapterNotice::VarDefaultUpdated {
4412 role: None,
4413 var_name: Some(name),
4414 });
4415 Ok(ExecuteResponse::AlteredSystemConfiguration)
4416 }
4417
4418 #[instrument]
4419 pub(super) async fn sequence_alter_system_reset(
4420 &mut self,
4421 session: &Session,
4422 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4423 ) -> Result<ExecuteResponse, AdapterError> {
4424 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4425 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4426 self.catalog_transact(Some(session), vec![op]).await?;
4427 session.add_notice(AdapterNotice::VarDefaultUpdated {
4428 role: None,
4429 var_name: Some(name),
4430 });
4431 Ok(ExecuteResponse::AlteredSystemConfiguration)
4432 }
4433
4434 #[instrument]
4435 pub(super) async fn sequence_alter_system_reset_all(
4436 &mut self,
4437 session: &Session,
4438 _: plan::AlterSystemResetAllPlan,
4439 ) -> Result<ExecuteResponse, AdapterError> {
4440 self.is_user_allowed_to_alter_system(session, None)?;
4441 let op = catalog::Op::ResetAllSystemConfiguration;
4442 self.catalog_transact(Some(session), vec![op]).await?;
4443 session.add_notice(AdapterNotice::VarDefaultUpdated {
4444 role: None,
4445 var_name: None,
4446 });
4447 Ok(ExecuteResponse::AlteredSystemConfiguration)
4448 }
4449
4450 fn is_user_allowed_to_alter_system(
4452 &self,
4453 session: &Session,
4454 var_name: Option<&str>,
4455 ) -> Result<(), AdapterError> {
4456 match (session.user().kind(), var_name) {
4457 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4459 (UserKind::Superuser, Some(name))
4461 if session.user().is_internal()
4462 || self.catalog().system_config().user_modifiable(name) =>
4463 {
4464 let var = self.catalog().system_config().get(name)?;
4467 var.visible(session.user(), self.catalog().system_config())?;
4468 Ok(())
4469 }
4470 (UserKind::Regular, Some(name))
4473 if self.catalog().system_config().user_modifiable(name) =>
4474 {
4475 Err(AdapterError::Unauthorized(
4476 rbac::UnauthorizedError::Superuser {
4477 action: format!("toggle the '{name}' system configuration parameter"),
4478 },
4479 ))
4480 }
4481 _ => Err(AdapterError::Unauthorized(
4482 rbac::UnauthorizedError::MzSystem {
4483 action: "alter system".into(),
4484 },
4485 )),
4486 }
4487 }
4488
4489 fn validate_alter_system_network_policy(
4490 &self,
4491 session: &Session,
4492 policy_value: &plan::VariableValue,
4493 ) -> Result<(), AdapterError> {
4494 let policy_name = match &policy_value {
4495 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4497 plan::VariableValue::Values(values) if values.len() == 1 => {
4498 values.iter().next().cloned()
4499 }
4500 plan::VariableValue::Values(values) => {
4501 tracing::warn!(?values, "can't set multiple network policies at once");
4502 None
4503 }
4504 };
4505 let maybe_network_policy = policy_name
4506 .as_ref()
4507 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4508 let Some(network_policy) = maybe_network_policy else {
4509 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4510 VarError::InvalidParameterValue {
4511 name: NETWORK_POLICY.name(),
4512 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4513 reason: "no network policy with such name exists".to_string(),
4514 },
4515 )));
4516 };
4517 self.validate_alter_network_policy(session, &network_policy.rules)
4518 }
4519
4520 fn validate_alter_network_policy(
4525 &self,
4526 session: &Session,
4527 policy_rules: &Vec<NetworkPolicyRule>,
4528 ) -> Result<(), AdapterError> {
4529 if session.user().is_internal() {
4532 return Ok(());
4533 }
4534 if let Some(ip) = session.meta().client_ip() {
4535 validate_ip_with_policy_rules(ip, policy_rules)
4536 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4537 } else {
4538 return Err(AdapterError::NetworkPolicyDenied(
4541 NetworkPolicyError::MissingIp,
4542 ));
4543 }
4544 Ok(())
4545 }
4546
4547 #[instrument]
4549 pub(super) fn sequence_execute(
4550 &mut self,
4551 session: &mut Session,
4552 plan: plan::ExecutePlan,
4553 ) -> Result<String, AdapterError> {
4554 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4556 let ps = session
4557 .get_prepared_statement_unverified(&plan.name)
4558 .expect("known to exist");
4559 let stmt = ps.stmt().cloned();
4560 let desc = ps.desc().clone();
4561 let revision = ps.catalog_revision;
4562 let logging = Arc::clone(ps.logging());
4563 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), revision)
4564 }
4565
4566 #[instrument]
4567 pub(super) async fn sequence_grant_privileges(
4568 &mut self,
4569 session: &Session,
4570 plan::GrantPrivilegesPlan {
4571 update_privileges,
4572 grantees,
4573 }: plan::GrantPrivilegesPlan,
4574 ) -> Result<ExecuteResponse, AdapterError> {
4575 self.sequence_update_privileges(
4576 session,
4577 update_privileges,
4578 grantees,
4579 UpdatePrivilegeVariant::Grant,
4580 )
4581 .await
4582 }
4583
4584 #[instrument]
4585 pub(super) async fn sequence_revoke_privileges(
4586 &mut self,
4587 session: &Session,
4588 plan::RevokePrivilegesPlan {
4589 update_privileges,
4590 revokees,
4591 }: plan::RevokePrivilegesPlan,
4592 ) -> Result<ExecuteResponse, AdapterError> {
4593 self.sequence_update_privileges(
4594 session,
4595 update_privileges,
4596 revokees,
4597 UpdatePrivilegeVariant::Revoke,
4598 )
4599 .await
4600 }
4601
4602 #[instrument]
4603 async fn sequence_update_privileges(
4604 &mut self,
4605 session: &Session,
4606 update_privileges: Vec<UpdatePrivilege>,
4607 grantees: Vec<RoleId>,
4608 variant: UpdatePrivilegeVariant,
4609 ) -> Result<ExecuteResponse, AdapterError> {
4610 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4611 let mut warnings = Vec::new();
4612 let catalog = self.catalog().for_session(session);
4613
4614 for UpdatePrivilege {
4615 acl_mode,
4616 target_id,
4617 grantor,
4618 } in update_privileges
4619 {
4620 let actual_object_type = catalog.get_system_object_type(&target_id);
4621 if actual_object_type.is_relation() {
4624 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4625 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4626 if !non_applicable_privileges.is_empty() {
4627 let object_description =
4628 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4629 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4630 non_applicable_privileges,
4631 object_description,
4632 })
4633 }
4634 }
4635
4636 if let SystemObjectId::Object(object_id) = &target_id {
4637 self.catalog()
4638 .ensure_not_reserved_object(object_id, session.conn_id())?;
4639 }
4640
4641 let privileges = self
4642 .catalog()
4643 .get_privileges(&target_id, session.conn_id())
4644 .ok_or(AdapterError::Unsupported(
4647 "GRANTs/REVOKEs on an object type with no privileges",
4648 ))?;
4649
4650 for grantee in &grantees {
4651 self.catalog().ensure_not_system_role(grantee)?;
4652 self.catalog().ensure_not_predefined_role(grantee)?;
4653 let existing_privilege = privileges
4654 .get_acl_item(grantee, &grantor)
4655 .map(Cow::Borrowed)
4656 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4657
4658 match variant {
4659 UpdatePrivilegeVariant::Grant
4660 if !existing_privilege.acl_mode.contains(acl_mode) =>
4661 {
4662 ops.push(catalog::Op::UpdatePrivilege {
4663 target_id: target_id.clone(),
4664 privilege: MzAclItem {
4665 grantee: *grantee,
4666 grantor,
4667 acl_mode,
4668 },
4669 variant,
4670 });
4671 }
4672 UpdatePrivilegeVariant::Revoke
4673 if !existing_privilege
4674 .acl_mode
4675 .intersection(acl_mode)
4676 .is_empty() =>
4677 {
4678 ops.push(catalog::Op::UpdatePrivilege {
4679 target_id: target_id.clone(),
4680 privilege: MzAclItem {
4681 grantee: *grantee,
4682 grantor,
4683 acl_mode,
4684 },
4685 variant,
4686 });
4687 }
4688 _ => {}
4690 }
4691 }
4692 }
4693
4694 if ops.is_empty() {
4695 session.add_notices(warnings);
4696 return Ok(variant.into());
4697 }
4698
4699 let res = self
4700 .catalog_transact(Some(session), ops)
4701 .await
4702 .map(|_| match variant {
4703 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4704 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4705 });
4706 if res.is_ok() {
4707 session.add_notices(warnings);
4708 }
4709 res
4710 }
4711
4712 #[instrument]
4713 pub(super) async fn sequence_alter_default_privileges(
4714 &mut self,
4715 session: &Session,
4716 plan::AlterDefaultPrivilegesPlan {
4717 privilege_objects,
4718 privilege_acl_items,
4719 is_grant,
4720 }: plan::AlterDefaultPrivilegesPlan,
4721 ) -> Result<ExecuteResponse, AdapterError> {
4722 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4723 let variant = if is_grant {
4724 UpdatePrivilegeVariant::Grant
4725 } else {
4726 UpdatePrivilegeVariant::Revoke
4727 };
4728 for privilege_object in &privilege_objects {
4729 self.catalog()
4730 .ensure_not_system_role(&privilege_object.role_id)?;
4731 self.catalog()
4732 .ensure_not_predefined_role(&privilege_object.role_id)?;
4733 if let Some(database_id) = privilege_object.database_id {
4734 self.catalog()
4735 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4736 }
4737 if let Some(schema_id) = privilege_object.schema_id {
4738 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4739 let schema_spec: SchemaSpecifier = schema_id.into();
4740
4741 self.catalog().ensure_not_reserved_object(
4742 &(database_spec, schema_spec).into(),
4743 session.conn_id(),
4744 )?;
4745 }
4746 for privilege_acl_item in &privilege_acl_items {
4747 self.catalog()
4748 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4749 self.catalog()
4750 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4751 ops.push(catalog::Op::UpdateDefaultPrivilege {
4752 privilege_object: privilege_object.clone(),
4753 privilege_acl_item: privilege_acl_item.clone(),
4754 variant,
4755 })
4756 }
4757 }
4758
4759 self.catalog_transact(Some(session), ops).await?;
4760 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4761 }
4762
4763 #[instrument]
4764 pub(super) async fn sequence_grant_role(
4765 &mut self,
4766 session: &Session,
4767 plan::GrantRolePlan {
4768 role_ids,
4769 member_ids,
4770 grantor_id,
4771 }: plan::GrantRolePlan,
4772 ) -> Result<ExecuteResponse, AdapterError> {
4773 let catalog = self.catalog();
4774 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4775 for role_id in role_ids {
4776 for member_id in &member_ids {
4777 let member_membership: BTreeSet<_> =
4778 catalog.get_role(member_id).membership().keys().collect();
4779 if member_membership.contains(&role_id) {
4780 let role_name = catalog.get_role(&role_id).name().to_string();
4781 let member_name = catalog.get_role(member_id).name().to_string();
4782 catalog.ensure_not_reserved_role(member_id)?;
4784 catalog.ensure_grantable_role(&role_id)?;
4785 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4786 role_name,
4787 member_name,
4788 });
4789 } else {
4790 ops.push(catalog::Op::GrantRole {
4791 role_id,
4792 member_id: *member_id,
4793 grantor_id,
4794 });
4795 }
4796 }
4797 }
4798
4799 if ops.is_empty() {
4800 return Ok(ExecuteResponse::GrantedRole);
4801 }
4802
4803 self.catalog_transact(Some(session), ops)
4804 .await
4805 .map(|_| ExecuteResponse::GrantedRole)
4806 }
4807
4808 #[instrument]
4809 pub(super) async fn sequence_revoke_role(
4810 &mut self,
4811 session: &Session,
4812 plan::RevokeRolePlan {
4813 role_ids,
4814 member_ids,
4815 grantor_id,
4816 }: plan::RevokeRolePlan,
4817 ) -> Result<ExecuteResponse, AdapterError> {
4818 let catalog = self.catalog();
4819 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4820 for role_id in role_ids {
4821 for member_id in &member_ids {
4822 let member_membership: BTreeSet<_> =
4823 catalog.get_role(member_id).membership().keys().collect();
4824 if !member_membership.contains(&role_id) {
4825 let role_name = catalog.get_role(&role_id).name().to_string();
4826 let member_name = catalog.get_role(member_id).name().to_string();
4827 catalog.ensure_not_reserved_role(member_id)?;
4829 catalog.ensure_grantable_role(&role_id)?;
4830 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4831 role_name,
4832 member_name,
4833 });
4834 } else {
4835 ops.push(catalog::Op::RevokeRole {
4836 role_id,
4837 member_id: *member_id,
4838 grantor_id,
4839 });
4840 }
4841 }
4842 }
4843
4844 if ops.is_empty() {
4845 return Ok(ExecuteResponse::RevokedRole);
4846 }
4847
4848 self.catalog_transact(Some(session), ops)
4849 .await
4850 .map(|_| ExecuteResponse::RevokedRole)
4851 }
4852
4853 #[instrument]
4854 pub(super) async fn sequence_alter_owner(
4855 &mut self,
4856 session: &Session,
4857 plan::AlterOwnerPlan {
4858 id,
4859 object_type,
4860 new_owner,
4861 }: plan::AlterOwnerPlan,
4862 ) -> Result<ExecuteResponse, AdapterError> {
4863 let mut ops = vec![catalog::Op::UpdateOwner {
4864 id: id.clone(),
4865 new_owner,
4866 }];
4867
4868 match &id {
4869 ObjectId::Item(global_id) => {
4870 let entry = self.catalog().get_entry(global_id);
4871
4872 if entry.is_index() {
4874 let name = self
4875 .catalog()
4876 .resolve_full_name(entry.name(), Some(session.conn_id()))
4877 .to_string();
4878 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4879 return Ok(ExecuteResponse::AlteredObject(object_type));
4880 }
4881
4882 let dependent_index_ops = entry
4884 .used_by()
4885 .into_iter()
4886 .filter(|id| self.catalog().get_entry(id).is_index())
4887 .map(|id| catalog::Op::UpdateOwner {
4888 id: ObjectId::Item(*id),
4889 new_owner,
4890 });
4891 ops.extend(dependent_index_ops);
4892
4893 let dependent_subsources =
4895 entry
4896 .progress_id()
4897 .into_iter()
4898 .map(|item_id| catalog::Op::UpdateOwner {
4899 id: ObjectId::Item(item_id),
4900 new_owner,
4901 });
4902 ops.extend(dependent_subsources);
4903 }
4904 ObjectId::Cluster(cluster_id) => {
4905 let cluster = self.catalog().get_cluster(*cluster_id);
4906 let managed_cluster_replica_ops =
4908 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4909 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4910 new_owner,
4911 });
4912 ops.extend(managed_cluster_replica_ops);
4913 }
4914 _ => {}
4915 }
4916
4917 self.catalog_transact(Some(session), ops)
4918 .await
4919 .map(|_| ExecuteResponse::AlteredObject(object_type))
4920 }
4921
4922 #[instrument]
4923 pub(super) async fn sequence_reassign_owned(
4924 &mut self,
4925 session: &Session,
4926 plan::ReassignOwnedPlan {
4927 old_roles,
4928 new_role,
4929 reassign_ids,
4930 }: plan::ReassignOwnedPlan,
4931 ) -> Result<ExecuteResponse, AdapterError> {
4932 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
4933 self.catalog().ensure_not_reserved_role(role_id)?;
4934 }
4935
4936 let ops = reassign_ids
4937 .into_iter()
4938 .map(|id| catalog::Op::UpdateOwner {
4939 id,
4940 new_owner: new_role,
4941 })
4942 .collect();
4943
4944 self.catalog_transact(Some(session), ops)
4945 .await
4946 .map(|_| ExecuteResponse::ReassignOwned)
4947 }
4948
4949 #[instrument]
4950 pub(crate) async fn handle_deferred_statement(&mut self) {
4951 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
4955 return;
4956 };
4957 match ps {
4958 crate::coord::PlanStatement::Statement { stmt, params } => {
4959 self.handle_execute_inner(stmt, params, ctx).await;
4960 }
4961 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
4962 self.sequence_plan(ctx, plan, resolved_ids).await;
4963 }
4964 }
4965 }
4966
4967 #[instrument]
4968 #[allow(clippy::unused_async)]
4970 pub(super) async fn sequence_alter_table(
4971 &mut self,
4972 session: &Session,
4973 plan: plan::AlterTablePlan,
4974 ) -> Result<ExecuteResponse, AdapterError> {
4975 let plan::AlterTablePlan {
4976 relation_id,
4977 column_name,
4978 column_type,
4979 raw_sql_type,
4980 } = plan;
4981
4982 let id_ts = self.get_catalog_write_ts().await;
4984 let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
4985 let ops = vec![catalog::Op::AlterAddColumn {
4986 id: relation_id,
4987 new_global_id,
4988 name: column_name,
4989 typ: column_type,
4990 sql: raw_sql_type,
4991 }];
4992
4993 let entry = self.catalog().get_entry(&relation_id);
4994 let CatalogItem::Table(table) = &entry.item else {
4995 let err = format!("expected table, found {:?}", entry.item);
4996 return Err(AdapterError::Internal(err));
4997 };
4998 let expected_version = table.desc.latest_version();
5000 let existing_global_id = table.global_id_writes();
5001
5002 self.catalog_transact_with_side_effects(Some(session), ops, |coord| async {
5003 let entry = coord.catalog().get_entry(&relation_id);
5004 let CatalogItem::Table(table) = &entry.item else {
5005 panic!("programming error, expected table found {:?}", entry.item);
5006 };
5007 let table = table.clone();
5008
5009 let existing_table = crate::CollectionIdBundle {
5013 storage_ids: btreeset![existing_global_id],
5014 compute_ids: BTreeMap::new(),
5015 };
5016 let existing_table_read_hold = coord.acquire_read_holds(&existing_table);
5017
5018 let new_version = table.desc.latest_version();
5019 let new_desc = table
5020 .desc
5021 .at_version(RelationVersionSelector::Specific(new_version));
5022 let register_ts = coord.get_local_write_ts().await.timestamp;
5023
5024 coord
5026 .controller
5027 .storage
5028 .alter_table_desc(
5029 existing_global_id,
5030 new_global_id,
5031 new_desc,
5032 expected_version,
5033 register_ts,
5034 )
5035 .await
5036 .expect("failed to alter desc of table");
5037
5038 let compaction_window = table
5040 .custom_logical_compaction_window
5041 .unwrap_or(CompactionWindow::Default);
5042 coord
5043 .initialize_read_policies(
5044 &crate::CollectionIdBundle {
5045 storage_ids: btreeset![new_global_id],
5046 compute_ids: BTreeMap::new(),
5047 },
5048 compaction_window,
5049 )
5050 .await;
5051 coord.apply_local_write(register_ts).await;
5052
5053 drop(existing_table_read_hold);
5055 })
5056 .await?;
5057
5058 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
5059 }
5060}
5061
5062#[derive(Debug)]
5063struct CachedStatisticsOracle {
5064 cache: BTreeMap<GlobalId, usize>,
5065}
5066
5067impl CachedStatisticsOracle {
5068 pub async fn new<T: TimelyTimestamp>(
5069 ids: &BTreeSet<GlobalId>,
5070 as_of: &Antichain<T>,
5071 storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
5072 ) -> Result<Self, StorageError<T>> {
5073 let mut cache = BTreeMap::new();
5074
5075 for id in ids {
5076 let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
5077
5078 match stats {
5079 Ok(stats) => {
5080 cache.insert(*id, stats.num_updates);
5081 }
5082 Err(StorageError::IdentifierMissing(id)) => {
5083 ::tracing::debug!("no statistics for {id}")
5084 }
5085 Err(e) => return Err(e),
5086 }
5087 }
5088
5089 Ok(Self { cache })
5090 }
5091}
5092
5093impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
5094 fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
5095 self.cache.get(&id).map(|estimate| *estimate)
5096 }
5097
5098 fn as_map(&self) -> BTreeMap<GlobalId, usize> {
5099 self.cache.clone()
5100 }
5101}
5102
5103impl Coordinator {
5104 pub(super) async fn statistics_oracle(
5105 &self,
5106 session: &Session,
5107 source_ids: &BTreeSet<GlobalId>,
5108 query_as_of: &Antichain<Timestamp>,
5109 is_oneshot: bool,
5110 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
5111 if !session.vars().enable_session_cardinality_estimates() {
5112 return Ok(Box::new(EmptyStatisticsOracle));
5113 }
5114
5115 let timeout = if is_oneshot {
5116 self.catalog()
5118 .system_config()
5119 .optimizer_oneshot_stats_timeout()
5120 } else {
5121 self.catalog().system_config().optimizer_stats_timeout()
5122 };
5123
5124 let cached_stats = mz_ore::future::timeout(
5125 timeout,
5126 CachedStatisticsOracle::new(
5127 source_ids,
5128 query_as_of,
5129 self.controller.storage_collections.as_ref(),
5130 ),
5131 )
5132 .await;
5133
5134 match cached_stats {
5135 Ok(stats) => Ok(Box::new(stats)),
5136 Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
5137 warn!(
5138 is_oneshot = is_oneshot,
5139 "optimizer statistics collection timed out after {}ms",
5140 timeout.as_millis()
5141 );
5142
5143 Ok(Box::new(EmptyStatisticsOracle))
5144 }
5145 Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
5146 }
5147 }
5148}
5149
5150pub(super) fn check_log_reads(
5159 catalog: &Catalog,
5160 cluster: &Cluster,
5161 source_ids: &BTreeSet<GlobalId>,
5162 target_replica: &mut Option<ReplicaId>,
5163 vars: &SessionVars,
5164) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
5165where
5166{
5167 let log_names = source_ids
5168 .iter()
5169 .map(|gid| catalog.resolve_item_id(gid))
5170 .flat_map(|item_id| catalog.introspection_dependencies(item_id))
5171 .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
5172 .collect::<Vec<_>>();
5173
5174 if log_names.is_empty() {
5175 return Ok(None);
5176 }
5177
5178 let num_replicas = cluster.replicas().count();
5182 if target_replica.is_none() {
5183 if num_replicas == 1 {
5184 *target_replica = cluster.replicas().map(|r| r.replica_id).next();
5185 } else {
5186 return Err(AdapterError::UntargetedLogRead { log_names });
5187 }
5188 }
5189
5190 let replica_id = target_replica.expect("set to `Some` above");
5193 let replica = &cluster.replica(replica_id).expect("Replica must exist");
5194 if !replica.config.compute.logging.enabled() {
5195 return Err(AdapterError::IntrospectionDisabled { log_names });
5196 }
5197
5198 Ok(vars
5199 .emit_introspection_query_notice()
5200 .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
5201}
5202
5203impl Coordinator {
5204 fn emit_optimizer_notices(&self, session: &Session, notices: &Vec<RawOptimizerNotice>) {
5206 if notices.is_empty() {
5208 return;
5209 }
5210 let humanizer = self.catalog.for_session(session);
5211 let system_vars = self.catalog.system_config();
5212 for notice in notices {
5213 let kind = OptimizerNoticeKind::from(notice);
5214 let notice_enabled = match kind {
5215 OptimizerNoticeKind::IndexAlreadyExists => {
5216 system_vars.enable_notices_for_index_already_exists()
5217 }
5218 OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
5219 system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
5220 }
5221 OptimizerNoticeKind::IndexKeyEmpty => {
5222 system_vars.enable_notices_for_index_empty_key()
5223 }
5224 };
5225 if notice_enabled {
5226 session.add_notice(AdapterNotice::OptimizerNotice {
5230 notice: notice.message(&humanizer, false).to_string(),
5231 hint: notice.hint(&humanizer, false).to_string(),
5232 });
5233 }
5234 self.metrics
5235 .optimization_notices
5236 .with_label_values(&[kind.metric_label()])
5237 .inc_by(1);
5238 }
5239 }
5240
5241 async fn process_dataflow_metainfo(
5243 &mut self,
5244 df_meta: DataflowMetainfo,
5245 export_id: GlobalId,
5246 session: &Session,
5247 notice_ids: Vec<GlobalId>,
5248 ) -> Option<BuiltinTableAppendNotify> {
5249 self.emit_optimizer_notices(session, &df_meta.optimizer_notices);
5251
5252 let df_meta = self
5254 .catalog()
5255 .render_notices(df_meta, notice_ids, Some(export_id));
5256
5257 if self.catalog().state().system_config().enable_mz_notices()
5260 && !df_meta.optimizer_notices.is_empty()
5261 {
5262 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
5263 self.catalog().state().pack_optimizer_notices(
5264 &mut builtin_table_updates,
5265 df_meta.optimizer_notices.iter(),
5266 Diff::ONE,
5267 );
5268
5269 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
5271
5272 Some(
5273 self.builtin_table_update()
5274 .execute(builtin_table_updates)
5275 .await
5276 .0,
5277 )
5278 } else {
5279 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
5281
5282 None
5283 }
5284 }
5285}