use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::iter;
use std::num::{NonZeroI64, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::FuturesOrdered;
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::objects::{
    CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_controller_types::ReplicaId;
use mz_expr::{
    CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::task::{self, JoinHandle, spawn};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_persist_client::stats::SnapshotPartStats;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::explain::json::json_string;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
    RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
};
use mz_sql::ast::{
    AlterSourceAddSubsourceOption, CreateSinkOption, CreateSinkOptionName, CreateSourceOptionName,
    CreateSubsourceOption, CreateSubsourceOptionName, SqlServerConfigOption,
    SqlServerConfigOptionName,
};
use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
    ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
};
use mz_sql::names::{
    Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
    SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    AlterConnectionAction, AlterConnectionPlan, ConnectionDetails, CreateSourcePlanBundle,
    ExplainSinkSchemaPlan, Explainee, ExplaineeStatement, MutationKind, NetworkPolicyRule, Params,
    Plan, PlannedAlterRoleOption, PlannedRoleVariable, QueryWhen, SideEffectingFunc,
    StatementContext, UpdatePrivilege, VariableValue,
};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
    self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS, SessionVars,
    TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
    MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
    WithOptionValue,
};
use mz_ssh_util::keys::SshKeyPairSet;
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceExport};
use mz_storage_types::stats::RelationPartStats;
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
use smallvec::SmallVec;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::{
    AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
    CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
    Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
    PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
    validate_ip_with_policy_rules,
};
use crate::error::AdapterError;
use crate::notice::{AdapterNotice, DroppedInUseIndex};
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
use crate::optimize::{self, Optimize};
use crate::session::{
    EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
    WriteLocks, WriteOp,
};
use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
use crate::{PeekResponseUnary, ReadHolds};

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
mod explain_timestamp;
mod peek;
mod secret;
mod subscribe;

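/// Like `?`, but on error retires the given execute context with the error
/// instead of returning it to the caller.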
macro_rules! return_if_err {
    ($expr:expr, $ctx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => return $ctx.retire(Err(e.into())),
        }
    };
}

pub(super) use return_if_err;

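/// The result of [`Coordinator::sequence_drop_common`]: the catalog ops to
/// apply plus bookkeeping for the notices to emit afterwards.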
struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
    dropped_in_use_indexes: Vec<DroppedInUseIndex>,
}

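/// Intermediate state built by [`Coordinator::create_source_inner`]: catalog
/// ops, the new `Source` items, and the IDs of sources created with
/// `IF NOT EXISTS` (used to downgrade "already exists" errors to notices).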
struct CreateSourceInner {
    ops: Vec<catalog::Op>,
    sources: Vec<(CatalogItemId, Source)>,
    if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
}

impl Coordinator {
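    /// Drives a staged operation forward: runs each stage in turn, spawning
    /// long-running stages onto tasks, until the operation produces a
    /// response or is retired with an error.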
    pub(crate) async fn sequence_staged<S>(
        &mut self,
        mut ctx: S::Ctx,
        parent_span: Span,
        mut stage: S,
    ) where
        S: Staged + 'static,
        S::Ctx: Send + 'static,
    {
        return_if_err!(stage.validity().check(self.catalog()), ctx);
        loop {
            let mut cancel_enabled = stage.cancel_enabled();
            if let Some(session) = ctx.session() {
                if cancel_enabled {
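                    // Install a fresh cancellation watch channel for this
                    // connection, replacing any previous one. If the previous
                    // channel already observed a cancellation, retire now.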
                    if let Some((_prev_tx, prev_rx)) = self
                        .staged_cancellation
                        .insert(session.conn_id().clone(), watch::channel(false))
                    {
                        let was_canceled = *prev_rx.borrow();
                        if was_canceled {
                            ctx.retire(Err(AdapterError::Canceled));
                            return;
                        }
                    }
                } else {
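                    // This stage is not cancelable, so drop any watch channel
                    // left over from a previous stage.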
                    self.staged_cancellation.remove(session.conn_id());
                }
            } else {
                cancel_enabled = false
            };
            let next = stage
                .stage(self, &mut ctx)
                .instrument(parent_span.clone())
                .await;
            let res = return_if_err!(next, ctx);
            stage = match res {
                StageResult::Handle(handle) => {
                    let internal_cmd_tx = self.internal_cmd_tx.clone();
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
                        let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
                    });
                    return;
                }
                StageResult::HandleRetire(handle) => {
                    self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
                        ctx.retire(Ok(resp));
                    });
                    return;
                }
                StageResult::Response(resp) => {
                    ctx.retire(Ok(resp));
                    return;
                }
                StageResult::Immediate(stage) => *stage,
            }
        }
    }

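    /// Awaits `handle` on a spawned task and passes its output to `f`, or
    /// retires `ctx` with `AdapterError::Canceled` if the session's
    /// cancellation watch fires first (and `cancel_enabled` is set).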
    fn handle_spawn<C, T, F>(
        &self,
        ctx: C,
        handle: JoinHandle<Result<T, AdapterError>>,
        cancel_enabled: bool,
        f: F,
    ) where
        C: StagedContext + Send + 'static,
        T: Send + 'static,
        F: FnOnce(C, T) + Send + 'static,
    {
        let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
            .session()
            .and_then(|session| self.staged_cancellation.get(session.conn_id()))
        {
            let mut rx = rx.clone();
            Box::pin(async move {
                // Ignore the result: an error means the sender was dropped,
                // in which case cancellation can no longer be requested.
                let _ = rx.wait_for(|v| *v).await;
            })
        } else {
            Box::pin(future::pending())
        };
        spawn(|| "sequence_staged", async move {
            tokio::select! {
                res = handle => {
                    let next = match res {
                        Ok(next) => return_if_err!(next, ctx),
                        Err(err) => {
                            tracing::error!("sequence_staged join error {err}");
                            ctx.retire(Err(AdapterError::Internal(
                                "sequence_staged join error".into(),
                            )));
                            return;
                        }
                    };
                    f(ctx, next);
                }
                _ = rx, if cancel_enabled => {
                    ctx.retire(Err(AdapterError::Canceled));
                }
            }
        });
    }

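    /// Converts `CREATE SOURCE` plan bundles into catalog ops and in-memory
    /// `Source` items, enforcing replica-count restrictions and reducing
    /// webhook validation expressions along the way.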
    async fn create_source_inner(
        &self,
        session: &Session,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<CreateSourceInner, AdapterError> {
        let mut ops = vec![];
        let mut sources = vec![];

        let if_not_exists_ids = plans
            .iter()
            .filter_map(
                |plan::CreateSourcePlanBundle {
                     item_id,
                     global_id: _,
                     plan,
                     resolved_ids: _,
                     available_source_references: _,
                 }| {
                    if plan.if_not_exists {
                        Some((*item_id, plan.name.clone()))
                    } else {
                        None
                    }
                },
            )
            .collect::<BTreeMap<_, _>>();

        for plan::CreateSourcePlanBundle {
            item_id,
            global_id,
            mut plan,
            resolved_ids,
            available_source_references,
        } in plans
        {
            let name = plan.name.clone();

            match plan.source.data_source {
                plan::DataSourceDesc::Ingestion(ref desc)
                | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
                    let cluster_id = plan
                        .in_cluster
                        .expect("ingestion plans must specify cluster");
                    match desc.connection {
                        GenericSourceConnection::Postgres(_)
                        | GenericSourceConnection::MySql(_)
                        | GenericSourceConnection::SqlServer(_)
                        | GenericSourceConnection::Kafka(_)
                        | GenericSourceConnection::LoadGenerator(_) => {
                            if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                                let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                                    .get(self.catalog().system_config().dyncfgs());

                                if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
                                {
                                    return Err(AdapterError::Unsupported(
                                        "sources in clusters with >1 replicas",
                                    ));
                                }
                            }
                        }
                    }
                }
                plan::DataSourceDesc::Webhook { .. } => {
                    let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
                    if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
                        let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
                            .get(self.catalog().system_config().dyncfgs());

                        if !enable_multi_replica_sources && cluster.replica_ids().len() > 1 {
                            return Err(AdapterError::Unsupported(
                                "webhook sources in clusters with >1 replicas",
                            ));
                        }
                    }
                }
                plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
            }

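            // If this is a webhook source with a validation expression, try to
            // reduce the expression now and reject the source if that fails.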
            if let mz_sql::plan::DataSourceDesc::Webhook {
                validate_using: Some(validate),
                ..
            } = &mut plan.source.data_source
            {
                if let Err(reason) = validate.reduce_expression().await {
                    self.metrics
                        .webhook_validation_reduce_failures
                        .with_label_values(&[reason])
                        .inc();
                    return Err(AdapterError::Internal(format!(
                        "failed to reduce check expression, {reason}"
                    )));
                }
            }

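            // If the plan carries the set of available upstream references,
            // record them in the catalog alongside the source.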
            let mut reference_ops = vec![];
            if let Some(references) = &available_source_references {
                reference_ops.push(catalog::Op::UpdateSourceReferences {
                    source_id: item_id,
                    references: references.clone().into(),
                });
            }

            let source = Source::new(plan, global_id, resolved_ids, None, false);
            ops.push(catalog::Op::CreateItem {
                id: item_id,
                name,
                item: CatalogItem::Source(source.clone()),
                owner_id: *session.current_role_id(),
            });
            sources.push((item_id, source));
            ops.extend(reference_ops);
        }

        Ok(CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        })
    }

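    /// Plans a single `CREATE SUBSOURCE` statement, producing a
    /// `CreateSourcePlanBundle` carrying the pre-allocated item and global
    /// IDs.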
    pub(crate) fn plan_subsource(
        &self,
        session: &Session,
        params: &mz_sql::plan::Params,
        subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
        item_id: CatalogItemId,
        global_id: GlobalId,
    ) -> Result<CreateSourcePlanBundle, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);

        let plan = self.plan_statement(
            session,
            Statement::CreateSubsource(subsource_stmt),
            params,
            &resolved_ids,
        )?;
        let plan = match plan {
            Plan::CreateSource(plan) => plan,
            _ => unreachable!(),
        };
        Ok(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan,
            resolved_ids,
            available_source_references: None,
        })
    }

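    /// Plans the `ALTER SOURCE ... ADD SUBSOURCE` half of a purified
    /// statement: allocates IDs for and plans each new subsource, wrapping
    /// the results in an `AlterSourceAction::AddSubsourceExports`.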
    pub(crate) async fn plan_purified_alter_source_add_subsource(
        &mut self,
        session: &Session,
        params: Params,
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut subsource_plans = Vec::with_capacity(subsources.len());

        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;

        let item_id = entry.id();
        let ingestion_id = source.global_id();
        let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (subsource_stmt, (item_id, global_id)) in
            subsource_stmts.into_iter().zip_eq(ids.into_iter())
        {
            let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
            subsource_plans.push(s);
        }

        let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
            subsources: subsource_plans,
            options,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id,
                ingestion_id,
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

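    /// Plans the `ALTER SOURCE ... REFRESH REFERENCES` half of a purified
    /// statement, recording the newly purified set of upstream references.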
    pub(crate) fn plan_purified_alter_source_refresh_references(
        &self,
        _session: &Session,
        _params: Params,
        source_name: ResolvedItemName,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let entry = self.catalog().get_entry(source_name.item_id());
        let source = entry.source().ok_or_else(|| {
            AdapterError::internal(
                "plan alter source",
                format!("expected Source, found {entry:?}"),
            )
        })?;
        let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
            references: available_source_references,
        };

        Ok((
            Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
                item_id: entry.id(),
                ingestion_id: source.global_id(),
                action,
            }),
            ResolvedIds::empty(),
        ))
    }

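    /// Plans a purified `CREATE SOURCE`: first the optional progress
    /// subsource, then the source itself, then each subsource, returning all
    /// of the plans as a single `Plan::CreateSources`.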
    pub(crate) async fn plan_purified_create_source(
        &mut self,
        ctx: &ExecuteContext,
        params: Params,
        progress_stmt: Option<CreateSubsourceStatement<Aug>>,
        mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: plan::SourceReferences,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

        if let Some(progress_stmt) = progress_stmt {
            assert_none!(progress_stmt.of_source);
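            // Plan the progress subsource first so that the source statement
            // can refer to it by name.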
            let id_ts = self.get_catalog_write_ts().await;
            let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
            let progress_plan =
                self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
            let progress_full_name = self
                .catalog()
                .resolve_full_name(&progress_plan.plan.name, None);
            let progress_subsource = ResolvedItemName::Item {
                id: progress_plan.item_id,
                qualifiers: progress_plan.plan.name.qualifiers.clone(),
                full_name: progress_full_name,
                print_id: true,
                version: RelationVersionSelector::Latest,
            };

            create_source_plans.push(progress_plan);

            source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
        }

        let catalog = self.catalog().for_session(ctx.session());
        let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);

        let propagated_with_options: Vec<_> = source_stmt
            .with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();

        let source_plan = match self.plan_statement(
            ctx.session(),
            Statement::CreateSource(source_stmt),
            &params,
            &resolved_ids,
        )? {
            Plan::CreateSource(plan) => plan,
            p => unreachable!(
                "planning CREATE SOURCE must produce a Plan::CreateSource, but got {p:?}"
            ),
        };

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;

        let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
        let of_source = ResolvedItemName::Item {
            id: item_id,
            qualifiers: source_plan.name.qualifiers.clone(),
            full_name: source_full_name,
            print_id: true,
            version: RelationVersionSelector::Latest,
        };

        let conn_catalog = self.catalog().for_system_session();
        let pcx = plan::PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);

        let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

        // Propagate the relevant WITH options from the source statement to
        // each of its subsources.
        for subsource_stmt in subsource_stmts.iter_mut() {
            subsource_stmt
                .with_options
                .extend(propagated_with_options.iter().cloned())
        }

        create_source_plans.push(CreateSourcePlanBundle {
            item_id,
            global_id,
            plan: source_plan,
            resolved_ids: resolved_ids.clone(),
            available_source_references: Some(available_source_references),
        });

        let id_ts = self.get_catalog_write_ts().await;
        let ids = self
            .catalog()
            .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
            .await?;
        for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
            let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
            create_source_plans.push(plan);
        }

        Ok((
            Plan::CreateSources(create_source_plans),
            ResolvedIds::empty(),
        ))
    }

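    /// Sequences a batch of `CREATE SOURCE` plans: applies the catalog ops
    /// and, as a side effect, creates the backing storage collections and
    /// installs their read policies.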
    #[instrument]
    pub(super) async fn sequence_create_source(
        &mut self,
        ctx: &mut ExecuteContext,
        plans: Vec<plan::CreateSourcePlanBundle>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let item_ids: Vec<_> = plans.iter().map(|plan| plan.item_id).collect();
        let CreateSourceInner {
            ops,
            sources,
            if_not_exists_ids,
        } = self.create_source_inner(ctx.session(), plans).await?;

        let transact_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, |coord, ctx| {
                Box::pin(async move {
                    let mut collections = Vec::with_capacity(sources.len());

                    for (item_id, source) in sources {
                        let source_status_item_id =
                            coord.catalog().resolve_builtin_storage_collection(
                                &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
                            );
                        let source_status_collection_id = Some(
                            coord
                                .catalog()
                                .get_entry(&source_status_item_id)
                                .latest_global_id(),
                        );

                        let (data_source, status_collection_id) = match source.data_source {
                            DataSourceDesc::Ingestion { desc, cluster_id } => {
                                let desc = desc.into_inline_connection(coord.catalog().state());
                                let item_global_id =
                                    coord.catalog().get_entry(&item_id).latest_global_id();

                                let ingestion =
                                    IngestionDescription::new(desc, cluster_id, item_global_id);

                                (
                                    DataSource::Ingestion(ingestion),
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::OldSyntaxIngestion {
                                desc,
                                progress_subsource,
                                data_config,
                                details,
                                cluster_id,
                            } => {
                                let desc = desc.into_inline_connection(coord.catalog().state());
                                let data_config =
                                    data_config.into_inline_connection(coord.catalog().state());
                                let progress_subsource = coord
                                    .catalog()
                                    .get_entry(&progress_subsource)
                                    .latest_global_id();

                                let mut ingestion =
                                    IngestionDescription::new(desc, cluster_id, progress_subsource);
                                // The old syntax ingests data directly into the
                                // source's own collection, recorded here as a
                                // legacy source export.
                                let legacy_export = SourceExport {
                                    storage_metadata: (),
                                    data_config,
                                    details,
                                };
                                ingestion
                                    .source_exports
                                    .insert(source.global_id, legacy_export);

                                (
                                    DataSource::Ingestion(ingestion),
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference: _,
                                details,
                                data_config,
                            } => {
                                let ingestion_id =
                                    coord.catalog().get_entry(&ingestion_id).latest_global_id();
                                (
                                    DataSource::IngestionExport {
                                        ingestion_id,
                                        details,
                                        data_config: data_config
                                            .into_inline_connection(coord.catalog().state()),
                                    },
                                    source_status_collection_id,
                                )
                            }
                            DataSourceDesc::Progress => (DataSource::Progress, None),
                            DataSourceDesc::Webhook { .. } => {
                                if let Some(url) =
                                    coord.catalog().state().try_get_webhook_url(&item_id)
                                {
                                    if let Some(ctx) = ctx.as_ref() {
                                        ctx.session()
                                            .add_notice(AdapterNotice::WebhookSourceCreated { url })
                                    }
                                }

                                (DataSource::Webhook, None)
                            }
                            DataSourceDesc::Introspection(_) => {
                                unreachable!(
                                    "cannot create sources with introspection data sources"
                                )
                            }
                        };

                        collections.push((
                            source.global_id,
                            CollectionDescription::<Timestamp> {
                                desc: source.desc.clone(),
                                data_source,
                                timeline: Some(source.timeline),
                                since: None,
                                status_collection_id,
                            },
                        ));
                    }

                    let storage_metadata = coord.catalog.state().storage_metadata();

                    coord
                        .controller
                        .storage
                        .create_collections(storage_metadata, None, collections)
                        .await
                        .unwrap_or_terminate("cannot fail to create collections");

                    let read_policies = coord.catalog().state().source_compaction_windows(item_ids);
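                    // Install the compaction-window read policies for the new
                    // sources.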
                    for (compaction_window, storage_policies) in read_policies {
                        coord
                            .initialize_storage_read_policies(storage_policies, compaction_window)
                            .await;
                    }
                })
            })
            .await;

        match transact_result {
            Ok(()) => Ok(ExecuteResponse::CreatedSource),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
            })) if if_not_exists_ids.contains_key(&id) => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: if_not_exists_ids[&id].item.clone(),
                        ty: "source",
                    });
                Ok(ExecuteResponse::CreatedSource)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_connection(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) {
        let id_ts = self.get_catalog_write_ts().await;
        let (connection_id, connection_gid) = match self.catalog().allocate_user_id(id_ts).await {
            Ok(item_id) => item_id,
            Err(err) => return ctx.retire(Err(err.into())),
        };

        match &plan.connection.details {
            ConnectionDetails::Ssh { key_1, key_2, .. } => {
                let key_1 = match key_1.as_key_pair() {
                    Some(key_1) => key_1.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 1 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_2 = match key_2.as_key_pair() {
                    Some(key_2) => key_2.clone(),
                    None => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
                            "the PUBLIC KEY 2 option cannot be explicitly specified"
                        ))));
                    }
                };

                let key_set = SshKeyPairSet::from_parts(key_1, key_2);
                let secret = key_set.to_bytes();
                if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
                    return ctx.retire(Err(err.into()));
                }
            }
            _ => (),
        };

        if plan.validate {
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();

            let connection = plan
                .connection
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let current_storage_parameters = self.controller.storage.config().clone();
            task::spawn(|| format!("validate_connection:{conn_id}"), async move {
                let result = match connection
                    .validate(connection_id, &current_storage_parameters)
                    .await
                {
                    Ok(()) => Ok(plan),
                    Err(err) => Err(err.into()),
                };

                let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
                    CreateConnectionValidationReady {
                        ctx,
                        result,
                        connection_id,
                        connection_gid,
                        plan_validity: PlanValidity::new(
                            transient_revision,
                            resolved_ids.items().copied().collect(),
                            None,
                            None,
                            role_metadata,
                        ),
                        otel_ctx,
                        resolved_ids: resolved_ids.clone(),
                    },
                ));
                if let Err(e) = result {
                    tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                }
            });
        } else {
            let result = self
                .sequence_create_connection_stage_finish(
                    &mut ctx,
                    connection_id,
                    connection_gid,
                    plan,
                    resolved_ids,
                )
                .await;
            ctx.retire(result);
        }
    }

    #[instrument]
    pub(crate) async fn sequence_create_connection_stage_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        connection_id: CatalogItemId,
        connection_gid: GlobalId,
        plan: plan::CreateConnectionPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateItem {
            id: connection_id,
            name: plan.name.clone(),
            item: CatalogItem::Connection(Connection {
                create_sql: plan.connection.create_sql,
                global_id: connection_gid,
                details: plan.connection.details.clone(),
                resolved_ids,
            }),
            owner_id: *ctx.session().current_role_id(),
        }];

        let transact_result = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    match plan.connection.details {
                        ConnectionDetails::AwsPrivatelink(ref privatelink) => {
                            let spec = VpcEndpointConfig {
                                aws_service_name: privatelink.service_name.to_owned(),
                                availability_zone_ids: privatelink.availability_zones.to_owned(),
                            };
                            let cloud_resource_controller =
                                match coord.cloud_resource_controller.as_ref().cloned() {
                                    Some(controller) => controller,
                                    None => {
                                        tracing::warn!("AWS PrivateLink connections unsupported");
                                        return;
                                    }
                                };
                            if let Err(err) = cloud_resource_controller
                                .ensure_vpc_endpoint(connection_id, spec)
                                .await
                            {
                                tracing::warn!(?err, "failed to ensure vpc endpoint!");
                            }
                        }
                        _ => {}
                    }
                })
            })
            .await;

        match transact_result {
            Ok(_) => Ok(ExecuteResponse::CreatedConnection),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if plan.if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: plan.name.item,
                        ty: "connection",
                    });
                Ok(ExecuteResponse::CreatedConnection)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_database(
        &mut self,
        session: &mut Session,
        plan: plan::CreateDatabasePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::CreateDatabase {
            name: plan.name.clone(),
            owner_id: *session.current_role_id(),
        }];
        match self.catalog_transact(Some(session), ops).await {
            Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
                Ok(ExecuteResponse::CreatedDatabase)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_schema(
        &mut self,
        session: &mut Session,
        plan: plan::CreateSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateSchema {
            database_id: plan.database_spec,
            schema_name: plan.schema_name.clone(),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(_) => Ok(ExecuteResponse::CreatedSchema),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
            })) if plan.if_not_exists => {
                session.add_notice(AdapterNotice::SchemaAlreadyExists {
                    name: plan.schema_name,
                });
                Ok(ExecuteResponse::CreatedSchema)
            }
            Err(err) => Err(err),
        }
    }

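    /// Rejects the `SUPERUSER`, `PASSWORD`, and `LOGIN` role attributes when
    /// password authentication is not enabled.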
    fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
        if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
            if attributes.superuser.is_some()
                || attributes.password.is_some()
                || attributes.login.is_some()
            {
                return Err(AdapterError::UnavailableFeature {
                    feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
                    docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
                });
            }
        }
        Ok(())
    }

    #[instrument]
    pub(super) async fn sequence_create_role(
        &mut self,
        conn_id: Option<&ConnectionId>,
        plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.validate_role_attributes(&attributes)?;
        let op = catalog::Op::CreateRole { name, attributes };
        self.catalog_transact_conn(conn_id, vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedRole)
    }

    #[instrument]
    pub(super) async fn sequence_create_network_policy(
        &mut self,
        session: &Session,
        plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::CreateNetworkPolicy {
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
            .await
            .map(|_| ExecuteResponse::CreatedNetworkPolicy)
    }

    #[instrument]
    pub(super) async fn sequence_alter_network_policy(
        &mut self,
        session: &Session,
        plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Altering the default network policy requires extra validation so
        // that the current session cannot lock itself out.
        let current_network_policy_name =
            self.catalog().system_config().default_network_policy_name();
        if current_network_policy_name == name {
            self.validate_alter_network_policy(session, &rules)?;
        }

        let op = catalog::Op::AlterNetworkPolicy {
            id,
            rules,
            name,
            owner_id: *session.current_role_id(),
        };
        self.catalog_transact_conn(Some(session.conn_id()), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
    }

    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let id_ts = self.get_catalog_write_ts().await;
        let (table_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                o => {
                    unreachable!("CREATE TABLE data source got {:?}", o)
                }
            },
        };
        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let catalog_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, move |coord, ctx| {
                Box::pin(async move {
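                    // Determine the storage collection to create, the
                    // timestamp (if any) at which to register table writes,
                    // and the read policies to install.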
                    let (collections, register_ts, read_policies) = match table.data_source {
                        TableDataSource::TableWrites { defaults: _ } => {
                            let register_ts = coord.get_local_write_ts().await.timestamp;

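                            // Confirm that we are still the leader before
                            // using the local write timestamp we just minted.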
                            coord
                                .catalog
                                .confirm_leadership()
                                .await
                                .unwrap_or_terminate("unable to confirm leadership");

                            if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
                                coord.set_statement_execution_timestamp(id, register_ts);
                            }

                            let relation_version = RelationVersion::root();
                            assert_eq!(table.desc.latest_version(), relation_version);
                            let relation_desc = table
                                .desc
                                .at_version(RelationVersionSelector::Specific(relation_version));
                            let collection_desc =
                                CollectionDescription::for_table(relation_desc, None);
                            let collections = vec![(global_id, collection_desc)];

                            let compaction_window = table
                                .custom_logical_compaction_window
                                .unwrap_or(CompactionWindow::Default);
                            let read_policies =
                                BTreeMap::from([(compaction_window, btreeset! { table_id })]);

                            (collections, Some(register_ts), read_policies)
                        }
                        TableDataSource::DataSource {
                            desc: data_source,
                            timeline,
                        } => {
                            match data_source {
                                DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference: _,
                                    details,
                                    data_config,
                                } => {
                                    let source_status_item_id =
                                        coord.catalog().resolve_builtin_storage_collection(
                                            &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
                                        );
                                    let status_collection_id = Some(
                                        coord
                                            .catalog()
                                            .get_entry(&source_status_item_id)
                                            .latest_global_id(),
                                    );
                                    let ingestion_id =
                                        coord.catalog().get_entry(&ingestion_id).latest_global_id();
                                    let collection_desc = CollectionDescription::<Timestamp> {
                                        desc: table
                                            .desc
                                            .at_version(RelationVersionSelector::Latest),
                                        data_source: DataSource::IngestionExport {
                                            ingestion_id,
                                            details,
                                            data_config: data_config
                                                .into_inline_connection(coord.catalog.state()),
                                        },
                                        since: None,
                                        status_collection_id,
                                        timeline: Some(timeline.clone()),
                                    };

                                    let collections = vec![(global_id, collection_desc)];
                                    let read_policies = coord
                                        .catalog()
                                        .state()
                                        .source_compaction_windows(vec![table_id]);

                                    (collections, None, read_policies)
                                }
                                DataSourceDesc::Webhook { .. } => {
                                    if let Some(url) =
                                        coord.catalog().state().try_get_webhook_url(&table_id)
                                    {
                                        if let Some(ctx) = ctx.as_ref() {
                                            ctx.session().add_notice(
                                                AdapterNotice::WebhookSourceCreated { url },
                                            )
                                        }
                                    }

                                    assert_eq!(
                                        table.desc.latest_version(),
                                        RelationVersion::root(),
                                        "found webhook with more than 1 relation version, {:?}",
                                        table.desc
                                    );
                                    let desc = table.desc.latest();

                                    let collection_desc = CollectionDescription {
                                        desc,
                                        data_source: DataSource::Webhook,
                                        since: None,
                                        status_collection_id: None,
                                        timeline: Some(timeline.clone()),
                                    };
                                    let collections = vec![(global_id, collection_desc)];
                                    let read_policies = coord
                                        .catalog()
                                        .state()
                                        .source_compaction_windows(vec![table_id]);

                                    (collections, None, read_policies)
                                }
                                _ => unreachable!("CREATE TABLE data source got {:?}", data_source),
                            }
                        }
                    };

                    let storage_metadata = coord.catalog.state().storage_metadata();
                    coord
                        .controller
                        .storage
                        .create_collections(storage_metadata, register_ts, collections)
                        .await
                        .unwrap_or_terminate("cannot fail to create collections");

                    if let Some(register_ts) = register_ts {
                        coord.apply_local_write(register_ts).await;
                    }

                    for (compaction_window, storage_policies) in read_policies {
                        coord
                            .initialize_storage_read_policies(storage_policies, compaction_window)
                            .await;
                    }
                })
            })
            .await;

        match catalog_result {
            Ok(()) => Ok(ExecuteResponse::CreatedTable),
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session_mut()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "table",
                    });
                Ok(ExecuteResponse::CreatedTable)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) =
            return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let from = self.catalog().get_entry_by_global_id(&catalog_sink.from);
        if let Err(e) = self
            .controller
            .storage
            .check_exists(sink.from)
            .map_err(|e| match e {
                StorageError::IdentifierMissing(_) => AdapterError::Unstructured(anyhow!(
                    "{} is a {}, which cannot be exported as a sink",
                    from.name().item.clone(),
                    from.item().typ(),
                )),
                e => AdapterError::Storage(e),
            })
        {
            ctx.retire(Err(e));
            return;
        }

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }

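    /// Validates that a statement using ambiguous column references (e.g.
    /// `SELECT *`) does not depend on system relations, whose column sets may
    /// change between releases.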
    pub(super) fn validate_system_column_references(
        &self,
        uses_ambiguous_columns: bool,
        depends_on: &BTreeSet<GlobalId>,
    ) -> Result<(), AdapterError> {
        if uses_ambiguous_columns
            && depends_on
                .iter()
                .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
        {
            Err(AdapterError::AmbiguousSystemColumnReference)
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_create_type(
        &mut self,
        session: &Session,
        plan: plan::CreateTypePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        let typ = Type {
            create_sql: Some(plan.typ.create_sql),
            global_id,
            desc: plan.typ.inner.desc(&self.catalog().for_session(session))?,
            details: CatalogTypeDetails {
                array_id: None,
                typ: plan.typ.inner,
                pg_metadata: None,
            },
            resolved_ids,
        };
        let op = catalog::Op::CreateItem {
            id: item_id,
            name: plan.name,
            item: CatalogItem::Type(typ),
            owner_id: *session.current_role_id(),
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::CreatedType),
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_comment_on(
        &mut self,
        session: &Session,
        plan: plan::CommentPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::Comment {
            object_id: plan.object_id,
            sub_component: plan.sub_component,
            comment: plan.comment,
        };
        self.catalog_transact(Some(session), vec![op]).await?;
        Ok(ExecuteResponse::Comment)
    }

    #[instrument]
    pub(super) async fn sequence_drop_objects(
        &mut self,
        session: &Session,
        plan::DropObjectsPlan {
            drop_ids,
            object_type,
            referenced_ids,
        }: plan::DropObjectsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
        let mut objects = Vec::new();
        for obj_id in &drop_ids {
            if !referenced_ids_hashset.contains(obj_id) {
                let object_info = ErrorMessageObjectDescription::from_id(
                    obj_id,
                    &self.catalog().for_session(session),
                )
                .to_string();
                objects.push(object_info);
            }
        }

        if !objects.is_empty() {
            session.add_notice(AdapterNotice::CascadeDroppedObject { objects });
        }

        let DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(session, drop_ids)?;

        self.catalog_transact(Some(session), ops).await?;

        fail::fail_point!("after_sequencer_drop_replica");

        if dropped_active_db {
            session.add_notice(AdapterNotice::DroppedActiveDatabase {
                name: session.vars().database().to_string(),
            });
        }
        if dropped_active_cluster {
            session.add_notice(AdapterNotice::DroppedActiveCluster {
                name: session.vars().cluster().to_string(),
            });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
            self.metrics
                .optimization_notices
                .with_label_values(&["DroppedInUseIndex"])
                .inc_by(1);
        }
        Ok(ExecuteResponse::DroppedObject(object_type))
    }

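    /// Returns an error if any dropped role still owns an object or appears
    /// in a privilege grant, listing the dependent objects per role.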
    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
            }
        }
        privilege_check(
            self.catalog().system_privileges(),
            dropped_roles,
            &mut dependent_objects,
            &SystemObjectId::System,
            &catalog,
        );
        for (default_privilege_object, default_privilege_acl_items) in
            self.catalog.default_privileges()
        {
            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!(
                        "default privileges on {}S created by {}",
                        default_privilege_object.object_type, role_name
                    ));
            }
            for default_privilege_acl_item in default_privilege_acl_items {
                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "default privileges on {}S granted to {}",
                            default_privilege_object.object_type, role_name
                        ));
                }
            }
        }

        if !dependent_objects.is_empty() {
            Err(AdapterError::DependentObject(dependent_objects))
        } else {
            Ok(())
        }
    }

    #[instrument]
    pub(super) async fn sequence_drop_owned(
        &mut self,
        session: &Session,
        plan: plan::DropOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for role_id in &plan.role_ids {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let mut privilege_revokes = plan.privilege_revokes;

        // When RBAC is enabled, a non-superuser may only revoke privileges
        // granted by roles they are a member of.
        let session_catalog = self.catalog().for_session(session);
        if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
            && !session.is_superuser()
        {
            let role_membership =
                session_catalog.collect_role_membership(session.current_role_id());
            let invalid_revokes: BTreeSet<_> = privilege_revokes
                .extract_if(.., |(_, privilege)| {
                    !role_membership.contains(&privilege.grantor)
                })
                .map(|(object_id, _)| object_id)
                .collect();
            for invalid_revoke in invalid_revokes {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
                session.add_notice(AdapterNotice::CannotRevoke { object_description });
            }
        }

        let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
            catalog::Op::UpdatePrivilege {
                target_id: object_id,
                privilege,
                variant: UpdatePrivilegeVariant::Revoke,
            }
        });
        let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
            |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                privilege_object,
                privilege_acl_item,
                variant: UpdatePrivilegeVariant::Revoke,
            },
        );
        let DropOps {
            ops: drop_ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        } = self.sequence_drop_common(session, plan.drop_ids)?;

        let ops = privilege_revoke_ops
            .chain(default_privilege_revoke_ops)
            .chain(drop_ops.into_iter())
            .collect();

        self.catalog_transact(Some(session), ops).await?;

        if dropped_active_db {
            session.add_notice(AdapterNotice::DroppedActiveDatabase {
                name: session.vars().database().to_string(),
            });
        }
        if dropped_active_cluster {
            session.add_notice(AdapterNotice::DroppedActiveCluster {
                name: session.vars().cluster().to_string(),
            });
        }
        for dropped_in_use_index in dropped_in_use_indexes {
            session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
        }
        Ok(ExecuteResponse::DroppedOwned)
    }

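    /// Computes the catalog ops required to drop `ids`, revoking role
    /// memberships and default privileges that reference dropped roles,
    /// databases, and schemas, and recording which notices to emit.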
1850 fn sequence_drop_common(
1851 &self,
1852 session: &Session,
1853 ids: Vec<ObjectId>,
1854 ) -> Result<DropOps, AdapterError> {
1855 let mut dropped_active_db = false;
1856 let mut dropped_active_cluster = false;
1857 let mut dropped_in_use_indexes = Vec::new();
1858 let mut dropped_roles = BTreeMap::new();
1859 let mut dropped_databases = BTreeSet::new();
1860 let mut dropped_schemas = BTreeSet::new();
1861 let mut role_revokes = BTreeSet::new();
1865 let mut default_privilege_revokes = BTreeSet::new();
1868
1869 let mut clusters_to_drop = BTreeSet::new();
1871
1872 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1873 for id in &ids {
1874 match id {
1875 ObjectId::Database(id) => {
1876 let name = self.catalog().get_database(id).name();
1877 if name == session.vars().database() {
1878 dropped_active_db = true;
1879 }
1880 dropped_databases.insert(id);
1881 }
1882 ObjectId::Schema((_, spec)) => {
1883 if let SchemaSpecifier::Id(id) = spec {
1884 dropped_schemas.insert(id);
1885 }
1886 }
1887 ObjectId::Cluster(id) => {
1888 clusters_to_drop.insert(*id);
1889 if let Some(active_id) = self
1890 .catalog()
1891 .active_cluster(session)
1892 .ok()
1893 .map(|cluster| cluster.id())
1894 {
1895 if id == &active_id {
1896 dropped_active_cluster = true;
1897 }
1898 }
1899 }
1900 ObjectId::Role(id) => {
1901 let role = self.catalog().get_role(id);
1902 let name = role.name();
1903 dropped_roles.insert(*id, name);
1904 for (group_id, grantor_id) in &role.membership.map {
1906 role_revokes.insert((*group_id, *id, *grantor_id));
1907 }
1908 }
1909 ObjectId::Item(id) => {
1910 if let Some(index) = self.catalog().get_entry(id).index() {
1911 let humanizer = self.catalog().for_session(session);
1912 let dependants = self
1913 .controller
1914 .compute
1915 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1916 .ok()
1917 .into_iter()
1918 .flatten()
1919 .filter(|dependant_id| {
1920 if dependant_id.is_transient() {
1927 return false;
1928 }
1929 let Some(dependent_id) = humanizer
1931 .try_get_item_by_global_id(dependant_id)
1932 .map(|item| item.id())
1933 else {
1934 return false;
1935 };
1936 !ids_set.contains(&ObjectId::Item(dependent_id))
1939 })
1940 .flat_map(|dependant_id| {
1941 humanizer.humanize_id(dependant_id)
1945 })
1946 .collect_vec();
1947 if !dependants.is_empty() {
1948 dropped_in_use_indexes.push(DroppedInUseIndex {
1949 index_name: humanizer
1950 .humanize_id(index.global_id())
1951 .unwrap_or_else(|| id.to_string()),
1952 dependant_objects: dependants,
1953 });
1954 }
1955 }
1956 }
1957 _ => {}
1958 }
1959 }
1960
1961 for id in &ids {
1962 match id {
1963 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1967 if !clusters_to_drop.contains(cluster_id) {
1968 let cluster = self.catalog.get_cluster(*cluster_id);
1969 if cluster.is_managed() {
1970 let replica =
1971 cluster.replica(*replica_id).expect("Catalog out of sync");
1972 if !replica.config.location.internal() {
1973 coord_bail!("cannot drop replica of managed cluster");
1974 }
1975 }
1976 }
1977 }
1978 _ => {}
1979 }
1980 }
1981
1982 for role_id in dropped_roles.keys() {
1983 self.catalog().ensure_not_reserved_role(role_id)?;
1984 }
1985 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1986 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1988 for role in self.catalog().user_roles() {
1989 for dropped_role_id in
1990 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1991 {
1992 role_revokes.insert((
1993 **dropped_role_id,
1994 role.id(),
1995 *role
1996 .membership
1997 .map
1998 .get(*dropped_role_id)
1999 .expect("included in keys above"),
2000 ));
2001 }
2002 }
2003
        for (default_privilege_object, default_privilege_acls) in
            self.catalog().default_privileges()
        {
            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
            {
                for default_privilege_acl in default_privilege_acls {
                    default_privilege_revokes.insert((
                        default_privilege_object.clone(),
                        default_privilege_acl.clone(),
                    ));
                }
            }
        }

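        // Assemble all ops into a single catalog transaction: role revokes,
        // then default-privilege revokes, then the drops themselves.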
        let ops = role_revokes
            .into_iter()
            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            })
            .chain(default_privilege_revokes.into_iter().map(
                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                    privilege_object,
                    privilege_acl_item,
                    variant: UpdatePrivilegeVariant::Revoke,
                },
            ))
            .chain(iter::once(catalog::Op::DropObjects(
                ids.into_iter()
                    .map(DropObjectInfo::manual_drop_from_object_id)
                    .collect(),
            )))
            .collect();

        Ok(DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        })
    }

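    /// Sequences `EXPLAIN SCHEMA`, pretty-printing the JSON schema that was
    /// computed for the sink during planning.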
    pub(super) fn sequence_explain_schema(
        &self,
        ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
            AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
        })?;

        let json_string = json_string(&json_value);
        let row = Row::pack_slice(&[Datum::String(&json_string)]);
        Ok(Self::send_immediate_rows(row))
    }

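    /// Sequences `SHOW ALL`, returning every variable visible to the
    /// session, sorted case-insensitively by name.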
    pub(super) fn sequence_show_all_variables(
        &self,
        session: &Session,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut rows = viewable_variables(self.catalog().state(), session)
            .map(|v| (v.name(), v.value(), v.description()))
            .collect::<Vec<_>>();
        rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());

        let rows: Vec<_> = rows
            .into_iter()
            .map(|(name, val, desc)| {
                Row::pack_slice(&[
                    Datum::String(name),
                    Datum::String(&val),
                    Datum::String(desc),
                ])
            })
            .collect();
        Ok(Self::send_immediate_rows(rows))
    }

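    /// Sequences `SHOW <variable>`. The special `schema` alias resolves to
    /// the first schema on the search path; unresolvable databases or
    /// clusters produce a notice rather than an error.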
    pub(super) fn sequence_show_variable(
        &self,
        session: &Session,
        plan: plan::ShowVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        if &plan.name == SCHEMA_ALIAS {
            let schemas = self.catalog.resolve_search_path(session);
            let schema = schemas.first();
            return match schema {
                Some((database_spec, schema_spec)) => {
                    let schema_name = &self
                        .catalog
                        .get_schema(database_spec, schema_spec, session.conn_id())
                        .name()
                        .schema;
                    let row = Row::pack_slice(&[Datum::String(schema_name)]);
                    Ok(Self::send_immediate_rows(row))
                }
                None => {
                    if session.vars().current_object_missing_warnings() {
                        session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
                            search_path: session
                                .vars()
                                .search_path()
                                .into_iter()
                                .map(|schema| schema.to_string())
                                .collect(),
                        });
                    }
                    Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
                }
            };
        }

        // Look the variable up in the session, falling back to the system
        // configuration for system-only variables.
        let variable = session
            .vars()
            .get(self.catalog().system_config(), &plan.name)
            .or_else(|_| self.catalog().system_config().get(&plan.name))?;

        // Check that the variable is visible to the current user.
        variable.visible(session.user(), self.catalog().system_config())?;

        let row = Row::pack_slice(&[Datum::String(&variable.value())]);
        if variable.name() == vars::DATABASE.name()
            && matches!(
                self.catalog().resolve_database(&variable.value()),
                Err(CatalogError::UnknownDatabase(_))
            )
            && session.vars().current_object_missing_warnings()
        {
            let name = variable.value();
            session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
        } else if variable.name() == vars::CLUSTER.name()
            && matches!(
                self.catalog().resolve_cluster(&variable.value()),
                Err(CatalogError::UnknownCluster(_))
            )
            && session.vars().current_object_missing_warnings()
        {
            let name = variable.value();
            session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
        }
        Ok(Self::send_immediate_rows(row))
    }

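    /// Sequences `INSPECT SHARD`, which returns the raw persist state of a
    /// shard and is restricted to internal users.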
    #[instrument]
    pub(super) async fn sequence_inspect_shard(
        &self,
        session: &Session,
        plan: plan::InspectShardPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Only internal users may inspect persist shard state.
        if !session.user().is_internal() {
            return Err(AdapterError::Unauthorized(
                rbac::UnauthorizedError::MzSystem {
                    action: "inspect".into(),
                },
            ));
        }
        let state = self
            .controller
            .storage
            .inspect_persist_state(plan.id)
            .await?;
        let jsonb = Jsonb::from_serde_json(state)?;
        Ok(Self::send_immediate_rows(jsonb.into_row()))
    }

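    /// Sequences `SET <variable>`, validating isolation level and cluster
    /// changes and emitting notices for deprecated or unresolvable values.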
    #[instrument]
    pub(super) fn sequence_set_variable(
        &self,
        session: &mut Session,
        plan: plan::SetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (name, local) = (plan.name, plan.local);
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }

        let vars = session.vars_mut();
        let values = match plan.value {
            plan::VariableValue::Default => None,
            plan::VariableValue::Values(values) => Some(values),
        };

        match values {
            Some(values) => {
                vars.set(
                    self.catalog().system_config(),
                    &name,
                    VarInput::SqlSet(&values),
                    local,
                )?;

                let vars = session.vars();

                // Emit a usage notice for deprecated variables and values.
                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if name == vars::CLUSTER.name()
                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
                {
                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
                }

                // Warn if the new value points at a database or cluster that
                // does not exist.
                if name.as_str() == vars::DATABASE.name()
                    && matches!(
                        self.catalog().resolve_database(vars.database()),
                        Err(CatalogError::UnknownDatabase(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.database().to_string();
                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
                } else if name.as_str() == vars::CLUSTER.name()
                    && matches!(
                        self.catalog().resolve_cluster(vars.cluster()),
                        Err(CatalogError::UnknownCluster(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.cluster().to_string();
                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
                    let v = values.into_first().to_lowercase();
                    if v == IsolationLevel::ReadUncommitted.as_str()
                        || v == IsolationLevel::ReadCommitted.as_str()
                        || v == IsolationLevel::RepeatableRead.as_str()
                    {
                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
                            isolation_level: v,
                        });
                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
                        session.add_notice(AdapterNotice::StrongSessionSerializable);
                    }
                }
            }
            None => vars.reset(self.catalog().system_config(), &name, local)?,
        }

        Ok(ExecuteResponse::SetVariable { name, reset: false })
    }

    pub(super) fn sequence_reset_variable(
        &self,
        session: &mut Session,
        plan: plan::ResetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let name = plan.name;
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }
        session
            .vars_mut()
            .reset(self.catalog().system_config(), &name, false)?;
        Ok(ExecuteResponse::SetVariable { name, reset: true })
    }

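    /// Sequences `SET TRANSACTION`. Only isolation level modes are
    /// supported; access modes are rejected.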
    pub(super) fn sequence_set_transaction(
        &self,
        session: &mut Session,
        plan: plan::SetTransactionPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for mode in plan.modes {
            match mode {
                TransactionMode::AccessMode(_) => {
                    return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
                }
                TransactionMode::IsolationLevel(isolation_level) => {
                    self.validate_set_isolation_level(session)?;

                    session.vars_mut().set(
                        self.catalog().system_config(),
                        TRANSACTION_ISOLATION_VAR_NAME,
                        VarInput::Flat(&isolation_level.to_ast_string_stable()),
                        plan.local,
                    )?
                }
            }
        }
        Ok(ExecuteResponse::SetVariable {
            name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
            reset: false,
        })
    }

    fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
        if session.transaction().contains_ops() {
            Err(AdapterError::InvalidSetIsolationLevel)
        } else {
            Ok(())
        }
    }

    fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
        if session.transaction().contains_ops() {
            Err(AdapterError::InvalidSetCluster)
        } else {
            Ok(())
        }
    }

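    /// Sequences `COMMIT` and `ROLLBACK`. Writes are routed to group commit,
    /// linearized reads to the strict serializable read queue, and everything
    /// else retires immediately.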
    #[instrument]
    pub(super) async fn sequence_end_transaction(
        &mut self,
        mut ctx: ExecuteContext,
        mut action: EndTransactionAction,
    ) {
        // If the transaction has failed, we can only roll it back.
        if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
            (&action, ctx.session().transaction())
        {
            action = EndTransactionAction::Rollback;
        }
        let response = match action {
            EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
                params: BTreeMap::new(),
            }),
            EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
                params: BTreeMap::new(),
            }),
        };

        let result = self.sequence_end_transaction_inner(&mut ctx, action).await;

        let (response, action) = match result {
            Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
                (response, action)
            }
            Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
                // Make sure we hold a write lock for every collection we are
                // writing to; a missing lock is a programming error.
                let validated_locks = match write_lock_guards {
                    None => None,
                    Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
                        Ok(locks) => Some(locks),
                        Err(missing) => {
                            tracing::error!(?missing, "programming error, missing write locks");
                            return ctx.retire(Err(AdapterError::WrongSetOfLocks));
                        }
                    },
                };

                let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
                for WriteOp { id, rows } in writes {
                    let total_rows = collected_writes.entry(id).or_default();
                    total_rows.push(rows);
                }

                self.submit_write(PendingWriteTxn::User {
                    span: Span::current(),
                    writes: collected_writes,
                    write_locks: validated_locks,
                    pending_txn: PendingTxn {
                        ctx,
                        response,
                        action,
                    },
                });
                return;
            }
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrictSerializable =>
            {
                let conn_id = ctx.session().conn_id().clone();
                let pending_read_txn = PendingReadTxn {
                    txn: PendingRead::Read {
                        txn: PendingTxn {
                            ctx,
                            response,
                            action,
                        },
                    },
                    timestamp_context: determination.timestamp_context,
                    created: Instant::now(),
                    num_requeues: 0,
                    otel_ctx: OpenTelemetryContext::obtain(),
                };
                self.strict_serializable_reads_tx
                    .send((conn_id, pending_read_txn))
                    .expect("sending to strict_serializable_reads_tx cannot fail");
                return;
            }
            Ok((
                Some(TransactionOps::Peeks {
                    determination,
                    requires_linearization: RequireLinearization::Required,
                    ..
                }),
                _,
            )) if ctx.session().vars().transaction_isolation()
                == &IsolationLevel::StrongSessionSerializable =>
            {
                if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
                    ctx.session_mut()
                        .ensure_timestamp_oracle(timeline.clone())
                        .apply_write(*ts);
                }
                (response, action)
            }
            Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
                self.internal_cmd_tx
                    .send(Message::ExecuteSingleStatementTransaction {
                        ctx,
                        otel_ctx: OpenTelemetryContext::obtain(),
                        stmt,
                        params,
                    })
                    .expect("must send");
                return;
            }
            Ok((_, _)) => (response, action),
            Err(err) => (Err(err), EndTransactionAction::Rollback),
        };
        let changed = ctx.session_mut().vars_mut().end_transaction(action);
        // Append any parameters that changed to the response.
        let response = response.map(|mut r| {
            r.extend_params(changed);
            ExecuteResponse::from(r)
        });

        ctx.retire(response);
    }

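    /// Clears the session's transaction and, on commit, validates and
    /// applies its buffered write or DDL ops, returning them along with any
    /// held write locks.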
    #[instrument]
    async fn sequence_end_transaction_inner(
        &mut self,
        ctx: &mut ExecuteContext,
        action: EndTransactionAction,
    ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
        let txn = self.clear_transaction(ctx.session_mut()).await;

        if let EndTransactionAction::Commit = action {
            if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
                match &mut ops {
                    TransactionOps::Writes(writes) => {
                        // Make sure none of the written-to tables were
                        // dropped mid-transaction.
                        for WriteOp { id, .. } in &mut writes.iter() {
                            let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
                                AdapterError::Catalog(mz_catalog::memory::error::Error {
                                    kind: mz_catalog::memory::error::ErrorKind::Sql(
                                        CatalogError::UnknownItem(id.to_string()),
                                    ),
                                })
                            })?;
                        }

                        writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
                    }
                    TransactionOps::DDL {
                        ops,
                        state: _,
                        side_effects,
                        revision,
                    } => {
                        // If the catalog has changed since the transaction
                        // began, the DDL cannot be committed.
                        if *revision != self.catalog().transient_revision() {
                            return Err(AdapterError::DDLTransactionRace);
                        }
                        let ops = std::mem::take(ops);
                        let side_effects = std::mem::take(side_effects);
                        self.catalog_transact_with_side_effects(
                            Some(ctx),
                            ops,
                            move |a, mut ctx| {
                                Box::pin(async move {
                                    for side_effect in side_effects {
                                        side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
                                    }
                                })
                            },
                        )
                        .await?;
                    }
                    _ => (),
                }
                return Ok((Some(ops), write_lock_guards));
            }
        }

        Ok((None, None))
    }

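    /// Sequences a side-effecting function call such as
    /// `pg_cancel_backend`.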
    pub(super) async fn sequence_side_effecting_func(
        &mut self,
        ctx: ExecuteContext,
        plan: SideEffectingFunc,
    ) {
        match plan {
            SideEffectingFunc::PgCancelBackend { connection_id } => {
                if ctx.session().conn_id().unhandled() == connection_id {
                    // A session canceling itself is treated as a query cancel.
                    ctx.retire(Err(AdapterError::Canceled));
                    return;
                }

                let res = if let Some((id_handle, _conn_meta)) =
                    self.active_conns.get_key_value(&connection_id)
                {
                    self.handle_privileged_cancel(id_handle.clone()).await;
                    Datum::True
                } else {
                    Datum::False
                };
                ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
            }
        }
    }

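    /// Determines the real-time recency timestamp for the given sources, if
    /// the session requires real-time recency. A future is returned because
    /// computing the timestamp may require contacting upstream systems.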
    pub(super) async fn determine_real_time_recent_timestamp(
        &self,
        session: &Session,
        source_ids: impl Iterator<Item = CatalogItemId>,
    ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
    {
        let vars = session.vars();

        // Real-time recency only applies in strict serializable mode when
        // the session has opted in and has not already fixed a read
        // timestamp.
        let r = if vars.real_time_recency()
            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
            && !session.contains_read_timestamp()
        {
            // Only user objects participate in real-time recency.
            let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
            if to_visit.is_empty() {
                return Ok(None);
            }

            let mut timestamp_objects = BTreeSet::new();

            // Transitively collect all user objects the query depends on.
            while let Some(id) = to_visit.pop_front() {
                timestamp_objects.insert(id);
                to_visit.extend(
                    self.catalog()
                        .get_entry(&id)
                        .uses()
                        .into_iter()
                        .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
                );
            }
            let timestamp_objects = timestamp_objects
                .into_iter()
                .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
                .collect();

            let r = self
                .controller
                .determine_real_time_recent_timestamp(
                    timestamp_objects,
                    *vars.real_time_recency_timeout(),
                )
                .await?;

            Some(r)
        } else {
            None
        };

        Ok(r)
    }

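    /// Sequences `EXPLAIN PLAN` by dispatching on the kind of statement or
    /// object being explained.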
    #[instrument]
    pub(super) async fn sequence_explain_plan(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::ExplainPlanPlan,
        target_cluster: TargetCluster,
    ) {
        match &plan.explainee {
            plan::Explainee::Statement(stmt) => match stmt {
                plan::ExplaineeStatement::CreateView { .. } => {
                    self.explain_create_view(ctx, plan).await;
                }
                plan::ExplaineeStatement::CreateMaterializedView { .. } => {
                    self.explain_create_materialized_view(ctx, plan).await;
                }
                plan::ExplaineeStatement::CreateIndex { .. } => {
                    self.explain_create_index(ctx, plan).await;
                }
                plan::ExplaineeStatement::Select { .. } => {
                    self.explain_peek(ctx, plan, target_cluster).await;
                }
            },
            plan::Explainee::View(_) => {
                let result = self.explain_view(&ctx, plan);
                ctx.retire(result);
            }
            plan::Explainee::MaterializedView(_) => {
                let result = self.explain_materialized_view(&ctx, plan);
                ctx.retire(result);
            }
            plan::Explainee::Index(_) => {
                let result = self.explain_index(&ctx, plan);
                ctx.retire(result);
            }
            plan::Explainee::ReplanView(_) => {
                self.explain_replan_view(ctx, plan).await;
            }
            plan::Explainee::ReplanMaterializedView(_) => {
                self.explain_replan_materialized_view(ctx, plan).await;
            }
            plan::Explainee::ReplanIndex(_) => {
                self.explain_replan_index(ctx, plan).await;
            }
        };
    }

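    /// Sequences `EXPLAIN FILTER PUSHDOWN`, which reports how much persisted
    /// data a query's filters can skip reading.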
    pub(super) async fn sequence_explain_pushdown(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::ExplainPushdownPlan,
        target_cluster: TargetCluster,
    ) {
        match plan.explainee {
            Explainee::Statement(ExplaineeStatement::Select {
                broken: false,
                plan,
                desc: _,
            }) => {
                let stage = return_if_err!(
                    self.peek_validate(
                        ctx.session(),
                        plan,
                        target_cluster,
                        None,
                        ExplainContext::Pushdown,
                        Some(ctx.session().vars().max_query_result_size()),
                    ),
                    ctx
                );
                self.sequence_staged(ctx, Span::current(), stage).await;
            }
            Explainee::MaterializedView(item_id) => {
                self.explain_pushdown_materialized_view(ctx, item_id).await;
            }
            _ => {
                ctx.retire(Err(AdapterError::Unsupported(
                    "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
                )));
            }
        };
    }

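    /// Renders the rows for `EXPLAIN FILTER PUSHDOWN` on a spawned task so
    /// the coordinator is not blocked while part statistics are fetched.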
    async fn render_explain_pushdown(
        &self,
        ctx: ExecuteContext,
        as_of: Antichain<Timestamp>,
        mz_now: ResultSpec<'static>,
        read_holds: Option<ReadHolds<Timestamp>>,
        imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
    ) {
        let fut = self
            .render_explain_pushdown_prepare(ctx.session(), as_of, mz_now, imports)
            .await;
        task::spawn(|| "render explain pushdown", async move {
            // Keep the read holds alive until the future has resolved.
            let _read_holds = read_holds;
            let res = fut.await;
            ctx.retire(res);
        });
    }

    async fn render_explain_pushdown_prepare<
        I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
    >(
        &self,
        session: &Session,
        as_of: Antichain<Timestamp>,
        mz_now: ResultSpec<'static>,
        imports: I,
    ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
        let explain_timeout = *session.vars().statement_timeout();
        let mut futures = FuturesOrdered::new();
        for (id, mfp) in imports {
            let catalog_entry = self.catalog.get_entry_by_global_id(&id);
            let full_name = self
                .catalog
                .for_session(session)
                .resolve_full_name(&catalog_entry.name);
            let name = format!("{}", full_name);
            let relation_desc = catalog_entry
                .desc_opt()
                .expect("source should have a proper desc")
                .into_owned();
            let stats_future = self
                .controller
                .storage_collections
                .snapshot_parts_stats(id, as_of.clone())
                .await;

            let mz_now = mz_now.clone();
            // Tally up the bytes and parts of this import's snapshot that
            // the MFP might select.
            futures.push_back(async move {
                let snapshot_stats = match stats_future.await {
                    Ok(stats) => stats,
                    Err(e) => return Err(e),
                };
                let mut total_bytes = 0;
                let mut total_parts = 0;
                let mut selected_bytes = 0;
                let mut selected_parts = 0;
                for SnapshotPartStats {
                    encoded_size_bytes: bytes,
                    stats,
                } in &snapshot_stats.parts
                {
                    let bytes = u64::cast_from(*bytes);
                    total_bytes += bytes;
                    total_parts += 1u64;
                    let selected = match stats {
                        None => true,
                        Some(stats) => {
                            let stats = stats.decode();
                            let stats = RelationPartStats::new(
                                name.as_str(),
                                &snapshot_stats.metrics.pushdown.part_stats,
                                &relation_desc,
                                &stats,
                            );
                            stats.may_match_mfp(mz_now.clone(), &mfp)
                        }
                    };

                    if selected {
                        selected_bytes += bytes;
                        selected_parts += 1u64;
                    }
                }
                Ok(Row::pack_slice(&[
                    name.as_str().into(),
                    total_bytes.into(),
                    selected_bytes.into(),
                    total_parts.into(),
                    selected_parts.into(),
                ]))
            });
        }

        let fut = async move {
            match tokio::time::timeout(
                explain_timeout,
                futures::TryStreamExt::try_collect::<Vec<_>>(futures),
            )
            .await
            {
                Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
                    rows: Box::new(rows.into_row_iter()),
                }),
                Ok(Err(err)) => Err(err.into()),
                Err(_) => Err(AdapterError::StatementTimeout),
            }
        };
        fut
    }

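    /// Sequences `INSERT`. Constant values are written directly; all other
    /// sources of values are handled as a read-then-write.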
    #[instrument]
    pub(super) async fn sequence_insert(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::InsertPlan,
    ) {
        // Inserts are not allowed in read-only transactions.
        if !ctx.session_mut().transaction().allows_writes() {
            ctx.retire(Err(AdapterError::ReadOnlyTransaction));
            return;
        }

        // If the insert source is constant, skip the full optimizer and only
        // lower the expression; otherwise run the view optimizer.
        let optimized_mir = if let Some(..) = &plan.values.as_const() {
            let expr = return_if_err!(
                plan.values
                    .clone()
                    .lower(self.catalog().system_config(), None),
                ctx
            );
            OptimizedMirRelationExpr(expr)
        } else {
            let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());

            let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

            return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
        };

        match optimized_mir.into_inner() {
            selection if selection.as_const().is_some() && plan.returning.is_empty() => {
                let catalog = self.owned_catalog();
                mz_ore::task::spawn(|| "coord::sequence_inner", async move {
                    let result =
                        Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
                    ctx.retire(result);
                });
            }
            // All non-constant values must be handled as read-then-writes.
            _ => {
                let desc_arity = match self.catalog().try_get_entry(&plan.id) {
                    Some(table) => {
                        let fullname = self
                            .catalog()
                            .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
                        table
                            .desc_latest(&fullname)
                            .expect("desc called on table")
                            .arity()
                    }
                    None => {
                        ctx.retire(Err(AdapterError::Catalog(
                            mz_catalog::memory::error::Error {
                                kind: mz_catalog::memory::error::ErrorKind::Sql(
                                    CatalogError::UnknownItem(plan.id.to_string()),
                                ),
                            },
                        )));
                        return;
                    }
                };

                let finishing = RowSetFinishing {
                    order_by: vec![],
                    limit: None,
                    offset: 0,
                    project: (0..desc_arity).collect(),
                };

                let read_then_write_plan = plan::ReadThenWritePlan {
                    id: plan.id,
                    selection: plan.values,
                    finishing,
                    assignments: BTreeMap::new(),
                    kind: MutationKind::Insert,
                    returning: plan.returning,
                };

                self.sequence_read_then_write(ctx, read_then_write_plan)
                    .await;
            }
        }
    }

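    /// Sequences `UPDATE`, `DELETE`, and `INSERT ... SELECT`, all of which
    /// first read the current state of the table and then write a set of
    /// diffs back at a single timestamp.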
    #[instrument]
    pub(super) async fn sequence_read_then_write(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::ReadThenWritePlan,
    ) {
        let mut source_ids: BTreeSet<_> = plan
            .selection
            .depends_on()
            .into_iter()
            .map(|gid| self.catalog().resolve_item_id(&gid))
            .collect();
        source_ids.insert(plan.id);

        // If the transaction does not already hold write locks, acquire them
        // now, deferring the plan if any lock is unavailable.
        if ctx.session().transaction().write_locks().is_none() {
            let mut write_locks = WriteLocks::builder(source_ids.iter().copied());

            for id in &source_ids {
                if let Some(lock) = self.try_grant_object_write_lock(*id) {
                    write_locks.insert_lock(*id, lock);
                }
            }

            let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
                Ok(locks) => locks,
                Err(missing) => {
                    // Defer the plan until the missing lock becomes available.
                    let role_metadata = ctx.session().role_metadata().clone();
                    let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
                    let plan = DeferredPlan {
                        ctx,
                        plan: Plan::ReadThenWrite(plan),
                        validity: PlanValidity::new(
                            self.catalog.transient_revision(),
                            source_ids.clone(),
                            None,
                            None,
                            role_metadata,
                        ),
                        requires_locks: source_ids,
                    };
                    return self.defer_op(acquire_future, DeferredOp::Plan(plan));
                }
            };

            ctx.session_mut()
                .try_grant_write_locks(write_locks)
                .expect("session has already been granted write locks");
        }

        let plan::ReadThenWritePlan {
            id,
            kind,
            selection,
            mut assignments,
            finishing,
            mut returning,
        } = plan;

        let desc = match self.catalog().try_get_entry(&id) {
            Some(table) => {
                let full_name = self
                    .catalog()
                    .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
                table
                    .desc_latest(&full_name)
                    .expect("desc called on table")
                    .into_owned()
            }
            None => {
                ctx.retire(Err(AdapterError::Catalog(
                    mz_catalog::memory::error::Error {
                        kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
                            id.to_string(),
                        )),
                    },
                )));
                return;
            }
        };

        // Disallow mz_now in any part of the statement, since the write's
        // timestamp is not known at evaluation time.
        let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
            || assignments.values().any(|e| e.contains_temporal())
            || returning.iter().any(|e| e.contains_temporal());
        if contains_temporal {
            ctx.retire(Err(AdapterError::Unsupported(
                "calls to mz_now in write statements",
            )));
            return;
        }

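        // Verify that the selection depends only on objects that are legal
        // to read from in a write transaction: user tables that are not
        // source exports, views and functions over them, and types.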
        fn validate_read_dependencies(
            catalog: &Catalog,
            id: &CatalogItemId,
        ) -> Result<(), AdapterError> {
            use CatalogItemType::*;
            use mz_catalog::memory::objects;
            let mut ids_to_check = Vec::new();
            let valid = match catalog.try_get_entry(id) {
                Some(entry) => {
                    if let CatalogItem::View(objects::View { optimized_expr, .. })
                    | CatalogItem::MaterializedView(objects::MaterializedView {
                        optimized_expr,
                        ..
                    }) = entry.item()
                    {
                        if optimized_expr.contains_temporal() {
                            return Err(AdapterError::Unsupported(
                                "calls to mz_now in write statements",
                            ));
                        }
                    }
                    match entry.item().typ() {
                        typ @ (Func | View | MaterializedView | ContinualTask) => {
                            ids_to_check.extend(entry.uses());
                            let valid_id = id.is_user() || matches!(typ, Func);
                            valid_id
                        }
                        Source | Secret | Connection => false,
                        // Cannot select from sinks or indexes.
                        Sink | Index => unreachable!(),
                        Table => {
                            if !id.is_user() {
                                // We cannot read from system tables.
                                false
                            } else {
                                // We cannot read from tables that are source exports.
                                entry.source_export_details().is_none()
                            }
                        }
                        Type => true,
                    }
                }
                None => false,
            };
            if !valid {
                return Err(AdapterError::InvalidTableMutationSelection);
            }
            for id in ids_to_check {
                validate_read_dependencies(catalog, &id)?;
            }
            Ok(())
        }

        for gid in selection.depends_on() {
            let item_id = self.catalog().resolve_item_id(&gid);
            if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
                ctx.retire(Err(err));
                return;
            }
        }

        // Run the selection as a peek on a separate context, keeping the
        // original transmitter so we can respond once the write completes.
        let (peek_tx, peek_rx) = oneshot::channel();
        let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
        let (tx, _, session, extra) = ctx.into_parts();
        let peek_ctx = ExecuteContext::from_parts(
            peek_client_tx,
            self.internal_cmd_tx.clone(),
            session,
            Default::default(),
        );

        self.sequence_peek(
            peek_ctx,
            plan::SelectPlan {
                select: None,
                source: selection,
                when: QueryWhen::FreshestTableWrite,
                finishing,
                copy_to: None,
            },
            TargetCluster::Active,
            None,
        )
        .await;

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
        let catalog = self.owned_catalog();
        let max_result_size = self.catalog().system_config().max_result_size();

        task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
            let (peek_response, session) = match peek_rx.await {
                Ok(Response {
                    result: Ok(resp),
                    session,
                    otel_ctx,
                }) => {
                    otel_ctx.attach_as_parent();
                    (resp, session)
                }
                Ok(Response {
                    result: Err(e),
                    session,
                    otel_ctx,
                }) => {
                    let ctx =
                        ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
                    otel_ctx.attach_as_parent();
                    ctx.retire(Err(e));
                    return;
                }
                Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
            };
            let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
            let mut timeout_dur = *ctx.session().vars().statement_timeout();

            // A zero duration means "no timeout".
            if timeout_dur == Duration::ZERO {
                timeout_dur = Duration::MAX;
            }

            let style = ExprPrepStyle::OneShot {
                logical_time: EvalTime::NotAvailable,
                session: ctx.session(),
                catalog_state: catalog.state(),
            };
            for expr in assignments.values_mut().chain(returning.iter_mut()) {
                return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
            }

            let make_diffs =
                move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
                    let arena = RowArena::new();
                    let mut diffs = Vec::new();
                    let mut datum_vec = mz_repr::DatumVec::new();

                    while let Some(row) = rows.next() {
                        if !assignments.is_empty() {
                            assert!(
                                matches!(kind, MutationKind::Update),
                                "only updates support assignments"
                            );
                            let mut datums = datum_vec.borrow_with(row);
                            let mut updates = vec![];
                            for (idx, expr) in &assignments {
                                let updated = match expr.eval(&datums, &arena) {
                                    Ok(updated) => updated,
                                    Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
                                };
                                updates.push((*idx, updated));
                            }
                            for (idx, new_value) in updates {
                                datums[idx] = new_value;
                            }
                            let updated = Row::pack_slice(&datums);
                            diffs.push((updated, Diff::ONE));
                        }
                        match kind {
                            // Updates and deletes both retract the old row.
                            MutationKind::Update | MutationKind::Delete => {
                                diffs.push((row.to_owned(), Diff::MINUS_ONE))
                            }
                            MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
                        }
                    }

                    // Check constraints on all inserted rows and tally the
                    // result size.
                    let mut byte_size: u64 = 0;
                    for (row, diff) in &diffs {
                        byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
                        if diff.is_positive() {
                            for (idx, datum) in row.iter().enumerate() {
                                desc.constraints_met(idx, &datum)?;
                            }
                        }
                    }
                    Ok((diffs, byte_size))
                };

            let diffs = match peek_response {
                ExecuteResponse::SendingRowsStreaming {
                    rows: mut rows_stream,
                    ..
                } => {
                    let mut byte_size: u64 = 0;
                    let mut diffs = Vec::new();
                    let result = loop {
                        match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
                            Ok(Some(res)) => match res {
                                PeekResponseUnary::Rows(new_rows) => {
                                    match make_diffs(new_rows) {
                                        Ok((mut new_diffs, new_byte_size)) => {
                                            byte_size = byte_size.saturating_add(new_byte_size);
                                            if byte_size > max_result_size {
                                                break Err(AdapterError::ResultSize(format!(
                                                    "result exceeds max size of {max_result_size}"
                                                )));
                                            }
                                            diffs.append(&mut new_diffs)
                                        }
                                        Err(e) => break Err(e),
                                    };
                                }
                                PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
                                PeekResponseUnary::Error(e) => {
                                    break Err(AdapterError::Unstructured(anyhow!(e)));
                                }
                            },
                            Ok(None) => break Ok(diffs),
                            Err(_) => {
                                // The timeout expired; cancel the ongoing
                                // peek so it does not linger.
                                let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
                                    conn_id: ctx.session().conn_id().clone(),
                                });
                                if let Err(e) = result {
                                    warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                                }
                                break Err(AdapterError::StatementTimeout);
                            }
                        }
                    };

                    result
                }
                ExecuteResponse::SendingRowsImmediate { rows } => {
                    make_diffs(rows).map(|(diffs, _byte_size)| diffs)
                }
                resp => Err(AdapterError::Unstructured(anyhow!(
                    "unexpected peek response: {resp:?}"
                ))),
            };

            // Evaluate any RETURNING expressions over the inserted rows.
            let mut returning_rows = Vec::new();
            let mut diff_err: Option<AdapterError> = None;
            if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
                let arena = RowArena::new();
                for (row, diff) in diffs {
                    if !diff.is_positive() {
                        continue;
                    }
                    let mut returning_row = Row::with_capacity(returning.len());
                    let mut packer = returning_row.packer();
                    for expr in &returning {
                        let datums: Vec<_> = row.iter().collect();
                        match expr.eval(&datums, &arena) {
                            Ok(datum) => {
                                packer.push(datum);
                            }
                            Err(err) => {
                                diff_err = Some(err.into());
                                break;
                            }
                        }
                    }
                    let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
                    let diff = match NonZeroUsize::try_from(diff) {
                        Ok(diff) => diff,
                        Err(err) => {
                            diff_err = Some(err.into());
                            break;
                        }
                    };
                    returning_rows.push((returning_row, diff));
                    if diff_err.is_some() {
                        break;
                    }
                }
            }
            let diffs = if let Some(err) = diff_err {
                Err(err)
            } else {
                diffs
            };

            // If the transaction holds a linearized read timestamp, wait for
            // the read to be confirmed before applying the writes.
            let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
            if let Some(timestamp_context) = timestamp_context {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let conn_id = ctx.session().conn_id().clone();
                let pending_read_txn = PendingReadTxn {
                    txn: PendingRead::ReadThenWrite { ctx, tx },
                    timestamp_context,
                    created: Instant::now(),
                    num_requeues: 0,
                    otel_ctx: OpenTelemetryContext::obtain(),
                };
                let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
                if let Err(e) = result {
                    warn!(
                        "strict_serializable_reads_tx dropped before we could send: {:?}",
                        e
                    );
                    return;
                }
                let result = rx.await;
                ctx = match result {
                    Ok(Some(ctx)) => ctx,
                    Ok(None) => {
                        // The pending read was canceled and its context retired.
                        return;
                    }
                    Err(e) => {
                        warn!(
                            "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
                            e
                        );
                        return;
                    }
                };
            }

            match diffs {
                Ok(diffs) => {
                    let result = Self::send_diffs(
                        ctx.session_mut(),
                        plan::SendDiffsPlan {
                            id,
                            updates: diffs,
                            kind,
                            returning: returning_rows,
                            max_result_size,
                        },
                    );
                    ctx.retire(result);
                }
                Err(e) => {
                    ctx.retire(Err(e));
                }
            }
        });
    }

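    /// Sequences `ALTER ... RENAME`, renaming the item inside a DDL
    /// transaction.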
    #[instrument]
    pub(super) async fn sequence_alter_item_rename(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterItemRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::RenameItem {
            id: plan.id,
            current_full_name: plan.current_full_name,
            to_name: plan.to_name,
        };
        match self
            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_alter_retain_history(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterRetainHistoryPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let ops = vec![catalog::Op::AlterRetainHistory {
            id: plan.id,
            value: plan.value,
            window: plan.window,
        }];
        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
            Box::pin(async move {
                let catalog_item = coord.catalog().get_entry(&plan.id).item();
                let cluster = match catalog_item {
                    CatalogItem::Table(_)
                    | CatalogItem::MaterializedView(_)
                    | CatalogItem::Source(_)
                    | CatalogItem::ContinualTask(_) => None,
                    CatalogItem::Index(index) => Some(index.cluster_id),
                    CatalogItem::Log(_)
                    | CatalogItem::View(_)
                    | CatalogItem::Sink(_)
                    | CatalogItem::Type(_)
                    | CatalogItem::Func(_)
                    | CatalogItem::Secret(_)
                    | CatalogItem::Connection(_) => unreachable!(),
                };
                match cluster {
                    Some(cluster) => {
                        coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
                    }
                    None => {
                        coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
                    }
                }
            })
        })
        .await?;
        Ok(ExecuteResponse::AlteredObject(plan.object_type))
    }

    #[instrument]
    pub(super) async fn sequence_alter_schema_rename(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterSchemaRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (database_spec, schema_spec) = plan.cur_schema_spec;
        let op = catalog::Op::RenameSchema {
            database_spec,
            schema_spec,
            new_name: plan.new_schema_name,
            check_reserved_names: true,
        };
        match self
            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
            Err(err) => Err(err),
        }
    }

    #[instrument]
    pub(super) async fn sequence_alter_schema_swap(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterSchemaSwapPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::AlterSchemaSwapPlan {
            schema_a_spec: (schema_a_db, schema_a),
            schema_a_name,
            schema_b_spec: (schema_b_db, schema_b),
            schema_b_name,
            name_temp,
        } = plan;

        // Swap the schemas with three renames: a -> temp, b -> a, temp -> b.
        let op_a = catalog::Op::RenameSchema {
            database_spec: schema_a_db,
            schema_spec: schema_a,
            new_name: name_temp,
            check_reserved_names: false,
        };
        let op_b = catalog::Op::RenameSchema {
            database_spec: schema_b_db,
            schema_spec: schema_b,
            new_name: schema_a_name,
            check_reserved_names: false,
        };
        let op_c = catalog::Op::RenameSchema {
            database_spec: schema_a_db,
            schema_spec: schema_a,
            new_name: schema_b_name,
            check_reserved_names: false,
        };

        match self
            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
                Box::pin(async {})
            })
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
            Err(err) => Err(err),
        }
    }

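    /// Sequences `ALTER ROLE`, updating either the role's attributes or its
    /// session variable defaults.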
    #[instrument]
    pub(super) async fn sequence_alter_role(
        &mut self,
        session: &Session,
        plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog().for_session(session);
        let role = catalog.get_role(&id);

        // Collect notices to deliver only after the transaction succeeds.
        let mut notices = vec![];

        // Start with the role's current attributes and variables and apply
        // the requested changes on top.
        let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
        let mut vars = role.vars().clone();

        let mut nopassword = false;

        match option {
            PlannedAlterRoleOption::Attributes(attrs) => {
                self.validate_role_attributes(&attrs.clone().into())?;

                if let Some(inherit) = attrs.inherit {
                    attributes.inherit = inherit;
                }

                if let Some(password) = attrs.password {
                    attributes.password = Some(password);
                }

                if let Some(superuser) = attrs.superuser {
                    attributes.superuser = Some(superuser);
                }

                if let Some(login) = attrs.login {
                    attributes.login = Some(login);
                }

                if attrs.nopassword.unwrap_or(false) {
                    nopassword = true;
                }

                if let Some(notice) = self.should_emit_rbac_notice(session) {
                    notices.push(notice);
                }
            }
            PlannedAlterRoleOption::Variable(variable) => {
                // Make sure the variable exists and is visible to the user.
                let session_var = session.vars().inspect(variable.name())?;
                session_var.visible(session.user(), catalog.system_vars())?;

                // Emit a usage notice for deprecated variables and values.
                if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if let PlannedRoleVariable::Set {
                    name,
                    value: VariableValue::Values(vals),
                } = &variable
                {
                    if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
                        notices.push(AdapterNotice::IntrospectionClusterUsage);
                    }
                }

                let var_name = match variable {
                    PlannedRoleVariable::Set { name, value } => {
                        match &value {
                            VariableValue::Default => {
                                vars.remove(&name);
                            }
                            VariableValue::Values(vals) => {
                                let var = match &vals[..] {
                                    [val] => OwnedVarInput::Flat(val.clone()),
                                    vals => OwnedVarInput::SqlSet(vals.to_vec()),
                                };
                                // Validate the value before persisting it.
                                session_var.check(var.borrow())?;

                                vars.insert(name.clone(), var);
                            }
                        };
                        name
                    }
                    PlannedRoleVariable::Reset { name } => {
                        vars.remove(&name);
                        name
                    }
                };

                notices.push(AdapterNotice::VarDefaultUpdated {
                    role: Some(name.clone()),
                    var_name: Some(var_name),
                });
            }
        }

        let op = catalog::Op::AlterRole {
            id,
            name,
            attributes,
            nopassword,
            vars: RoleVars { map: vars },
        };
        let response = self
            .catalog_transact(Some(session), vec![op])
            .await
            .map(|_| ExecuteResponse::AlteredRole)?;

        session.add_notices(notices);

        Ok(response)
    }

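    /// Prepares `ALTER SINK` by acquiring read holds on the new upstream
    /// collection and installing a watch set that fires once the sink has
    /// progressed far enough to cut over.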
    #[instrument]
    pub(super) async fn sequence_alter_sink_prepare(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterSinkPlan,
    ) {
        // Acquire read holds on the new upstream collection to keep its
        // since frontier in place while the cutover is prepared.
        let id_bundle = crate::CollectionIdBundle {
            storage_ids: BTreeSet::from_iter([plan.sink.from]),
            compute_ids: BTreeMap::new(),
        };
        let read_hold = self.acquire_read_holds(&id_bundle);

        let Some(read_ts) = read_hold.least_valid_read().into_option() else {
            ctx.retire(Err(AdapterError::UnreadableSinkCollection));
            return;
        };

        let otel_ctx = OpenTelemetryContext::obtain();
        let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);

        let plan_validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::from_iter([plan.item_id, from_item_id]),
            Some(plan.in_cluster),
            None,
            ctx.session().role_metadata().clone(),
        );

        info!(
            "preparing alter sink for {}: frontiers={:?} export={:?}",
            plan.global_id,
            self.controller
                .storage_collections
                .collections_frontiers(vec![plan.global_id, plan.sink.from]),
            self.controller.storage.export(plan.global_id)
        );

        // Install a watch set that fires once the sink has made enough
        // progress for the cutover to finish.
        self.install_storage_watch_set(
            ctx.session().conn_id().clone(),
            BTreeSet::from_iter([plan.global_id]),
            read_ts,
            WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
                ctx: Some(ctx),
                otel_ctx,
                plan,
                plan_validity,
                read_hold,
            }),
        );
    }

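    /// Finishes `ALTER SINK` once the watch set fires: the catalog entry is
    /// swapped to the new definition and the running export is altered in
    /// the storage controller.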
    #[instrument]
    pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
        ctx.otel_ctx.attach_as_parent();

        let plan::AlterSinkPlan {
            item_id,
            global_id,
            sink: sink_plan,
            with_snapshot,
            in_cluster,
        } = ctx.plan.clone();

        // The catalog may have changed while we waited for the watch set.
        match ctx.plan_validity.check(self.catalog()) {
            Ok(()) => {}
            Err(err) => {
                ctx.retire(Err(err));
                return;
            }
        }

        let entry = self.catalog().get_entry(&item_id);
        let CatalogItem::Sink(old_sink) = entry.item() else {
            panic!("invalid item kind for `AlterSinkPlan`");
        };

        if sink_plan.version != old_sink.version + 1 {
            ctx.retire(Err(AdapterError::ChangedPlan(
                "sink was altered concurrently".into(),
            )));
            return;
        }

        info!(
            "finishing alter sink for {global_id}: frontiers={:?} export={:?}",
            self.controller
                .storage_collections
                .collections_frontiers(vec![global_id, sink_plan.from]),
            self.controller.storage.export(global_id),
        );

        // The watch set guarantees that the sink's write frontier is now
        // strictly beyond the as-of of the read holds acquired during
        // preparation.
        let write_frontier = &self
            .controller
            .storage
            .export(global_id)
            .expect("sink known to exist")
            .write_frontier;
        let as_of = ctx.read_hold.least_valid_read();
        assert!(
            write_frontier.iter().all(|t| as_of.less_than(t)),
            "{:?} should be strictly less than {:?}",
            &*as_of,
            &**write_frontier
        );

        // Rebuild the sink's create_sql with the bumped version and the new
        // upstream item.
        let create_sql = &old_sink.create_sql;
        let parsed = mz_sql::parse::parse(create_sql).expect("valid create_sql");
        let Statement::CreateSink(mut stmt) = parsed.into_element().ast else {
            unreachable!("invalid statement kind for sink");
        };

        stmt.with_options
            .retain(|o| o.name != CreateSinkOptionName::Version);
        stmt.with_options.push(CreateSinkOption {
            name: CreateSinkOptionName::Version,
            value: Some(WithOptionValue::Value(mz_sql::ast::Value::Number(
                sink_plan.version.to_string(),
            ))),
        });

        let conn_catalog = self.catalog().for_system_session();
        let (mut stmt, resolved_ids) =
            mz_sql::names::resolve(&conn_catalog, stmt).expect("resolvable create_sql");

        // Point the statement at the new `FROM` item.
        let from_entry = self.catalog().get_entry_by_global_id(&sink_plan.from);
        let full_name = self.catalog().resolve_full_name(from_entry.name(), None);
        stmt.from = ResolvedItemName::Item {
            id: from_entry.id(),
            qualifiers: from_entry.name.qualifiers.clone(),
            full_name,
            print_id: true,
            version: from_entry.version,
        };

        let new_sink = Sink {
            create_sql: stmt.to_ast_string_stable(),
            global_id,
            from: sink_plan.from,
            connection: sink_plan.connection.clone(),
            envelope: sink_plan.envelope,
            version: sink_plan.version,
            with_snapshot,
            resolved_ids: resolved_ids.clone(),
            cluster_id: in_cluster,
        };

        let ops = vec![catalog::Op::UpdateItem {
            id: item_id,
            name: entry.name().clone(),
            to_item: CatalogItem::Sink(new_sink),
        }];

        match self
            .catalog_transact(Some(ctx.ctx().session_mut()), ops)
            .await
        {
            Ok(()) => {}
            Err(err) => {
                ctx.retire(Err(err));
                return;
            }
        }

        let storage_sink_desc = StorageSinkDesc {
            from: sink_plan.from,
            from_desc: from_entry
                .desc_opt()
                .expect("sinks can only be built on items with descs")
                .into_owned(),
            connection: sink_plan
                .connection
                .clone()
                .into_inline_connection(self.catalog().state()),
            envelope: sink_plan.envelope,
            as_of,
            with_snapshot,
            version: sink_plan.version,
            from_storage_metadata: (),
            to_storage_metadata: (),
        };

        self.controller
            .storage
            .alter_export(
                global_id,
                ExportDescription {
                    sink: storage_sink_desc,
                    instance_id: in_cluster,
                },
            )
            .await
            .unwrap_or_terminate("cannot fail to alter export desc");

        ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
    }

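    /// Sequences `ALTER CONNECTION`, dispatching to SSH key rotation or to
    /// the option-rewriting path.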
    #[instrument]
    pub(super) async fn sequence_alter_connection(
        &mut self,
        ctx: ExecuteContext,
        AlterConnectionPlan { id, action }: AlterConnectionPlan,
    ) {
        match action {
            AlterConnectionAction::RotateKeys => {
                self.sequence_rotate_keys(ctx, id).await;
            }
            AlterConnectionAction::AlterOptions {
                set_options,
                drop_options,
                validate,
            } => {
                self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
                    .await
            }
        }
    }

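    /// Alters a connection by replanning its `CREATE CONNECTION` statement
    /// with the merged option set, optionally validating the new connection
    /// against the external system before committing it.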
    #[instrument]
    async fn sequence_alter_connection_options(
        &mut self,
        mut ctx: ExecuteContext,
        id: CatalogItemId,
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
        drop_options: BTreeSet<ConnectionOptionName>,
        validate: bool,
    ) {
        let cur_entry = self.catalog().get_entry(&id);
        let cur_conn = cur_entry.connection().expect("known to be connection");
        let connection_gid = cur_conn.global_id();

        let inner = || -> Result<Connection, AdapterError> {
            // Parse the existing statement.
            let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateConnection(stmt) => stmt,
                _ => unreachable!("proved type is connection"),
            };

            let catalog = self.catalog().for_system_session();

            // Resolve the items referenced by the statement.
            let (mut create_conn_stmt, resolved_ids) =
                mz_sql::names::resolve(&catalog, create_conn_stmt)
                    .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;

            // Remove any options that are being set or dropped.
            create_conn_stmt
                .values
                .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));

            // Add the newly set options back in.
            create_conn_stmt.values.extend(
                set_options
                    .into_iter()
                    .map(|(name, value)| ConnectionOption { name, value }),
            );

            // Ensure the connection cannot resolve to itself while
            // replanning.
            let mut catalog = self.catalog().for_system_session();
            catalog.mark_id_unresolvable_for_replanning(id);

            // Re-plan the statement with the merged options.
            let plan = match mz_sql::plan::plan(
                None,
                &catalog,
                Statement::CreateConnection(create_conn_stmt),
                &Params::empty(),
                &resolved_ids,
            )
            .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
            {
                Plan::CreateConnection(plan) => plan,
                _ => unreachable!("create connection plan is only valid response"),
            };

            // Parse the planned create_sql again to recompute dependencies.
            let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateConnection(stmt) => stmt,
                _ => unreachable!("proved type is connection"),
            };

            let catalog = self.catalog().for_system_session();

            let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
                .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;

            Ok(Connection {
                create_sql: plan.connection.create_sql,
                global_id: cur_conn.global_id,
                details: plan.connection.details,
                resolved_ids: new_deps,
            })
        };

        let conn = match inner() {
            Ok(conn) => conn,
            Err(e) => {
                return ctx.retire(Err(e));
            }
        };

        if validate {
            let connection = conn
                .details
                .to_connection()
                .into_inline_connection(self.catalog().state());

            let internal_cmd_tx = self.internal_cmd_tx.clone();
            let transient_revision = self.catalog().transient_revision();
            let conn_id = ctx.session().conn_id().clone();
            let otel_ctx = OpenTelemetryContext::obtain();
            let role_metadata = ctx.session().role_metadata().clone();
            let current_storage_parameters = self.controller.storage.config().clone();

            task::spawn(
                || format!("validate_alter_connection:{conn_id}"),
                async move {
                    let resolved_ids = conn.resolved_ids.clone();
                    let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
                    let result = match connection.validate(id, &current_storage_parameters).await {
                        Ok(()) => Ok(conn),
                        Err(err) => Err(err.into()),
                    };

                    // Signal the validation result back to the coordinator.
                    let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
                        AlterConnectionValidationReady {
                            ctx,
                            result,
                            connection_id: id,
                            connection_gid,
                            plan_validity: PlanValidity::new(
                                transient_revision,
                                dependency_ids.clone(),
                                None,
                                None,
                                role_metadata,
                            ),
                            otel_ctx,
                            resolved_ids,
                        },
                    ));
                    if let Err(e) = result {
                        tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                    }
                },
            );
        } else {
            let result = self
                .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
                .await;
            ctx.retire(result);
        }
    }

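    /// Commits an altered connection to the catalog and pushes the new
    /// connection details to all dependent sources, sinks, and source
    /// exports in the storage controller.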
    #[instrument]
    pub(crate) async fn sequence_alter_connection_stage_finish(
        &mut self,
        session: &mut Session,
        id: CatalogItemId,
        connection: Connection,
    ) -> Result<ExecuteResponse, AdapterError> {
        match self.catalog.get_entry(&id).item() {
            CatalogItem::Connection(curr_conn) => {
                // The new details must be compatible with the running
                // connection.
                curr_conn
                    .details
                    .to_connection()
                    .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
                    .map_err(StorageError::from)?;
            }
            _ => unreachable!("known to be a connection"),
        };

        let ops = vec![catalog::Op::UpdateItem {
            id,
            name: self.catalog.get_entry(&id).name().clone(),
            to_item: CatalogItem::Connection(connection.clone()),
        }];

        self.catalog_transact(Some(session), ops).await?;

        match connection.details {
            ConnectionDetails::AwsPrivatelink(ref privatelink) => {
                let spec = VpcEndpointConfig {
                    aws_service_name: privatelink.service_name.to_owned(),
                    availability_zone_ids: privatelink.availability_zones.to_owned(),
                };
                self.cloud_resource_controller
                    .as_ref()
                    .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
                    .ensure_vpc_endpoint(id, spec)
                    .await?;
            }
            _ => {}
        };

        let entry = self.catalog().get_entry(&id);

        // Walk all transitive dependents of the connection, collecting the
        // sources, sinks, and source exports whose running configurations
        // must be updated.
        let mut connections = VecDeque::new();
        connections.push_front(entry.id());

        let mut source_connections = BTreeMap::new();
        let mut sink_connections = BTreeMap::new();
        let mut source_export_data_configs = BTreeMap::new();

        while let Some(id) = connections.pop_front() {
            for id in self.catalog.get_entry(&id).used_by() {
                let entry = self.catalog.get_entry(id);
                match entry.item() {
                    CatalogItem::Connection(_) => connections.push_back(*id),
                    CatalogItem::Source(source) => {
                        let desc = match &entry.source().expect("known to be source").data_source {
                            DataSourceDesc::Ingestion { desc, .. }
                            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                                desc.clone().into_inline_connection(self.catalog().state())
                            }
                            _ => unreachable!("only ingestions reference connections"),
                        };

                        source_connections.insert(source.global_id, desc.connection);
                    }
                    CatalogItem::Sink(sink) => {
                        let export = entry.sink().expect("known to be sink");
                        sink_connections.insert(
                            sink.global_id,
                            export
                                .connection
                                .clone()
                                .into_inline_connection(self.catalog().state()),
                        );
                    }
                    CatalogItem::Table(table) => {
                        // Source-export tables carry a data config that may
                        // reference the connection.
                        if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
                            let data_config = export_data_config.clone();
                            source_export_data_configs.insert(
                                table.global_id_writes(),
                                data_config.into_inline_connection(self.catalog().state()),
                            );
                        }
                    }
                    t => unreachable!("connection dependency not expected on {:?}", t),
                }
            }
        }

        if !source_connections.is_empty() {
            self.controller
                .storage
                .alter_ingestion_connections(source_connections)
                .await
                .unwrap_or_terminate("cannot fail to alter ingestion connection");
        }

        if !sink_connections.is_empty() {
            self.controller
                .storage
                .alter_export_connections(sink_connections)
                .await
                .unwrap_or_terminate("altering exports after txn must succeed");
        }

        if !source_export_data_configs.is_empty() {
            self.controller
                .storage
                .alter_ingestion_export_data_configs(source_export_data_configs)
                .await
                .unwrap_or_terminate("altering source export data configs after txn must succeed");
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
    }

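    /// Sequences `ALTER SOURCE`, for example adding subsource exports by
    /// rewriting the source's `CREATE SOURCE` statement with merged options.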
4130 #[instrument]
4131 pub(super) async fn sequence_alter_source(
4132 &mut self,
4133 session: &Session,
4134 plan::AlterSourcePlan {
4135 item_id,
4136 ingestion_id,
4137 action,
4138 }: plan::AlterSourcePlan,
4139 ) -> Result<ExecuteResponse, AdapterError> {
4140 let cur_entry = self.catalog().get_entry(&item_id);
4141 let cur_source = cur_entry.source().expect("known to be source");
4142
        let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
            let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast
            {
                Statement::CreateSource(stmt) => stmt,
                _ => unreachable!("proved type is source"),
            };

            let catalog = coord.catalog().for_system_session();

            mz_sql::names::resolve(&catalog, create_source_stmt)
                .map_err(|e| AdapterError::internal(err_cx, e))
        };

        match action {
            plan::AlterSourceAction::AddSubsourceExports {
                subsources,
                options,
            } => {
                const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";

                let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
                    text_columns: mut new_text_columns,
                    exclude_columns: mut new_exclude_columns,
                    ..
                } = options.try_into()?;

                let (mut create_source_stmt, resolved_ids) =
                    create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;

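                // Collect the external references of the subsources that
                // currently depend on this source; these determine which
                // existing per-table options are still relevant.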
                let catalog = self.catalog();
                let curr_references: BTreeSet<_> = catalog
                    .get_entry(&item_id)
                    .used_by()
                    .into_iter()
                    .filter_map(|subsource| {
                        catalog
                            .get_entry(subsource)
                            .subsource_details()
                            .map(|(_id, reference, _details)| reference)
                    })
                    .collect();

                let purification_err =
                    || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");

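                // Merge the new TEXT COLUMNS/EXCLUDE COLUMNS options into the
                // connection options of the persisted `CREATE SOURCE`
                // statement, pruning entries that no longer refer to a current
                // subsource.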
4197 match &mut create_source_stmt.connection {
4201 CreateSourceConnection::Postgres {
4202 options: curr_options,
4203 ..
4204 } => {
4205 let mz_sql::plan::PgConfigOptionExtracted {
4206 mut text_columns, ..
4207 } = curr_options.clone().try_into()?;
4208
4209 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
4212
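                        // Keep only the `TEXT COLUMNS` entries whose tables
                        // are still referenced by a subsource.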
                        text_columns.retain(|column_qualified_reference| {
                            mz_ore::soft_assert_eq_or_log!(
                                column_qualified_reference.0.len(),
                                4,
                                "all TEXT COLUMNS values must be column-qualified references"
                            );
                            let mut table = column_qualified_reference.clone();
                            table.0.truncate(3);
                            curr_references.contains(&table)
                        });

                        new_text_columns.extend(text_columns);

                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(PgConfigOption {
                                name: PgConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::MySql {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::MySqlConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                MySqlConfigOptionName::TextColumns
                                    | MySqlConfigOptionName::ExcludeColumns
                            )
                        });

                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(MySqlConfigOption {
                                name: MySqlConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    CreateSourceConnection::SqlServer {
                        options: curr_options,
                        ..
                    } => {
                        let mz_sql::plan::SqlServerConfigOptionExtracted {
                            mut text_columns,
                            mut exclude_columns,
                            ..
                        } = curr_options.clone().try_into()?;

                        curr_options.retain(|o| {
                            !matches!(
                                o.name,
                                SqlServerConfigOptionName::TextColumns
                                    | SqlServerConfigOptionName::ExcludeColumns
                            )
                        });

                        let column_referenced =
                            |column_qualified_reference: &UnresolvedItemName| {
                                mz_ore::soft_assert_eq_or_log!(
                                    column_qualified_reference.0.len(),
                                    3,
                                    "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
                                );
                                let mut table = column_qualified_reference.clone();
                                table.0.truncate(2);
                                curr_references.contains(&table)
                            };
                        text_columns.retain(column_referenced);
                        exclude_columns.retain(column_referenced);

                        new_text_columns.extend(text_columns);
                        new_exclude_columns.extend(exclude_columns);

                        if !new_text_columns.is_empty() {
                            new_text_columns.sort();
                            let new_text_columns = new_text_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::TextColumns,
                                value: Some(WithOptionValue::Sequence(new_text_columns)),
                            });
                        }
                        if !new_exclude_columns.is_empty() {
                            new_exclude_columns.sort();
                            let new_exclude_columns = new_exclude_columns
                                .into_iter()
                                .map(WithOptionValue::UnresolvedItemName)
                                .collect();

                            curr_options.push(SqlServerConfigOption {
                                name: SqlServerConfigOptionName::ExcludeColumns,
                                value: Some(WithOptionValue::Sequence(new_exclude_columns)),
                            });
                        }
                    }
                    _ => return Err(purification_err()),
                };

                let mut catalog = self.catalog().for_system_session();
                catalog.mark_id_unresolvable_for_replanning(cur_entry.id());

                let plan = match mz_sql::plan::plan(
                    None,
                    &catalog,
                    Statement::CreateSource(create_source_stmt),
                    &Params::empty(),
                    &resolved_ids,
                )
                .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
                {
                    Plan::CreateSource(plan) => plan,
                    _ => unreachable!("a CreateSource plan is the only valid response"),
                };

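                // Replanning succeeded; construct the updated source item,
                // preserving the existing global ID, compaction window, and
                // retained-metrics setting.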
                let source = Source::new(
                    plan,
                    cur_source.global_id,
                    resolved_ids,
                    cur_source.custom_logical_compaction_window,
                    cur_source.is_retained_metrics_object,
                );

                let source_compaction_window = source.custom_logical_compaction_window;

                let desc = match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                        desc.clone().into_inline_connection(self.catalog().state())
                    }
                    _ => unreachable!("already verified to be an ingestion"),
                };

                self.controller
                    .storage
                    .check_alter_ingestion_source_desc(ingestion_id, &desc)
                    .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;

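                // Stage the catalog update for the altered source alongside
                // the ops that create its new subsources.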
                let mut ops = vec![catalog::Op::UpdateItem {
                    id: item_id,
                    name: self.catalog.get_entry(&item_id).name().clone(),
                    to_item: CatalogItem::Source(source),
                }];

                let CreateSourceInner {
                    ops: new_ops,
                    sources,
                    if_not_exists_ids,
                } = self.create_source_inner(session, subsources).await?;

                ops.extend(new_ops);

                assert!(
                    if_not_exists_ids.is_empty(),
                    "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
                );

                self.catalog_transact(Some(session), ops).await?;

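                // Describe each new subsource collection and register them all
                // with the storage controller.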
                let mut item_ids = BTreeSet::new();
                let mut collections = Vec::with_capacity(sources.len());
                for (item_id, source) in sources {
                    let status_id = self.catalog().resolve_builtin_storage_collection(
                        &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
                    );
                    let source_status_collection_id =
                        Some(self.catalog().get_entry(&status_id).latest_global_id());

                    let (data_source, status_collection_id) = match source.data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: _,
                            details,
                            data_config,
                        } => {
                            let ingestion_id =
                                self.catalog().get_entry(&ingestion_id).latest_global_id();
                            (
                                DataSource::IngestionExport {
                                    ingestion_id,
                                    details,
                                    data_config: data_config
                                        .into_inline_connection(self.catalog().state()),
                                },
                                source_status_collection_id,
                            )
                        }
                        o => {
                            unreachable!(
                                "ALTER SOURCE...ADD SUBSOURCE only creates SourceExport, but got {:?}",
                                o
                            )
                        }
                    };

                    collections.push((
                        source.global_id,
                        CollectionDescription {
                            desc: source.desc.clone(),
                            data_source,
                            since: None,
                            status_collection_id,
                            timeline: Some(source.timeline.clone()),
                        },
                    ));

                    item_ids.insert(item_id);
                }

                let storage_metadata = self.catalog.state().storage_metadata();

                self.controller
                    .storage
                    .create_collections(storage_metadata, None, collections)
                    .await
                    .unwrap_or_terminate("cannot fail to create collections");

                self.initialize_storage_read_policies(
                    item_ids,
                    source_compaction_window.unwrap_or(CompactionWindow::Default),
                )
                .await;
            }
            plan::AlterSourceAction::RefreshReferences { references } => {
                self.catalog_transact(
                    Some(session),
                    vec![catalog::Op::UpdateSourceReferences {
                        source_id: item_id,
                        references: references.into(),
                    }],
                )
                .await?;
            }
        }

        Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
    }

    #[instrument]
    pub(super) async fn sequence_alter_system_set(
        &mut self,
        session: &Session,
        plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.is_user_allowed_to_alter_system(session, Some(&name))?;
        if NETWORK_POLICY.name.to_string().to_lowercase() == name.to_lowercase() {
            self.validate_alter_system_network_policy(session, &value)?;
        }

        let op = match value {
            plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
                name: name.clone(),
                value: OwnedVarInput::SqlSet(values),
            },
            plan::VariableValue::Default => {
                catalog::Op::ResetSystemConfiguration { name: name.clone() }
            }
        };
        self.catalog_transact(Some(session), vec![op]).await?;

        session.add_notice(AdapterNotice::VarDefaultUpdated {
            role: None,
            var_name: Some(name),
        });
        Ok(ExecuteResponse::AlteredSystemConfiguration)
    }

    #[instrument]
    pub(super) async fn sequence_alter_system_reset(
        &mut self,
        session: &Session,
        plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.is_user_allowed_to_alter_system(session, Some(&name))?;
        let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
        self.catalog_transact(Some(session), vec![op]).await?;
        session.add_notice(AdapterNotice::VarDefaultUpdated {
            role: None,
            var_name: Some(name),
        });
        Ok(ExecuteResponse::AlteredSystemConfiguration)
    }

    #[instrument]
    pub(super) async fn sequence_alter_system_reset_all(
        &mut self,
        session: &Session,
        _: plan::AlterSystemResetAllPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.is_user_allowed_to_alter_system(session, None)?;
        let op = catalog::Op::ResetAllSystemConfiguration;
        self.catalog_transact(Some(session), vec![op]).await?;
        session.add_notice(AdapterNotice::VarDefaultUpdated {
            role: None,
            var_name: None,
        });
        Ok(ExecuteResponse::AlteredSystemConfiguration)
    }

    fn is_user_allowed_to_alter_system(
        &self,
        session: &Session,
        var_name: Option<&str>,
    ) -> Result<(), AdapterError> {
        match (session.user().kind(), var_name) {
            (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
            (UserKind::Superuser, Some(name))
                if session.user().is_internal()
                    || self.catalog().system_config().user_modifiable(name) =>
            {
                let var = self.catalog().system_config().get(name)?;
                var.visible(session.user(), self.catalog().system_config())?;
                Ok(())
            }
            (UserKind::Regular, Some(name))
                if self.catalog().system_config().user_modifiable(name) =>
            {
                Err(AdapterError::Unauthorized(
                    rbac::UnauthorizedError::Superuser {
                        action: format!("toggle the '{name}' system configuration parameter"),
                    },
                ))
            }
            _ => Err(AdapterError::Unauthorized(
                rbac::UnauthorizedError::MzSystem {
                    action: "alter system".into(),
                },
            )),
        }
    }

    fn validate_alter_system_network_policy(
        &self,
        session: &Session,
        policy_value: &plan::VariableValue,
    ) -> Result<(), AdapterError> {
        let policy_name = match policy_value {
            plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
            plan::VariableValue::Values(values) if values.len() == 1 => {
                values.first().cloned()
            }
            plan::VariableValue::Values(values) => {
                tracing::warn!(?values, "can't set multiple network policies at once");
                None
            }
        };
        let maybe_network_policy = policy_name
            .as_ref()
            .and_then(|name| self.catalog.get_network_policy_by_name(name));
        let Some(network_policy) = maybe_network_policy else {
            return Err(AdapterError::PlanError(plan::PlanError::VarError(
                VarError::InvalidParameterValue {
                    name: NETWORK_POLICY.name(),
                    invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
                    reason: "no network policy with that name exists".to_string(),
                },
            )));
        };
        self.validate_alter_network_policy(session, &network_policy.rules)
    }

    fn validate_alter_network_policy(
        &self,
        session: &Session,
        policy_rules: &Vec<NetworkPolicyRule>,
    ) -> Result<(), AdapterError> {
        if session.user().is_internal() {
            return Ok(());
        }
        if let Some(ip) = session.meta().client_ip() {
            validate_ip_with_policy_rules(ip, policy_rules)
                .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
        } else {
            return Err(AdapterError::NetworkPolicyDenied(
                NetworkPolicyError::MissingIp,
            ));
        }
        Ok(())
    }

    #[instrument]
    pub(super) fn sequence_execute(
        &mut self,
        session: &mut Session,
        plan: plan::ExecutePlan,
    ) -> Result<String, AdapterError> {
        Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
        let ps = session
            .get_prepared_statement_unverified(&plan.name)
            .expect("known to exist");
        let stmt = ps.stmt().cloned();
        let desc = ps.desc().clone();
        let state_revision = ps.state_revision;
        let logging = Arc::clone(ps.logging());
        session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
    }

    #[instrument]
    pub(super) async fn sequence_grant_privileges(
        &mut self,
        session: &Session,
        plan::GrantPrivilegesPlan {
            update_privileges,
            grantees,
        }: plan::GrantPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            grantees,
            UpdatePrivilegeVariant::Grant,
        )
        .await
    }

    #[instrument]
    pub(super) async fn sequence_revoke_privileges(
        &mut self,
        session: &Session,
        plan::RevokePrivilegesPlan {
            update_privileges,
            revokees,
        }: plan::RevokePrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_update_privileges(
            session,
            update_privileges,
            revokees,
            UpdatePrivilegeVariant::Revoke,
        )
        .await
    }

    #[instrument]
    async fn sequence_update_privileges(
        &mut self,
        session: &Session,
        update_privileges: Vec<UpdatePrivilege>,
        grantees: Vec<RoleId>,
        variant: UpdatePrivilegeVariant,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
        let mut warnings = Vec::new();
        let catalog = self.catalog().for_session(session);

        for UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        } in update_privileges
        {
            let actual_object_type = catalog.get_system_object_type(&target_id);
            if actual_object_type.is_relation() {
                let applicable_privileges = rbac::all_object_privileges(actual_object_type);
                let non_applicable_privileges = acl_mode.difference(applicable_privileges);
                if !non_applicable_privileges.is_empty() {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
                    warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
                        non_applicable_privileges,
                        object_description,
                    })
                }
            }

            if let SystemObjectId::Object(object_id) = &target_id {
                self.catalog()
                    .ensure_not_reserved_object(object_id, session.conn_id())?;
            }

            let privileges = self
                .catalog()
                .get_privileges(&target_id, session.conn_id())
                .ok_or(AdapterError::Unsupported(
                    "GRANTs/REVOKEs on an object type with no privileges",
                ))?;

            for grantee in &grantees {
                self.catalog().ensure_not_system_role(grantee)?;
                self.catalog().ensure_not_predefined_role(grantee)?;
                let existing_privilege = privileges
                    .get_acl_item(grantee, &grantor)
                    .map(Cow::Borrowed)
                    .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));

                match variant {
                    UpdatePrivilegeVariant::Grant
                        if !existing_privilege.acl_mode.contains(acl_mode) =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    UpdatePrivilegeVariant::Revoke
                        if !existing_privilege
                            .acl_mode
                            .intersection(acl_mode)
                            .is_empty() =>
                    {
                        ops.push(catalog::Op::UpdatePrivilege {
                            target_id: target_id.clone(),
                            privilege: MzAclItem {
                                grantee: *grantee,
                                grantor,
                                acl_mode,
                            },
                            variant,
                        });
                    }
                    _ => {}
                }
            }
        }

        if ops.is_empty() {
            session.add_notices(warnings);
            return Ok(variant.into());
        }

        let res = self
            .catalog_transact(Some(session), ops)
            .await
            .map(|_| match variant {
                UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
                UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
            });
        if res.is_ok() {
            session.add_notices(warnings);
        }
        res
    }

    #[instrument]
    pub(super) async fn sequence_alter_default_privileges(
        &mut self,
        session: &Session,
        plan::AlterDefaultPrivilegesPlan {
            privilege_objects,
            privilege_acl_items,
            is_grant,
        }: plan::AlterDefaultPrivilegesPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
        let variant = if is_grant {
            UpdatePrivilegeVariant::Grant
        } else {
            UpdatePrivilegeVariant::Revoke
        };
        for privilege_object in &privilege_objects {
            self.catalog()
                .ensure_not_system_role(&privilege_object.role_id)?;
            self.catalog()
                .ensure_not_predefined_role(&privilege_object.role_id)?;
            if let Some(database_id) = privilege_object.database_id {
                self.catalog()
                    .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
            }
            if let Some(schema_id) = privilege_object.schema_id {
                let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
                let schema_spec: SchemaSpecifier = schema_id.into();

                self.catalog().ensure_not_reserved_object(
                    &(database_spec, schema_spec).into(),
                    session.conn_id(),
                )?;
            }
            for privilege_acl_item in &privilege_acl_items {
                self.catalog()
                    .ensure_not_system_role(&privilege_acl_item.grantee)?;
                self.catalog()
                    .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
                ops.push(catalog::Op::UpdateDefaultPrivilege {
                    privilege_object: privilege_object.clone(),
                    privilege_acl_item: privilege_acl_item.clone(),
                    variant,
                })
            }
        }

        self.catalog_transact(Some(session), ops).await?;
        Ok(ExecuteResponse::AlteredDefaultPrivileges)
    }

    #[instrument]
    pub(super) async fn sequence_grant_role(
        &mut self,
        session: &Session,
        plan::GrantRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::GrantRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::GrantRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        if ops.is_empty() {
            return Ok(ExecuteResponse::GrantedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::GrantedRole)
    }

    #[instrument]
    pub(super) async fn sequence_revoke_role(
        &mut self,
        session: &Session,
        plan::RevokeRolePlan {
            role_ids,
            member_ids,
            grantor_id,
        }: plan::RevokeRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let catalog = self.catalog();
        let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
        for role_id in role_ids {
            for member_id in &member_ids {
                let member_membership: BTreeSet<_> =
                    catalog.get_role(member_id).membership().keys().collect();
                if !member_membership.contains(&role_id) {
                    let role_name = catalog.get_role(&role_id).name().to_string();
                    let member_name = catalog.get_role(member_id).name().to_string();
                    catalog.ensure_not_reserved_role(member_id)?;
                    catalog.ensure_grantable_role(&role_id)?;
                    session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
                        role_name,
                        member_name,
                    });
                } else {
                    ops.push(catalog::Op::RevokeRole {
                        role_id,
                        member_id: *member_id,
                        grantor_id,
                    });
                }
            }
        }

        if ops.is_empty() {
            return Ok(ExecuteResponse::RevokedRole);
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::RevokedRole)
    }

    #[instrument]
    pub(super) async fn sequence_alter_owner(
        &mut self,
        session: &Session,
        plan::AlterOwnerPlan {
            id,
            object_type,
            new_owner,
        }: plan::AlterOwnerPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let mut ops = vec![catalog::Op::UpdateOwner {
            id: id.clone(),
            new_owner,
        }];

        match &id {
            ObjectId::Item(item_id) => {
                let entry = self.catalog().get_entry(item_id);

                if entry.is_index() {
                    let name = self
                        .catalog()
                        .resolve_full_name(entry.name(), Some(session.conn_id()))
                        .to_string();
                    session.add_notice(AdapterNotice::AlterIndexOwner { name });
                    return Ok(ExecuteResponse::AlteredObject(object_type));
                }

                let dependent_index_ops = entry
                    .used_by()
                    .into_iter()
                    .filter(|id| self.catalog().get_entry(id).is_index())
                    .map(|id| catalog::Op::UpdateOwner {
                        id: ObjectId::Item(*id),
                        new_owner,
                    });
                ops.extend(dependent_index_ops);

                let dependent_subsources =
                    entry
                        .progress_id()
                        .into_iter()
                        .map(|item_id| catalog::Op::UpdateOwner {
                            id: ObjectId::Item(item_id),
                            new_owner,
                        });
                ops.extend(dependent_subsources);
            }
            ObjectId::Cluster(cluster_id) => {
                let cluster = self.catalog().get_cluster(*cluster_id);
                let managed_cluster_replica_ops =
                    cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
                        id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
                        new_owner,
                    });
                ops.extend(managed_cluster_replica_ops);
            }
            _ => {}
        }

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::AlteredObject(object_type))
    }

    #[instrument]
    pub(super) async fn sequence_reassign_owned(
        &mut self,
        session: &Session,
        plan::ReassignOwnedPlan {
            old_roles,
            new_role,
            reassign_ids,
        }: plan::ReassignOwnedPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        for role_id in old_roles.iter().chain(iter::once(&new_role)) {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }

        let ops = reassign_ids
            .into_iter()
            .map(|id| catalog::Op::UpdateOwner {
                id,
                new_owner: new_role,
            })
            .collect();

        self.catalog_transact(Some(session), ops)
            .await
            .map(|_| ExecuteResponse::ReassignOwned)
    }

    #[instrument]
    pub(crate) async fn handle_deferred_statement(&mut self) {
        let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
            return;
        };
        match ps {
            crate::coord::PlanStatement::Statement { stmt, params } => {
                self.handle_execute_inner(stmt, params, ctx).await;
            }
            crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
                self.sequence_plan(ctx, plan, resolved_ids).await;
            }
        }
    }

    #[instrument]
    #[allow(clippy::unused_async)]
    pub(super) async fn sequence_alter_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::AlterTablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::AlterTablePlan {
            relation_id,
            column_name,
            column_type,
            raw_sql_type,
        } = plan;

        let id_ts = self.get_catalog_write_ts().await;
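        // Allocate a new global ID for the new version of the table's
        // relation.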
        let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
        let ops = vec![catalog::Op::AlterAddColumn {
            id: relation_id,
            new_global_id,
            name: column_name,
            typ: column_type,
            sql: raw_sql_type,
        }];

        let entry = self.catalog().get_entry(&relation_id);
        let CatalogItem::Table(table) = &entry.item else {
            let err = format!("expected table, found {:?}", entry.item);
            return Err(AdapterError::Internal(err));
        };
        let expected_version = table.desc.latest_version();
        let existing_global_id = table.global_id_writes();

        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
            Box::pin(async move {
                let entry = coord.catalog().get_entry(&relation_id);
                let CatalogItem::Table(table) = &entry.item else {
                    panic!("programming error, expected table, found {:?}", entry.item);
                };
                let table = table.clone();

                let existing_table = crate::CollectionIdBundle {
                    storage_ids: btreeset![existing_global_id],
                    compute_ids: BTreeMap::new(),
                };
                let existing_table_read_hold = coord.acquire_read_holds(&existing_table);

                let new_version = table.desc.latest_version();
                let new_desc = table
                    .desc
                    .at_version(RelationVersionSelector::Specific(new_version));
                let register_ts = coord.get_local_write_ts().await.timestamp;

                coord
                    .controller
                    .storage
                    .alter_table_desc(
                        existing_global_id,
                        new_global_id,
                        new_desc,
                        expected_version,
                        register_ts,
                    )
                    .await
                    .expect("failed to alter desc of table");

                let compaction_window = table
                    .custom_logical_compaction_window
                    .unwrap_or(CompactionWindow::Default);
                coord
                    .initialize_read_policies(
                        &crate::CollectionIdBundle {
                            storage_ids: btreeset![new_global_id],
                            compute_ids: BTreeMap::new(),
                        },
                        compaction_window,
                    )
                    .await;
                coord.apply_local_write(register_ts).await;

                drop(existing_table_read_hold);
            })
        })
        .await?;

        Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
    }
}

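/// A [`mz_transform::StatisticsOracle`] backed by a cache of snapshot
/// statistics fetched from the storage layer.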
#[derive(Debug)]
struct CachedStatisticsOracle {
    cache: BTreeMap<GlobalId, usize>,
}

impl CachedStatisticsOracle {
    pub async fn new<T: TimelyTimestamp>(
        ids: &BTreeSet<GlobalId>,
        as_of: &Antichain<T>,
        storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
    ) -> Result<Self, StorageError<T>> {
        let mut cache = BTreeMap::new();

        for id in ids {
            let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;

            match stats {
                Ok(stats) => {
                    cache.insert(*id, stats.num_updates);
                }
                Err(StorageError::IdentifierMissing(id)) => {
                    ::tracing::debug!("no statistics for {id}")
                }
                Err(e) => return Err(e),
            }
        }

        Ok(Self { cache })
    }
}

impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
    fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
        self.cache.get(&id).copied()
    }

    fn as_map(&self) -> BTreeMap<GlobalId, usize> {
        self.cache.clone()
    }
}

impl Coordinator {
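    /// Returns a statistics oracle for the given query: a
    /// [`CachedStatisticsOracle`] when session cardinality estimates are
    /// enabled, or an [`EmptyStatisticsOracle`] otherwise (and on timeout).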
    pub(super) async fn statistics_oracle(
        &self,
        session: &Session,
        source_ids: &BTreeSet<GlobalId>,
        query_as_of: &Antichain<Timestamp>,
        is_oneshot: bool,
    ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
        if !session.vars().enable_session_cardinality_estimates() {
            return Ok(Box::new(EmptyStatisticsOracle));
        }

        let timeout = if is_oneshot {
            self.catalog()
                .system_config()
                .optimizer_oneshot_stats_timeout()
        } else {
            self.catalog().system_config().optimizer_stats_timeout()
        };

        let cached_stats = mz_ore::future::timeout(
            timeout,
            CachedStatisticsOracle::new(
                source_ids,
                query_as_of,
                self.controller.storage_collections.as_ref(),
            ),
        )
        .await;

        match cached_stats {
            Ok(stats) => Ok(Box::new(stats)),
            Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
                warn!(
                    is_oneshot = is_oneshot,
                    "optimizer statistics collection timed out after {}ms",
                    timeout.as_millis()
                );

                Ok(Box::new(EmptyStatisticsOracle))
            }
            Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
        }
    }
}

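/// Checks whether the given set of sources reads from per-replica
/// introspection (log) sources and, if so, ensures the read targets a single
/// replica with logging enabled.
///
/// Returns the notices to emit for the read, or an error if the read is
/// untargeted on a multi-replica cluster or if introspection is disabled on
/// the targeted replica.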
5292pub(super) fn check_log_reads(
5301 catalog: &Catalog,
5302 cluster: &Cluster,
5303 source_ids: &BTreeSet<GlobalId>,
5304 target_replica: &mut Option<ReplicaId>,
5305 vars: &SessionVars,
5306) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
5307where
5308{
5309 let log_names = source_ids
5310 .iter()
5311 .map(|gid| catalog.resolve_item_id(gid))
5312 .flat_map(|item_id| catalog.introspection_dependencies(item_id))
5313 .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
5314 .collect::<Vec<_>>();
5315
5316 if log_names.is_empty() {
5317 return Ok(None);
5318 }
5319
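    // Log reads on a multi-replica cluster must name a target replica; when
    // the cluster has exactly one replica, target it by default.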
    let num_replicas = cluster.replicas().count();
    if target_replica.is_none() {
        if num_replicas == 1 {
            *target_replica = cluster.replicas().map(|r| r.replica_id).next();
        } else {
            return Err(AdapterError::UntargetedLogRead { log_names });
        }
    }

    let replica_id = target_replica.expect("set to `Some` above");
    let replica = cluster.replica(replica_id).expect("Replica must exist");
    if !replica.config.compute.logging.enabled() {
        return Err(AdapterError::IntrospectionDisabled { log_names });
    }

    Ok(vars
        .emit_introspection_query_notice()
        .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
}

impl Coordinator {
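    /// Emits the given optimizer notices to the session, for notice kinds that
    /// are enabled in the system configuration, and records every notice in
    /// the optimization notice metrics.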
    fn emit_optimizer_notices(&self, session: &Session, notices: &[RawOptimizerNotice]) {
        if notices.is_empty() {
            return;
        }
        let humanizer = self.catalog.for_session(session);
        let system_vars = self.catalog.system_config();
        for notice in notices {
            let kind = OptimizerNoticeKind::from(notice);
            let notice_enabled = match kind {
                OptimizerNoticeKind::IndexAlreadyExists => {
                    system_vars.enable_notices_for_index_already_exists()
                }
                OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
                    system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
                }
                OptimizerNoticeKind::IndexKeyEmpty => {
                    system_vars.enable_notices_for_index_empty_key()
                }
            };
            if notice_enabled {
                session.add_notice(AdapterNotice::OptimizerNotice {
                    notice: notice.message(&humanizer, false).to_string(),
                    hint: notice.hint(&humanizer, false).to_string(),
                });
            }
            self.metrics
                .optimization_notices
                .with_label_values(&[kind.metric_label()])
                .inc_by(1);
        }
    }

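    /// Processes the `DataflowMetainfo` for `export_id`: emits optimizer
    /// notices to the session (when a context is provided), renders and stores
    /// the notices in the catalog, and, when `enable_mz_notices` is set,
    /// returns a future that resolves once the corresponding builtin table
    /// writes have been applied.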
    async fn process_dataflow_metainfo(
        &mut self,
        df_meta: DataflowMetainfo,
        export_id: GlobalId,
        ctx: Option<&mut ExecuteContext>,
        notice_ids: Vec<GlobalId>,
    ) -> Option<BuiltinTableAppendNotify> {
        if let Some(ctx) = ctx {
            self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
        }

        let df_meta = self
            .catalog()
            .render_notices(df_meta, notice_ids, Some(export_id));

        if self.catalog().state().system_config().enable_mz_notices()
            && !df_meta.optimizer_notices.is_empty()
        {
            let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
            self.catalog().state().pack_optimizer_notices(
                &mut builtin_table_updates,
                df_meta.optimizer_notices.iter(),
                Diff::ONE,
            );

            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            Some(
                self.builtin_table_update()
                    .execute(builtin_table_updates)
                    .await
                    .0,
            )
        } else {
            self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);

            None
        }
    }
}