1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::stream::FuturesOrdered;
20use futures::{Future, StreamExt, future};
21use itertools::Itertools;
22use maplit::btreeset;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
26use mz_catalog::memory::objects::{
27 CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
28};
29use mz_cloud_resources::VpcEndpointConfig;
30use mz_controller_types::ReplicaId;
31use mz_expr::{
32 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
33};
34use mz_ore::cast::CastFrom;
35use mz_ore::collections::{CollectionExt, HashSet};
36use mz_ore::task::{self, JoinHandle, spawn};
37use mz_ore::tracing::OpenTelemetryContext;
38use mz_ore::{assert_none, instrument};
39use mz_persist_client::stats::SnapshotPartStats;
40use mz_repr::adt::jsonb::Jsonb;
41use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
42use mz_repr::explain::ExprHumanizer;
43use mz_repr::explain::json::json_string;
44use mz_repr::role_id::RoleId;
45use mz_repr::{
46 CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
47 RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
48};
49use mz_sql::ast::{
50 AlterSourceAddSubsourceOption, CreateSourceOptionName, CreateSubsourceOption,
51 CreateSubsourceOptionName, SqlServerConfigOption, SqlServerConfigOptionName,
52};
53use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
54use mz_sql::catalog::{
55 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
56 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
57 ErrorMessageObjectDescription, ObjectType, RoleAttributesRaw, RoleVars, SessionCatalog,
58};
59use mz_sql::names::{
60 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
61 SchemaSpecifier, SystemObjectId,
62};
63use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceExport};
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS, SessionVars,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_storage_types::stats::RelationPartStats;
92use mz_transform::EmptyStatisticsOracle;
93use mz_transform::dataflow::DataflowMetainfo;
94use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
95use smallvec::SmallVec;
96use timely::progress::Antichain;
97use timely::progress::Timestamp as TimelyTimestamp;
98use tokio::sync::{oneshot, watch};
99use tracing::{Instrument, Span, info, warn};
100
101use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
102use crate::command::{ExecuteResponse, Response};
103use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
104use crate::coord::{
105 AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
106 CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
107 Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
108 PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
109 validate_ip_with_policy_rules,
110};
111use crate::error::AdapterError;
112use crate::notice::{AdapterNotice, DroppedInUseIndex};
113use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
114use crate::optimize::{self, Optimize};
115use crate::session::{
116 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
117 WriteLocks, WriteOp,
118};
119use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
120use crate::{PeekResponseUnary, ReadHolds};
121
122mod cluster;
123mod copy_from;
124mod create_continual_task;
125mod create_index;
126mod create_materialized_view;
127mod create_view;
128mod explain_timestamp;
129mod peek;
130mod secret;
131mod subscribe;
132
133macro_rules! return_if_err {
136 ($expr:expr, $ctx:expr) => {
137 match $expr {
138 Ok(v) => v,
139 Err(e) => return $ctx.retire(Err(e.into())),
140 }
141 };
142}
143
144pub(super) use return_if_err;
145
146struct DropOps {
147 ops: Vec<catalog::Op>,
148 dropped_active_db: bool,
149 dropped_active_cluster: bool,
150 dropped_in_use_indexes: Vec<DroppedInUseIndex>,
151}
152
153struct CreateSourceInner {
155 ops: Vec<catalog::Op>,
156 sources: Vec<(CatalogItemId, Source)>,
157 if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
158}
159
160impl Coordinator {
161 pub(crate) async fn sequence_staged<S>(
166 &mut self,
167 mut ctx: S::Ctx,
168 parent_span: Span,
169 mut stage: S,
170 ) where
171 S: Staged + 'static,
172 S::Ctx: Send + 'static,
173 {
174 return_if_err!(stage.validity().check(self.catalog()), ctx);
175 loop {
176 let mut cancel_enabled = stage.cancel_enabled();
177 if let Some(session) = ctx.session() {
178 if cancel_enabled {
179 if let Some((_prev_tx, prev_rx)) = self
182 .staged_cancellation
183 .insert(session.conn_id().clone(), watch::channel(false))
184 {
185 let was_canceled = *prev_rx.borrow();
186 if was_canceled {
187 ctx.retire(Err(AdapterError::Canceled));
188 return;
189 }
190 }
191 } else {
192 self.staged_cancellation.remove(session.conn_id());
195 }
196 } else {
197 cancel_enabled = false
198 };
199 let next = stage
200 .stage(self, &mut ctx)
201 .instrument(parent_span.clone())
202 .await;
203 let res = return_if_err!(next, ctx);
204 stage = match res {
205 StageResult::Handle(handle) => {
206 let internal_cmd_tx = self.internal_cmd_tx.clone();
207 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
208 let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
209 });
210 return;
211 }
212 StageResult::HandleRetire(handle) => {
213 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
214 ctx.retire(Ok(resp));
215 });
216 return;
217 }
218 StageResult::Response(resp) => {
219 ctx.retire(Ok(resp));
220 return;
221 }
222 StageResult::Immediate(stage) => *stage,
223 }
224 }
225 }
226
227 fn handle_spawn<C, T, F>(
228 &self,
229 ctx: C,
230 handle: JoinHandle<Result<T, AdapterError>>,
231 cancel_enabled: bool,
232 f: F,
233 ) where
234 C: StagedContext + Send + 'static,
235 T: Send + 'static,
236 F: FnOnce(C, T) + Send + 'static,
237 {
238 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
239 .session()
240 .and_then(|session| self.staged_cancellation.get(session.conn_id()))
241 {
242 let mut rx = rx.clone();
243 Box::pin(async move {
244 let _ = rx.wait_for(|v| *v).await;
246 ()
247 })
248 } else {
249 Box::pin(future::pending())
250 };
251 spawn(|| "sequence_staged", async move {
252 tokio::select! {
253 res = handle => {
254 let next = match res {
255 Ok(next) => return_if_err!(next, ctx),
256 Err(err) => {
257 tracing::error!("sequence_staged join error {err}");
258 ctx.retire(Err(AdapterError::Internal(
259 "sequence_staged join error".into(),
260 )));
261 return;
262 }
263 };
264 f(ctx, next);
265 }
266 _ = rx, if cancel_enabled => {
267 ctx.retire(Err(AdapterError::Canceled));
268 }
269 }
270 });
271 }
272
273 async fn create_source_inner(
274 &self,
275 session: &Session,
276 plans: Vec<plan::CreateSourcePlanBundle>,
277 ) -> Result<CreateSourceInner, AdapterError> {
278 let mut ops = vec![];
279 let mut sources = vec![];
280
281 let if_not_exists_ids = plans
282 .iter()
283 .filter_map(
284 |plan::CreateSourcePlanBundle {
285 item_id,
286 global_id: _,
287 plan,
288 resolved_ids: _,
289 available_source_references: _,
290 }| {
291 if plan.if_not_exists {
292 Some((*item_id, plan.name.clone()))
293 } else {
294 None
295 }
296 },
297 )
298 .collect::<BTreeMap<_, _>>();
299
300 for plan::CreateSourcePlanBundle {
301 item_id,
302 global_id,
303 mut plan,
304 resolved_ids,
305 available_source_references,
306 } in plans
307 {
308 let name = plan.name.clone();
309
310 match plan.source.data_source {
311 plan::DataSourceDesc::Ingestion(ref desc)
312 | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
313 let cluster_id = plan
314 .in_cluster
315 .expect("ingestion plans must specify cluster");
316 match desc.connection {
317 GenericSourceConnection::Postgres(_)
318 | GenericSourceConnection::MySql(_)
319 | GenericSourceConnection::SqlServer(_)
320 | GenericSourceConnection::Kafka(_)
321 | GenericSourceConnection::LoadGenerator(_) => {
322 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
323 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
324 .get(self.catalog().system_config().dyncfgs());
325
326 if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
327 {
328 return Err(AdapterError::Unsupported(
329 "sources in clusters with >1 replicas",
330 ));
331 }
332 }
333 }
334 }
335 }
336 plan::DataSourceDesc::Webhook { .. } => {
337 let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
338 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
339 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
340 .get(self.catalog().system_config().dyncfgs());
341
342 if !enable_multi_replica_sources {
343 if cluster.replica_ids().len() > 1 {
344 return Err(AdapterError::Unsupported(
345 "webhook sources in clusters with >1 replicas",
346 ));
347 }
348 }
349 }
350 }
351 plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
352 }
353
354 if let mz_sql::plan::DataSourceDesc::Webhook {
356 validate_using: Some(validate),
357 ..
358 } = &mut plan.source.data_source
359 {
360 if let Err(reason) = validate.reduce_expression().await {
361 self.metrics
362 .webhook_validation_reduce_failures
363 .with_label_values(&[reason])
364 .inc();
365 return Err(AdapterError::Internal(format!(
366 "failed to reduce check expression, {reason}"
367 )));
368 }
369 }
370
371 let mut reference_ops = vec![];
374 if let Some(references) = &available_source_references {
375 reference_ops.push(catalog::Op::UpdateSourceReferences {
376 source_id: item_id,
377 references: references.clone().into(),
378 });
379 }
380
381 let source = Source::new(plan, global_id, resolved_ids, None, false);
382 ops.push(catalog::Op::CreateItem {
383 id: item_id,
384 name,
385 item: CatalogItem::Source(source.clone()),
386 owner_id: *session.current_role_id(),
387 });
388 sources.push((item_id, source));
389 ops.extend(reference_ops);
391 }
392
393 Ok(CreateSourceInner {
394 ops,
395 sources,
396 if_not_exists_ids,
397 })
398 }
399
400 pub(crate) fn plan_subsource(
408 &self,
409 session: &Session,
410 params: &mz_sql::plan::Params,
411 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
412 item_id: CatalogItemId,
413 global_id: GlobalId,
414 ) -> Result<CreateSourcePlanBundle, AdapterError> {
415 let catalog = self.catalog().for_session(session);
416 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
417
418 let plan = self.plan_statement(
419 session,
420 Statement::CreateSubsource(subsource_stmt),
421 params,
422 &resolved_ids,
423 )?;
424 let plan = match plan {
425 Plan::CreateSource(plan) => plan,
426 _ => unreachable!(),
427 };
428 Ok(CreateSourcePlanBundle {
429 item_id,
430 global_id,
431 plan,
432 resolved_ids,
433 available_source_references: None,
434 })
435 }
436
437 pub(crate) async fn plan_purified_alter_source_add_subsource(
439 &mut self,
440 session: &Session,
441 params: Params,
442 source_name: ResolvedItemName,
443 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
444 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
445 ) -> Result<(Plan, ResolvedIds), AdapterError> {
446 let mut subsource_plans = Vec::with_capacity(subsources.len());
447
448 let conn_catalog = self.catalog().for_system_session();
450 let pcx = plan::PlanContext::zero();
451 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
452
453 let entry = self.catalog().get_entry(source_name.item_id());
454 let source = entry.source().ok_or_else(|| {
455 AdapterError::internal(
456 "plan alter source",
457 format!("expected Source found {entry:?}"),
458 )
459 })?;
460
461 let item_id = entry.id();
462 let ingestion_id = source.global_id();
463 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
464
465 let id_ts = self.get_catalog_write_ts().await;
466 let ids = self
467 .catalog_mut()
468 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
469 .await?;
470 for (subsource_stmt, (item_id, global_id)) in
471 subsource_stmts.into_iter().zip_eq(ids.into_iter())
472 {
473 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
474 subsource_plans.push(s);
475 }
476
477 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
478 subsources: subsource_plans,
479 options,
480 };
481
482 Ok((
483 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
484 item_id,
485 ingestion_id,
486 action,
487 }),
488 ResolvedIds::empty(),
489 ))
490 }
491
492 pub(crate) fn plan_purified_alter_source_refresh_references(
494 &self,
495 _session: &Session,
496 _params: Params,
497 source_name: ResolvedItemName,
498 available_source_references: plan::SourceReferences,
499 ) -> Result<(Plan, ResolvedIds), AdapterError> {
500 let entry = self.catalog().get_entry(source_name.item_id());
501 let source = entry.source().ok_or_else(|| {
502 AdapterError::internal(
503 "plan alter source",
504 format!("expected Source found {entry:?}"),
505 )
506 })?;
507 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
508 references: available_source_references,
509 };
510
511 Ok((
512 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
513 item_id: entry.id(),
514 ingestion_id: source.global_id(),
515 action,
516 }),
517 ResolvedIds::empty(),
518 ))
519 }
520
521 pub(crate) async fn plan_purified_create_source(
525 &mut self,
526 ctx: &ExecuteContext,
527 params: Params,
528 progress_stmt: CreateSubsourceStatement<Aug>,
529 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
530 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
531 available_source_references: plan::SourceReferences,
532 ) -> Result<(Plan, ResolvedIds), AdapterError> {
533 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
534
535 assert_none!(progress_stmt.of_source);
542 let id_ts = self.get_catalog_write_ts().await;
543 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
544 let progress_plan =
545 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
546 let progress_full_name = self
547 .catalog()
548 .resolve_full_name(&progress_plan.plan.name, None);
549 let progress_subsource = ResolvedItemName::Item {
550 id: progress_plan.item_id,
551 qualifiers: progress_plan.plan.name.qualifiers.clone(),
552 full_name: progress_full_name,
553 print_id: true,
554 version: RelationVersionSelector::Latest,
555 };
556
557 create_source_plans.push(progress_plan);
558
559 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
560
561 let catalog = self.catalog().for_session(ctx.session());
562 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
563
564 let propagated_with_options: Vec<_> = source_stmt
565 .with_options
566 .iter()
567 .filter_map(|opt| match opt.name {
568 CreateSourceOptionName::TimestampInterval => None,
569 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
570 name: CreateSubsourceOptionName::RetainHistory,
571 value: opt.value.clone(),
572 }),
573 })
574 .collect();
575
576 let source_plan = match self.plan_statement(
578 ctx.session(),
579 Statement::CreateSource(source_stmt),
580 ¶ms,
581 &resolved_ids,
582 )? {
583 Plan::CreateSource(plan) => plan,
584 p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
585 };
586
587 let id_ts = self.get_catalog_write_ts().await;
588 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
589
590 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
591 let of_source = ResolvedItemName::Item {
592 id: item_id,
593 qualifiers: source_plan.name.qualifiers.clone(),
594 full_name: source_full_name,
595 print_id: true,
596 version: RelationVersionSelector::Latest,
597 };
598
599 let conn_catalog = self.catalog().for_system_session();
601 let pcx = plan::PlanContext::zero();
602 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
603
604 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
605
606 for subsource_stmt in subsource_stmts.iter_mut() {
607 subsource_stmt
608 .with_options
609 .extend(propagated_with_options.iter().cloned())
610 }
611
612 create_source_plans.push(CreateSourcePlanBundle {
613 item_id,
614 global_id,
615 plan: source_plan,
616 resolved_ids: resolved_ids.clone(),
617 available_source_references: Some(available_source_references),
618 });
619
620 let id_ts = self.get_catalog_write_ts().await;
622 let ids = self
623 .catalog_mut()
624 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
625 .await?;
626 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
627 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
628 create_source_plans.push(plan);
629 }
630
631 Ok((
632 Plan::CreateSources(create_source_plans),
633 ResolvedIds::empty(),
634 ))
635 }
636
637 #[instrument]
638 pub(super) async fn sequence_create_source(
639 &mut self,
640 ctx: &mut ExecuteContext,
641 plans: Vec<plan::CreateSourcePlanBundle>,
642 ) -> Result<ExecuteResponse, AdapterError> {
643 let item_ids: Vec<_> = plans.iter().map(|plan| plan.item_id).collect();
644 let CreateSourceInner {
645 ops,
646 sources,
647 if_not_exists_ids,
648 } = self.create_source_inner(ctx.session(), plans).await?;
649
650 let transact_result = self
651 .catalog_transact_with_ddl_transaction(ctx, ops, |coord, ctx| {
652 Box::pin(async move {
653 let mut collections = Vec::with_capacity(sources.len());
665
666 for (item_id, source) in sources {
667 let source_status_item_id =
668 coord.catalog().resolve_builtin_storage_collection(
669 &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
670 );
671 let source_status_collection_id = Some(
672 coord
673 .catalog()
674 .get_entry(&source_status_item_id)
675 .latest_global_id(),
676 );
677
678 let (data_source, status_collection_id) = match source.data_source {
679 DataSourceDesc::Ingestion { desc, cluster_id } => {
680 let desc = desc.into_inline_connection(coord.catalog().state());
681
682 let ingestion =
683 IngestionDescription::new(desc, cluster_id, source.global_id);
684
685 (
686 DataSource::Ingestion(ingestion),
687 source_status_collection_id,
688 )
689 }
690 DataSourceDesc::OldSyntaxIngestion {
691 desc,
692 progress_subsource,
693 data_config,
694 details,
695 cluster_id,
696 } => {
697 let desc = desc.into_inline_connection(coord.catalog().state());
698 let data_config =
699 data_config.into_inline_connection(coord.catalog().state());
700 let progress_subsource = coord
703 .catalog()
704 .get_entry(&progress_subsource)
705 .latest_global_id();
706
707 let mut ingestion =
708 IngestionDescription::new(desc, cluster_id, progress_subsource);
709 let legacy_export = SourceExport {
710 storage_metadata: (),
711 data_config,
712 details,
713 };
714 ingestion
715 .source_exports
716 .insert(source.global_id, legacy_export);
717
718 (
719 DataSource::Ingestion(ingestion),
720 source_status_collection_id,
721 )
722 }
723 DataSourceDesc::IngestionExport {
724 ingestion_id,
725 external_reference: _,
726 details,
727 data_config,
728 } => {
729 let ingestion_id =
732 coord.catalog().get_entry(&ingestion_id).latest_global_id();
733 (
734 DataSource::IngestionExport {
735 ingestion_id,
736 details,
737 data_config: data_config
738 .into_inline_connection(coord.catalog().state()),
739 },
740 source_status_collection_id,
741 )
742 }
743 DataSourceDesc::Progress => (DataSource::Progress, None),
744 DataSourceDesc::Webhook { .. } => {
745 if let Some(url) =
746 coord.catalog().state().try_get_webhook_url(&item_id)
747 {
748 if let Some(ctx) = ctx.as_ref() {
749 ctx.session()
750 .add_notice(AdapterNotice::WebhookSourceCreated { url })
751 }
752 }
753
754 (DataSource::Webhook, None)
755 }
756 DataSourceDesc::Introspection(_) => {
757 unreachable!(
758 "cannot create sources with introspection data sources"
759 )
760 }
761 };
762
763 collections.push((
764 source.global_id,
765 CollectionDescription::<Timestamp> {
766 desc: source.desc.clone(),
767 data_source,
768 timeline: Some(source.timeline),
769 since: None,
770 status_collection_id,
771 },
772 ));
773 }
774
775 let storage_metadata = coord.catalog.state().storage_metadata();
776
777 coord
778 .controller
779 .storage
780 .create_collections(storage_metadata, None, collections)
781 .await
782 .unwrap_or_terminate("cannot fail to create collections");
783
784 let read_policies = coord.catalog().state().source_compaction_windows(item_ids);
800 for (compaction_window, storage_policies) in read_policies {
801 coord
802 .initialize_storage_read_policies(storage_policies, compaction_window)
803 .await;
804 }
805 })
806 })
807 .await;
808
809 match transact_result {
810 Ok(()) => Ok(ExecuteResponse::CreatedSource),
811 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
812 kind:
813 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
814 })) if if_not_exists_ids.contains_key(&id) => {
815 ctx.session()
816 .add_notice(AdapterNotice::ObjectAlreadyExists {
817 name: if_not_exists_ids[&id].item.clone(),
818 ty: "source",
819 });
820 Ok(ExecuteResponse::CreatedSource)
821 }
822 Err(err) => Err(err),
823 }
824 }
825
826 #[instrument]
827 pub(super) async fn sequence_create_connection(
828 &mut self,
829 mut ctx: ExecuteContext,
830 plan: plan::CreateConnectionPlan,
831 resolved_ids: ResolvedIds,
832 ) {
833 let id_ts = self.get_catalog_write_ts().await;
834 let (connection_id, connection_gid) = match self.catalog_mut().allocate_user_id(id_ts).await
835 {
836 Ok(item_id) => item_id,
837 Err(err) => return ctx.retire(Err(err.into())),
838 };
839
840 match &plan.connection.details {
841 ConnectionDetails::Ssh { key_1, key_2, .. } => {
842 let key_1 = match key_1.as_key_pair() {
843 Some(key_1) => key_1.clone(),
844 None => {
845 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
846 "the PUBLIC KEY 1 option cannot be explicitly specified"
847 ))));
848 }
849 };
850
851 let key_2 = match key_2.as_key_pair() {
852 Some(key_2) => key_2.clone(),
853 None => {
854 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
855 "the PUBLIC KEY 2 option cannot be explicitly specified"
856 ))));
857 }
858 };
859
860 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
861 let secret = key_set.to_bytes();
862 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
863 return ctx.retire(Err(err.into()));
864 }
865 }
866 _ => (),
867 };
868
869 if plan.validate {
870 let internal_cmd_tx = self.internal_cmd_tx.clone();
871 let transient_revision = self.catalog().transient_revision();
872 let conn_id = ctx.session().conn_id().clone();
873 let otel_ctx = OpenTelemetryContext::obtain();
874 let role_metadata = ctx.session().role_metadata().clone();
875
876 let connection = plan
877 .connection
878 .details
879 .to_connection()
880 .into_inline_connection(self.catalog().state());
881
882 let current_storage_parameters = self.controller.storage.config().clone();
883 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
884 let result = match connection
885 .validate(connection_id, ¤t_storage_parameters)
886 .await
887 {
888 Ok(()) => Ok(plan),
889 Err(err) => Err(err.into()),
890 };
891
892 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
894 CreateConnectionValidationReady {
895 ctx,
896 result,
897 connection_id,
898 connection_gid,
899 plan_validity: PlanValidity::new(
900 transient_revision,
901 resolved_ids.items().copied().collect(),
902 None,
903 None,
904 role_metadata,
905 ),
906 otel_ctx,
907 resolved_ids: resolved_ids.clone(),
908 },
909 ));
910 if let Err(e) = result {
911 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
912 }
913 });
914 } else {
915 let result = self
916 .sequence_create_connection_stage_finish(
917 &mut ctx,
918 connection_id,
919 connection_gid,
920 plan,
921 resolved_ids,
922 )
923 .await;
924 ctx.retire(result);
925 }
926 }
927
928 #[instrument]
929 pub(crate) async fn sequence_create_connection_stage_finish(
930 &mut self,
931 ctx: &mut ExecuteContext,
932 connection_id: CatalogItemId,
933 connection_gid: GlobalId,
934 plan: plan::CreateConnectionPlan,
935 resolved_ids: ResolvedIds,
936 ) -> Result<ExecuteResponse, AdapterError> {
937 let ops = vec![catalog::Op::CreateItem {
938 id: connection_id,
939 name: plan.name.clone(),
940 item: CatalogItem::Connection(Connection {
941 create_sql: plan.connection.create_sql,
942 global_id: connection_gid,
943 details: plan.connection.details.clone(),
944 resolved_ids,
945 }),
946 owner_id: *ctx.session().current_role_id(),
947 }];
948
949 let transact_result = self
950 .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
951 Box::pin(async move {
952 match plan.connection.details {
953 ConnectionDetails::AwsPrivatelink(ref privatelink) => {
954 let spec = VpcEndpointConfig {
955 aws_service_name: privatelink.service_name.to_owned(),
956 availability_zone_ids: privatelink.availability_zones.to_owned(),
957 };
958 let cloud_resource_controller =
959 match coord.cloud_resource_controller.as_ref().cloned() {
960 Some(controller) => controller,
961 None => {
962 tracing::warn!("AWS PrivateLink connections unsupported");
963 return;
964 }
965 };
966 if let Err(err) = cloud_resource_controller
967 .ensure_vpc_endpoint(connection_id, spec)
968 .await
969 {
970 tracing::warn!(?err, "failed to ensure vpc endpoint!");
971 }
972 }
973 _ => {}
974 }
975 })
976 })
977 .await;
978
979 match transact_result {
980 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
981 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
982 kind:
983 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
984 })) if plan.if_not_exists => {
985 ctx.session()
986 .add_notice(AdapterNotice::ObjectAlreadyExists {
987 name: plan.name.item,
988 ty: "connection",
989 });
990 Ok(ExecuteResponse::CreatedConnection)
991 }
992 Err(err) => Err(err),
993 }
994 }
995
996 #[instrument]
997 pub(super) async fn sequence_create_database(
998 &mut self,
999 session: &mut Session,
1000 plan: plan::CreateDatabasePlan,
1001 ) -> Result<ExecuteResponse, AdapterError> {
1002 let ops = vec![catalog::Op::CreateDatabase {
1003 name: plan.name.clone(),
1004 owner_id: *session.current_role_id(),
1005 }];
1006 match self.catalog_transact(Some(session), ops).await {
1007 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
1008 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1009 kind:
1010 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
1011 })) if plan.if_not_exists => {
1012 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
1013 Ok(ExecuteResponse::CreatedDatabase)
1014 }
1015 Err(err) => Err(err),
1016 }
1017 }
1018
1019 #[instrument]
1020 pub(super) async fn sequence_create_schema(
1021 &mut self,
1022 session: &mut Session,
1023 plan: plan::CreateSchemaPlan,
1024 ) -> Result<ExecuteResponse, AdapterError> {
1025 let op = catalog::Op::CreateSchema {
1026 database_id: plan.database_spec,
1027 schema_name: plan.schema_name.clone(),
1028 owner_id: *session.current_role_id(),
1029 };
1030 match self.catalog_transact(Some(session), vec![op]).await {
1031 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
1032 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1033 kind:
1034 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
1035 })) if plan.if_not_exists => {
1036 session.add_notice(AdapterNotice::SchemaAlreadyExists {
1037 name: plan.schema_name,
1038 });
1039 Ok(ExecuteResponse::CreatedSchema)
1040 }
1041 Err(err) => Err(err),
1042 }
1043 }
1044
1045 fn validate_role_attributes(&self, attributes: &RoleAttributesRaw) -> Result<(), AdapterError> {
1047 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
1048 if attributes.superuser.is_some()
1049 || attributes.password.is_some()
1050 || attributes.login.is_some()
1051 {
1052 return Err(AdapterError::UnavailableFeature {
1053 feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
1054 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
1055 });
1056 }
1057 }
1058 Ok(())
1059 }
1060
1061 #[instrument]
1062 pub(super) async fn sequence_create_role(
1063 &mut self,
1064 conn_id: Option<&ConnectionId>,
1065 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
1066 ) -> Result<ExecuteResponse, AdapterError> {
1067 self.validate_role_attributes(&attributes.clone())?;
1068 let op = catalog::Op::CreateRole { name, attributes };
1069 self.catalog_transact_conn(conn_id, vec![op])
1070 .await
1071 .map(|_| ExecuteResponse::CreatedRole)
1072 }
1073
1074 #[instrument]
1075 pub(super) async fn sequence_create_network_policy(
1076 &mut self,
1077 session: &Session,
1078 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
1079 ) -> Result<ExecuteResponse, AdapterError> {
1080 let op = catalog::Op::CreateNetworkPolicy {
1081 rules,
1082 name,
1083 owner_id: *session.current_role_id(),
1084 };
1085 self.catalog_transact_conn(Some(session.conn_id()), vec![op])
1086 .await
1087 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
1088 }
1089
1090 #[instrument]
1091 pub(super) async fn sequence_alter_network_policy(
1092 &mut self,
1093 session: &Session,
1094 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
1095 ) -> Result<ExecuteResponse, AdapterError> {
1096 let current_network_policy_name = self
1098 .owned_catalog()
1099 .system_config()
1100 .default_network_policy_name();
1101 if current_network_policy_name == name {
1103 self.validate_alter_network_policy(session, &rules)?;
1104 }
1105
1106 let op = catalog::Op::AlterNetworkPolicy {
1107 id,
1108 rules,
1109 name,
1110 owner_id: *session.current_role_id(),
1111 };
1112 self.catalog_transact_conn(Some(session.conn_id()), vec![op])
1113 .await
1114 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
1115 }
1116
1117 #[instrument]
1118 pub(super) async fn sequence_create_table(
1119 &mut self,
1120 ctx: &mut ExecuteContext,
1121 plan: plan::CreateTablePlan,
1122 resolved_ids: ResolvedIds,
1123 ) -> Result<ExecuteResponse, AdapterError> {
1124 let plan::CreateTablePlan {
1125 name,
1126 table,
1127 if_not_exists,
1128 } = plan;
1129
1130 let conn_id = if table.temporary {
1131 Some(ctx.session().conn_id())
1132 } else {
1133 None
1134 };
1135 let id_ts = self.get_catalog_write_ts().await;
1136 let (table_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
1137 let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
1138
1139 let data_source = match table.data_source {
1140 plan::TableDataSource::TableWrites { defaults } => {
1141 TableDataSource::TableWrites { defaults }
1142 }
1143 plan::TableDataSource::DataSource {
1144 desc: data_source_plan,
1145 timeline,
1146 } => match data_source_plan {
1147 plan::DataSourceDesc::IngestionExport {
1148 ingestion_id,
1149 external_reference,
1150 details,
1151 data_config,
1152 } => TableDataSource::DataSource {
1153 desc: DataSourceDesc::IngestionExport {
1154 ingestion_id,
1155 external_reference,
1156 details,
1157 data_config,
1158 },
1159 timeline,
1160 },
1161 plan::DataSourceDesc::Webhook {
1162 validate_using,
1163 body_format,
1164 headers,
1165 cluster_id,
1166 } => TableDataSource::DataSource {
1167 desc: DataSourceDesc::Webhook {
1168 validate_using,
1169 body_format,
1170 headers,
1171 cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
1172 },
1173 timeline,
1174 },
1175 o => {
1176 unreachable!("CREATE TABLE data source got {:?}", o)
1177 }
1178 },
1179 };
1180 let table = Table {
1181 create_sql: Some(table.create_sql),
1182 desc: table.desc,
1183 collections,
1184 conn_id: conn_id.cloned(),
1185 resolved_ids,
1186 custom_logical_compaction_window: table.compaction_window,
1187 is_retained_metrics_object: false,
1188 data_source,
1189 };
1190 let ops = vec![catalog::Op::CreateItem {
1191 id: table_id,
1192 name: name.clone(),
1193 item: CatalogItem::Table(table.clone()),
1194 owner_id: *ctx.session().current_role_id(),
1195 }];
1196
1197 let catalog_result = self
1198 .catalog_transact_with_ddl_transaction(ctx, ops, move |coord, ctx| {
1199 Box::pin(async move {
1200 let (collections, register_ts, read_policies) = match table.data_source {
1204 TableDataSource::TableWrites { defaults: _ } => {
1205 let register_ts = coord.get_local_write_ts().await.timestamp;
1207
1208 coord
1215 .catalog
1216 .confirm_leadership()
1217 .await
1218 .unwrap_or_terminate("unable to confirm leadership");
1219
1220 if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
1221 coord.set_statement_execution_timestamp(id, register_ts);
1222 }
1223
1224 let relation_version = RelationVersion::root();
1226 assert_eq!(table.desc.latest_version(), relation_version);
1227 let relation_desc = table
1228 .desc
1229 .at_version(RelationVersionSelector::Specific(relation_version));
1230 let collection_desc =
1232 CollectionDescription::for_table(relation_desc, None);
1233 let collections = vec![(global_id, collection_desc)];
1234
1235 let compaction_window = table
1236 .custom_logical_compaction_window
1237 .unwrap_or(CompactionWindow::Default);
1238 let read_policies =
1239 BTreeMap::from([(compaction_window, btreeset! { table_id })]);
1240
1241 (collections, Some(register_ts), read_policies)
1242 }
1243 TableDataSource::DataSource {
1244 desc: data_source,
1245 timeline,
1246 } => {
1247 match data_source {
1248 DataSourceDesc::IngestionExport {
1249 ingestion_id,
1250 external_reference: _,
1251 details,
1252 data_config,
1253 } => {
1254 let source_status_item_id =
1258 coord.catalog().resolve_builtin_storage_collection(
1259 &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
1260 );
1261 let status_collection_id = Some(
1262 coord
1263 .catalog()
1264 .get_entry(&source_status_item_id)
1265 .latest_global_id(),
1266 );
1267 let ingestion_id =
1270 coord.catalog().get_entry(&ingestion_id).latest_global_id();
1271 let collection_desc = CollectionDescription::<Timestamp> {
1273 desc: table
1274 .desc
1275 .at_version(RelationVersionSelector::Latest),
1276 data_source: DataSource::IngestionExport {
1277 ingestion_id,
1278 details,
1279 data_config: data_config
1280 .into_inline_connection(coord.catalog.state()),
1281 },
1282 since: None,
1283 status_collection_id,
1284 timeline: Some(timeline.clone()),
1285 };
1286
1287 let collections = vec![(global_id, collection_desc)];
1288 let read_policies = coord
1289 .catalog()
1290 .state()
1291 .source_compaction_windows(vec![table_id]);
1292
1293 (collections, None, read_policies)
1294 }
1295 DataSourceDesc::Webhook { .. } => {
1296 if let Some(url) =
1297 coord.catalog().state().try_get_webhook_url(&table_id)
1298 {
1299 if let Some(ctx) = ctx.as_ref() {
1300 ctx.session().add_notice(
1301 AdapterNotice::WebhookSourceCreated { url },
1302 )
1303 }
1304 }
1305
1306 assert_eq!(
1308 table.desc.latest_version(),
1309 RelationVersion::root(),
1310 "found webhook with more than 1 relation version, {:?}",
1311 table.desc
1312 );
1313 let desc = table.desc.latest();
1314
1315 let collection_desc = CollectionDescription {
1316 desc,
1317 data_source: DataSource::Webhook,
1318 since: None,
1319 status_collection_id: None,
1320 timeline: Some(timeline.clone()),
1321 };
1322 let collections = vec![(global_id, collection_desc)];
1323 let read_policies = coord
1324 .catalog()
1325 .state()
1326 .source_compaction_windows(vec![table_id]);
1327
1328 (collections, None, read_policies)
1329 }
1330 _ => unreachable!("CREATE TABLE data source got {:?}", data_source),
1331 }
1332 }
1333 };
1334
1335 let storage_metadata = coord.catalog.state().storage_metadata();
1337 coord
1338 .controller
1339 .storage
1340 .create_collections(storage_metadata, register_ts, collections)
1341 .await
1342 .unwrap_or_terminate("cannot fail to create collections");
1343
1344 if let Some(register_ts) = register_ts {
1346 coord.apply_local_write(register_ts).await;
1347 }
1348
1349 for (compaction_window, storage_policies) in read_policies {
1351 coord
1352 .initialize_storage_read_policies(storage_policies, compaction_window)
1353 .await;
1354 }
1355 })
1356 })
1357 .await;
1358
1359 match catalog_result {
1360 Ok(()) => Ok(ExecuteResponse::CreatedTable),
1361 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1362 kind:
1363 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1364 })) if if_not_exists => {
1365 ctx.session_mut()
1366 .add_notice(AdapterNotice::ObjectAlreadyExists {
1367 name: name.item,
1368 ty: "table",
1369 });
1370 Ok(ExecuteResponse::CreatedTable)
1371 }
1372 Err(err) => Err(err),
1373 }
1374 }
1375
1376 #[instrument]
1377 pub(super) async fn sequence_create_sink(
1378 &mut self,
1379 ctx: ExecuteContext,
1380 plan: plan::CreateSinkPlan,
1381 resolved_ids: ResolvedIds,
1382 ) {
1383 let plan::CreateSinkPlan {
1384 name,
1385 sink,
1386 with_snapshot,
1387 if_not_exists,
1388 in_cluster,
1389 } = plan;
1390
1391 let id_ts = self.get_catalog_write_ts().await;
1393 let (item_id, global_id) =
1394 return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
1395
1396 let catalog_sink = Sink {
1397 create_sql: sink.create_sql,
1398 global_id,
1399 from: sink.from,
1400 connection: sink.connection,
1401 envelope: sink.envelope,
1402 version: sink.version,
1403 with_snapshot,
1404 resolved_ids,
1405 cluster_id: in_cluster,
1406 };
1407
1408 let ops = vec![catalog::Op::CreateItem {
1409 id: item_id,
1410 name: name.clone(),
1411 item: CatalogItem::Sink(catalog_sink.clone()),
1412 owner_id: *ctx.session().current_role_id(),
1413 }];
1414
1415 let from = self.catalog().get_entry_by_global_id(&catalog_sink.from);
1416 if let Err(e) = self
1417 .controller
1418 .storage
1419 .check_exists(sink.from)
1420 .map_err(|e| match e {
1421 StorageError::IdentifierMissing(_) => AdapterError::Unstructured(anyhow!(
1422 "{} is a {}, which cannot be exported as a sink",
1423 from.name().item.clone(),
1424 from.item().typ().to_string(),
1425 )),
1426 e => AdapterError::Storage(e),
1427 })
1428 {
1429 ctx.retire(Err(e));
1430 return;
1431 }
1432
1433 let result = self.catalog_transact(Some(ctx.session()), ops).await;
1434
1435 match result {
1436 Ok(()) => {}
1437 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1438 kind:
1439 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1440 })) if if_not_exists => {
1441 ctx.session()
1442 .add_notice(AdapterNotice::ObjectAlreadyExists {
1443 name: name.item,
1444 ty: "sink",
1445 });
1446 ctx.retire(Ok(ExecuteResponse::CreatedSink));
1447 return;
1448 }
1449 Err(e) => {
1450 ctx.retire(Err(e));
1451 return;
1452 }
1453 };
1454
1455 self.create_storage_export(global_id, &catalog_sink)
1456 .await
1457 .unwrap_or_terminate("cannot fail to create exports");
1458
1459 self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
1460 .await;
1461
1462 ctx.retire(Ok(ExecuteResponse::CreatedSink))
1463 }
1464
1465 pub(super) fn validate_system_column_references(
1488 &self,
1489 uses_ambiguous_columns: bool,
1490 depends_on: &BTreeSet<GlobalId>,
1491 ) -> Result<(), AdapterError> {
1492 if uses_ambiguous_columns
1493 && depends_on
1494 .iter()
1495 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1496 {
1497 Err(AdapterError::AmbiguousSystemColumnReference)
1498 } else {
1499 Ok(())
1500 }
1501 }
1502
1503 #[instrument]
1504 pub(super) async fn sequence_create_type(
1505 &mut self,
1506 session: &Session,
1507 plan: plan::CreateTypePlan,
1508 resolved_ids: ResolvedIds,
1509 ) -> Result<ExecuteResponse, AdapterError> {
1510 let id_ts = self.get_catalog_write_ts().await;
1511 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
1512 let typ = Type {
1513 create_sql: Some(plan.typ.create_sql),
1514 global_id,
1515 desc: plan.typ.inner.desc(&self.catalog().for_session(session))?,
1516 details: CatalogTypeDetails {
1517 array_id: None,
1518 typ: plan.typ.inner,
1519 pg_metadata: None,
1520 },
1521 resolved_ids,
1522 };
1523 let op = catalog::Op::CreateItem {
1524 id: item_id,
1525 name: plan.name,
1526 item: CatalogItem::Type(typ),
1527 owner_id: *session.current_role_id(),
1528 };
1529 match self.catalog_transact(Some(session), vec![op]).await {
1530 Ok(()) => Ok(ExecuteResponse::CreatedType),
1531 Err(err) => Err(err),
1532 }
1533 }
1534
1535 #[instrument]
1536 pub(super) async fn sequence_comment_on(
1537 &mut self,
1538 session: &Session,
1539 plan: plan::CommentPlan,
1540 ) -> Result<ExecuteResponse, AdapterError> {
1541 let op = catalog::Op::Comment {
1542 object_id: plan.object_id,
1543 sub_component: plan.sub_component,
1544 comment: plan.comment,
1545 };
1546 self.catalog_transact(Some(session), vec![op]).await?;
1547 Ok(ExecuteResponse::Comment)
1548 }
1549
1550 #[instrument]
1551 pub(super) async fn sequence_drop_objects(
1552 &mut self,
1553 session: &Session,
1554 plan::DropObjectsPlan {
1555 drop_ids,
1556 object_type,
1557 referenced_ids,
1558 }: plan::DropObjectsPlan,
1559 ) -> Result<ExecuteResponse, AdapterError> {
1560 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1561 let mut objects = Vec::new();
1562 for obj_id in &drop_ids {
1563 if !referenced_ids_hashset.contains(obj_id) {
1564 let object_info = ErrorMessageObjectDescription::from_id(
1565 obj_id,
1566 &self.catalog().for_session(session),
1567 )
1568 .to_string();
1569 objects.push(object_info);
1570 }
1571 }
1572
1573 if !objects.is_empty() {
1574 session.add_notice(AdapterNotice::CascadeDroppedObject { objects });
1575 }
1576
1577 let DropOps {
1578 ops,
1579 dropped_active_db,
1580 dropped_active_cluster,
1581 dropped_in_use_indexes,
1582 } = self.sequence_drop_common(session, drop_ids)?;
1583
1584 self.catalog_transact(Some(session), ops).await?;
1585
1586 fail::fail_point!("after_sequencer_drop_replica");
1587
1588 if dropped_active_db {
1589 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1590 name: session.vars().database().to_string(),
1591 });
1592 }
1593 if dropped_active_cluster {
1594 session.add_notice(AdapterNotice::DroppedActiveCluster {
1595 name: session.vars().cluster().to_string(),
1596 });
1597 }
1598 for dropped_in_use_index in dropped_in_use_indexes {
1599 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1600 self.metrics
1601 .optimization_notices
1602 .with_label_values(&["DroppedInUseIndex"])
1603 .inc_by(1);
1604 }
1605 Ok(ExecuteResponse::DroppedObject(object_type))
1606 }
1607
1608 fn validate_dropped_role_ownership(
1609 &self,
1610 session: &Session,
1611 dropped_roles: &BTreeMap<RoleId, &str>,
1612 ) -> Result<(), AdapterError> {
1613 fn privilege_check(
1614 privileges: &PrivilegeMap,
1615 dropped_roles: &BTreeMap<RoleId, &str>,
1616 dependent_objects: &mut BTreeMap<String, Vec<String>>,
1617 object_id: &SystemObjectId,
1618 catalog: &ConnCatalog,
1619 ) {
1620 for privilege in privileges.all_values() {
1621 if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
1622 let grantor_name = catalog.get_role(&privilege.grantor).name();
1623 let object_description =
1624 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1625 dependent_objects
1626 .entry(role_name.to_string())
1627 .or_default()
1628 .push(format!(
1629 "privileges on {object_description} granted by {grantor_name}",
1630 ));
1631 }
1632 if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
1633 let grantee_name = catalog.get_role(&privilege.grantee).name();
1634 let object_description =
1635 ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
1636 dependent_objects
1637 .entry(role_name.to_string())
1638 .or_default()
1639 .push(format!(
1640 "privileges granted on {object_description} to {grantee_name}"
1641 ));
1642 }
1643 }
1644 }
1645
1646 let catalog = self.catalog().for_session(session);
1647 let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
1648 for entry in self.catalog.entries() {
1649 let id = SystemObjectId::Object(entry.id().into());
1650 if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
1651 let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
1652 dependent_objects
1653 .entry(role_name.to_string())
1654 .or_default()
1655 .push(format!("owner of {object_description}"));
1656 }
1657 privilege_check(
1658 entry.privileges(),
1659 dropped_roles,
1660 &mut dependent_objects,
1661 &id,
1662 &catalog,
1663 );
1664 }
1665 for database in self.catalog.databases() {
1666 let database_id = SystemObjectId::Object(database.id().into());
1667 if let Some(role_name) = dropped_roles.get(&database.owner_id) {
1668 let object_description =
1669 ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
1670 dependent_objects
1671 .entry(role_name.to_string())
1672 .or_default()
1673 .push(format!("owner of {object_description}"));
1674 }
1675 privilege_check(
1676 &database.privileges,
1677 dropped_roles,
1678 &mut dependent_objects,
1679 &database_id,
1680 &catalog,
1681 );
1682 for schema in database.schemas_by_id.values() {
1683 let schema_id = SystemObjectId::Object(
1684 (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
1685 );
1686 if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
1687 let object_description =
1688 ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
1689 dependent_objects
1690 .entry(role_name.to_string())
1691 .or_default()
1692 .push(format!("owner of {object_description}"));
1693 }
1694 privilege_check(
1695 &schema.privileges,
1696 dropped_roles,
1697 &mut dependent_objects,
1698 &schema_id,
1699 &catalog,
1700 );
1701 }
1702 }
1703 for cluster in self.catalog.clusters() {
1704 let cluster_id = SystemObjectId::Object(cluster.id().into());
1705 if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
1706 let object_description =
1707 ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
1708 dependent_objects
1709 .entry(role_name.to_string())
1710 .or_default()
1711 .push(format!("owner of {object_description}"));
1712 }
1713 privilege_check(
1714 &cluster.privileges,
1715 dropped_roles,
1716 &mut dependent_objects,
1717 &cluster_id,
1718 &catalog,
1719 );
1720 for replica in cluster.replicas() {
1721 if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
1722 let replica_id =
1723 SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
1724 let object_description =
1725 ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
1726 dependent_objects
1727 .entry(role_name.to_string())
1728 .or_default()
1729 .push(format!("owner of {object_description}"));
1730 }
1731 }
1732 }
1733 privilege_check(
1734 self.catalog().system_privileges(),
1735 dropped_roles,
1736 &mut dependent_objects,
1737 &SystemObjectId::System,
1738 &catalog,
1739 );
1740 for (default_privilege_object, default_privilege_acl_items) in
1741 self.catalog.default_privileges()
1742 {
1743 if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
1744 dependent_objects
1745 .entry(role_name.to_string())
1746 .or_default()
1747 .push(format!(
1748 "default privileges on {}S created by {}",
1749 default_privilege_object.object_type, role_name
1750 ));
1751 }
1752 for default_privilege_acl_item in default_privilege_acl_items {
1753 if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
1754 dependent_objects
1755 .entry(role_name.to_string())
1756 .or_default()
1757 .push(format!(
1758 "default privileges on {}S granted to {}",
1759 default_privilege_object.object_type, role_name
1760 ));
1761 }
1762 }
1763 }
1764
1765 if !dependent_objects.is_empty() {
1766 Err(AdapterError::DependentObject(dependent_objects))
1767 } else {
1768 Ok(())
1769 }
1770 }
1771
1772 #[instrument]
1773 pub(super) async fn sequence_drop_owned(
1774 &mut self,
1775 session: &Session,
1776 plan: plan::DropOwnedPlan,
1777 ) -> Result<ExecuteResponse, AdapterError> {
1778 for role_id in &plan.role_ids {
1779 self.catalog().ensure_not_reserved_role(role_id)?;
1780 }
1781
1782 let mut privilege_revokes = plan.privilege_revokes;
1783
1784 let session_catalog = self.catalog().for_session(session);
1786 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1787 && !session.is_superuser()
1788 {
1789 let role_membership =
1791 session_catalog.collect_role_membership(session.current_role_id());
1792 let invalid_revokes: BTreeSet<_> = privilege_revokes
1793 .extract_if(.., |(_, privilege)| {
1794 !role_membership.contains(&privilege.grantor)
1795 })
1796 .map(|(object_id, _)| object_id)
1797 .collect();
1798 for invalid_revoke in invalid_revokes {
1799 let object_description =
1800 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1801 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1802 }
1803 }
1804
1805 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1806 catalog::Op::UpdatePrivilege {
1807 target_id: object_id,
1808 privilege,
1809 variant: UpdatePrivilegeVariant::Revoke,
1810 }
1811 });
1812 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1813 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1814 privilege_object,
1815 privilege_acl_item,
1816 variant: UpdatePrivilegeVariant::Revoke,
1817 },
1818 );
1819 let DropOps {
1820 ops: drop_ops,
1821 dropped_active_db,
1822 dropped_active_cluster,
1823 dropped_in_use_indexes,
1824 } = self.sequence_drop_common(session, plan.drop_ids)?;
1825
1826 let ops = privilege_revoke_ops
1827 .chain(default_privilege_revoke_ops)
1828 .chain(drop_ops.into_iter())
1829 .collect();
1830
1831 self.catalog_transact(Some(session), ops).await?;
1832
1833 if dropped_active_db {
1834 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1835 name: session.vars().database().to_string(),
1836 });
1837 }
1838 if dropped_active_cluster {
1839 session.add_notice(AdapterNotice::DroppedActiveCluster {
1840 name: session.vars().cluster().to_string(),
1841 });
1842 }
1843 for dropped_in_use_index in dropped_in_use_indexes {
1844 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1845 }
1846 Ok(ExecuteResponse::DroppedOwned)
1847 }
1848
1849 fn sequence_drop_common(
1850 &self,
1851 session: &Session,
1852 ids: Vec<ObjectId>,
1853 ) -> Result<DropOps, AdapterError> {
1854 let mut dropped_active_db = false;
1855 let mut dropped_active_cluster = false;
1856 let mut dropped_in_use_indexes = Vec::new();
1857 let mut dropped_roles = BTreeMap::new();
1858 let mut dropped_databases = BTreeSet::new();
1859 let mut dropped_schemas = BTreeSet::new();
1860 let mut role_revokes = BTreeSet::new();
1864 let mut default_privilege_revokes = BTreeSet::new();
1867
1868 let mut clusters_to_drop = BTreeSet::new();
1870
1871 let ids_set = ids.iter().collect::<BTreeSet<_>>();
1872 for id in &ids {
1873 match id {
1874 ObjectId::Database(id) => {
1875 let name = self.catalog().get_database(id).name();
1876 if name == session.vars().database() {
1877 dropped_active_db = true;
1878 }
1879 dropped_databases.insert(id);
1880 }
1881 ObjectId::Schema((_, spec)) => {
1882 if let SchemaSpecifier::Id(id) = spec {
1883 dropped_schemas.insert(id);
1884 }
1885 }
1886 ObjectId::Cluster(id) => {
1887 clusters_to_drop.insert(*id);
1888 if let Some(active_id) = self
1889 .catalog()
1890 .active_cluster(session)
1891 .ok()
1892 .map(|cluster| cluster.id())
1893 {
1894 if id == &active_id {
1895 dropped_active_cluster = true;
1896 }
1897 }
1898 }
1899 ObjectId::Role(id) => {
1900 let role = self.catalog().get_role(id);
1901 let name = role.name();
1902 dropped_roles.insert(*id, name);
1903 for (group_id, grantor_id) in &role.membership.map {
1905 role_revokes.insert((*group_id, *id, *grantor_id));
1906 }
1907 }
1908 ObjectId::Item(id) => {
1909 if let Some(index) = self.catalog().get_entry(id).index() {
1910 let humanizer = self.catalog().for_session(session);
1911 let dependants = self
1912 .controller
1913 .compute
1914 .collection_reverse_dependencies(index.cluster_id, index.global_id())
1915 .ok()
1916 .into_iter()
1917 .flatten()
1918 .filter(|dependant_id| {
1919 if dependant_id.is_transient() {
1926 return false;
1927 }
1928 let Some(dependent_id) = humanizer
1930 .try_get_item_by_global_id(dependant_id)
1931 .map(|item| item.id())
1932 else {
1933 return false;
1934 };
1935 !ids_set.contains(&ObjectId::Item(dependent_id))
1938 })
1939 .flat_map(|dependant_id| {
1940 humanizer.humanize_id(dependant_id)
1944 })
1945 .collect_vec();
1946 if !dependants.is_empty() {
1947 dropped_in_use_indexes.push(DroppedInUseIndex {
1948 index_name: humanizer
1949 .humanize_id(index.global_id())
1950 .unwrap_or_else(|| id.to_string()),
1951 dependant_objects: dependants,
1952 });
1953 }
1954 }
1955 }
1956 _ => {}
1957 }
1958 }
1959
1960 for id in &ids {
1961 match id {
1962 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1966 if !clusters_to_drop.contains(cluster_id) {
1967 let cluster = self.catalog.get_cluster(*cluster_id);
1968 if cluster.is_managed() {
1969 let replica =
1970 cluster.replica(*replica_id).expect("Catalog out of sync");
1971 if !replica.config.location.internal() {
1972 coord_bail!("cannot drop replica of managed cluster");
1973 }
1974 }
1975 }
1976 }
1977 _ => {}
1978 }
1979 }
1980
1981 for role_id in dropped_roles.keys() {
1982 self.catalog().ensure_not_reserved_role(role_id)?;
1983 }
1984 self.validate_dropped_role_ownership(session, &dropped_roles)?;
1985 let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
1987 for role in self.catalog().user_roles() {
1988 for dropped_role_id in
1989 dropped_role_ids.intersection(&role.membership.map.keys().collect())
1990 {
1991 role_revokes.insert((
1992 **dropped_role_id,
1993 role.id(),
1994 *role
1995 .membership
1996 .map
1997 .get(*dropped_role_id)
1998 .expect("included in keys above"),
1999 ));
2000 }
2001 }
2002
2003 for (default_privilege_object, default_privilege_acls) in
2004 self.catalog().default_privileges()
2005 {
2006 if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
2007 || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
2008 {
2009 for default_privilege_acl in default_privilege_acls {
2010 default_privilege_revokes.insert((
2011 default_privilege_object.clone(),
2012 default_privilege_acl.clone(),
2013 ));
2014 }
2015 }
2016 }
2017
2018 let ops = role_revokes
2019 .into_iter()
2020 .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
2021 role_id,
2022 member_id,
2023 grantor_id,
2024 })
2025 .chain(default_privilege_revokes.into_iter().map(
2026 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
2027 privilege_object,
2028 privilege_acl_item,
2029 variant: UpdatePrivilegeVariant::Revoke,
2030 },
2031 ))
2032 .chain(iter::once(catalog::Op::DropObjects(
2033 ids.into_iter()
2034 .map(DropObjectInfo::manual_drop_from_object_id)
2035 .collect(),
2036 )))
2037 .collect();
2038
2039 Ok(DropOps {
2040 ops,
2041 dropped_active_db,
2042 dropped_active_cluster,
2043 dropped_in_use_indexes,
2044 })
2045 }
2046
2047 pub(super) fn sequence_explain_schema(
2048 &self,
2049 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
2050 ) -> Result<ExecuteResponse, AdapterError> {
2051 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
2052 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
2053 })?;
2054
2055 let json_string = json_string(&json_value);
2056 let row = Row::pack_slice(&[Datum::String(&json_string)]);
2057 Ok(Self::send_immediate_rows(row))
2058 }
2059
2060 pub(super) fn sequence_show_all_variables(
2061 &self,
2062 session: &Session,
2063 ) -> Result<ExecuteResponse, AdapterError> {
2064 let mut rows = viewable_variables(self.catalog().state(), session)
2065 .map(|v| (v.name(), v.value(), v.description()))
2066 .collect::<Vec<_>>();
2067 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
2068
2069 let rows: Vec<_> = rows
2071 .into_iter()
2072 .map(|(name, val, desc)| {
2073 Row::pack_slice(&[
2074 Datum::String(name),
2075 Datum::String(&val),
2076 Datum::String(desc),
2077 ])
2078 })
2079 .collect();
2080 Ok(Self::send_immediate_rows(rows))
2081 }
2082
2083 pub(super) fn sequence_show_variable(
2084 &self,
2085 session: &Session,
2086 plan: plan::ShowVariablePlan,
2087 ) -> Result<ExecuteResponse, AdapterError> {
2088 if &plan.name == SCHEMA_ALIAS {
2089 let schemas = self.catalog.resolve_search_path(session);
2090 let schema = schemas.first();
2091 return match schema {
2092 Some((database_spec, schema_spec)) => {
2093 let schema_name = &self
2094 .catalog
2095 .get_schema(database_spec, schema_spec, session.conn_id())
2096 .name()
2097 .schema;
2098 let row = Row::pack_slice(&[Datum::String(schema_name)]);
2099 Ok(Self::send_immediate_rows(row))
2100 }
2101 None => {
2102 if session.vars().current_object_missing_warnings() {
2103 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
2104 search_path: session
2105 .vars()
2106 .search_path()
2107 .into_iter()
2108 .map(|schema| schema.to_string())
2109 .collect(),
2110 });
2111 }
2112 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
2113 }
2114 };
2115 }
2116
2117 let variable = session
2118 .vars()
2119 .get(self.catalog().system_config(), &plan.name)
2120 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
2121
2122 variable.visible(session.user(), self.catalog().system_config())?;
2125
2126 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
2127 if variable.name() == vars::DATABASE.name()
2128 && matches!(
2129 self.catalog().resolve_database(&variable.value()),
2130 Err(CatalogError::UnknownDatabase(_))
2131 )
2132 && session.vars().current_object_missing_warnings()
2133 {
2134 let name = variable.value();
2135 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2136 } else if variable.name() == vars::CLUSTER.name()
2137 && matches!(
2138 self.catalog().resolve_cluster(&variable.value()),
2139 Err(CatalogError::UnknownCluster(_))
2140 )
2141 && session.vars().current_object_missing_warnings()
2142 {
2143 let name = variable.value();
2144 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2145 }
2146 Ok(Self::send_immediate_rows(row))
2147 }
2148
2149 #[instrument]
2150 pub(super) async fn sequence_inspect_shard(
2151 &self,
2152 session: &Session,
2153 plan: plan::InspectShardPlan,
2154 ) -> Result<ExecuteResponse, AdapterError> {
2155 if !session.user().is_internal() {
2158 return Err(AdapterError::Unauthorized(
2159 rbac::UnauthorizedError::MzSystem {
2160 action: "inspect".into(),
2161 },
2162 ));
2163 }
2164 let state = self
2165 .controller
2166 .storage
2167 .inspect_persist_state(plan.id)
2168 .await?;
2169 let jsonb = Jsonb::from_serde_json(state)?;
2170 Ok(Self::send_immediate_rows(jsonb.into_row()))
2171 }
2172
2173 #[instrument]
2174 pub(super) fn sequence_set_variable(
2175 &self,
2176 session: &mut Session,
2177 plan: plan::SetVariablePlan,
2178 ) -> Result<ExecuteResponse, AdapterError> {
2179 let (name, local) = (plan.name, plan.local);
2180 if &name == TRANSACTION_ISOLATION_VAR_NAME {
2181 self.validate_set_isolation_level(session)?;
2182 }
2183 if &name == vars::CLUSTER.name() {
2184 self.validate_set_cluster(session)?;
2185 }
2186
2187 let vars = session.vars_mut();
2188 let values = match plan.value {
2189 plan::VariableValue::Default => None,
2190 plan::VariableValue::Values(values) => Some(values),
2191 };
2192
2193 match values {
2194 Some(values) => {
2195 vars.set(
2196 self.catalog().system_config(),
2197 &name,
2198 VarInput::SqlSet(&values),
2199 local,
2200 )?;
2201
2202 let vars = session.vars();
2203
2204 if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
2207 session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
2208 } else if name == vars::CLUSTER.name()
2209 && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
2210 {
2211 session.add_notice(AdapterNotice::IntrospectionClusterUsage);
2212 }
2213
2214 if name.as_str() == vars::DATABASE.name()
2216 && matches!(
2217 self.catalog().resolve_database(vars.database()),
2218 Err(CatalogError::UnknownDatabase(_))
2219 )
2220 && session.vars().current_object_missing_warnings()
2221 {
2222 let name = vars.database().to_string();
2223 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2224 } else if name.as_str() == vars::CLUSTER.name()
2225 && matches!(
2226 self.catalog().resolve_cluster(vars.cluster()),
2227 Err(CatalogError::UnknownCluster(_))
2228 )
2229 && session.vars().current_object_missing_warnings()
2230 {
2231 let name = vars.cluster().to_string();
2232 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2233 } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
2234 let v = values.into_first().to_lowercase();
2235 if v == IsolationLevel::ReadUncommitted.as_str()
2236 || v == IsolationLevel::ReadCommitted.as_str()
2237 || v == IsolationLevel::RepeatableRead.as_str()
2238 {
2239 session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
2240 isolation_level: v,
2241 });
2242 } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
2243 session.add_notice(AdapterNotice::StrongSessionSerializable);
2244 }
2245 }
2246 }
2247 None => vars.reset(self.catalog().system_config(), &name, local)?,
2248 }
2249
2250 Ok(ExecuteResponse::SetVariable { name, reset: false })
2251 }
2252
2253 pub(super) fn sequence_reset_variable(
2254 &self,
2255 session: &mut Session,
2256 plan: plan::ResetVariablePlan,
2257 ) -> Result<ExecuteResponse, AdapterError> {
2258 let name = plan.name;
2259 if &name == TRANSACTION_ISOLATION_VAR_NAME {
2260 self.validate_set_isolation_level(session)?;
2261 }
2262 if &name == vars::CLUSTER.name() {
2263 self.validate_set_cluster(session)?;
2264 }
2265 session
2266 .vars_mut()
2267 .reset(self.catalog().system_config(), &name, false)?;
2268 Ok(ExecuteResponse::SetVariable { name, reset: true })
2269 }
2270
2271 pub(super) fn sequence_set_transaction(
2272 &self,
2273 session: &mut Session,
2274 plan: plan::SetTransactionPlan,
2275 ) -> Result<ExecuteResponse, AdapterError> {
2276 for mode in plan.modes {
2278 match mode {
2279 TransactionMode::AccessMode(_) => {
2280 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
2281 }
2282 TransactionMode::IsolationLevel(isolation_level) => {
2283 self.validate_set_isolation_level(session)?;
2284
2285 session.vars_mut().set(
2286 self.catalog().system_config(),
2287 TRANSACTION_ISOLATION_VAR_NAME,
2288 VarInput::Flat(&isolation_level.to_ast_string_stable()),
2289 plan.local,
2290 )?
2291 }
2292 }
2293 }
2294 Ok(ExecuteResponse::SetVariable {
2295 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2296 reset: false,
2297 })
2298 }
2299
2300 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2301 if session.transaction().contains_ops() {
2302 Err(AdapterError::InvalidSetIsolationLevel)
2303 } else {
2304 Ok(())
2305 }
2306 }
2307
2308 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2309 if session.transaction().contains_ops() {
2310 Err(AdapterError::InvalidSetCluster)
2311 } else {
2312 Ok(())
2313 }
2314 }
2315
2316 #[instrument]
2317 pub(super) async fn sequence_end_transaction(
2318 &mut self,
2319 mut ctx: ExecuteContext,
2320 mut action: EndTransactionAction,
2321 ) {
2322 if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
2324 (&action, ctx.session().transaction())
2325 {
2326 action = EndTransactionAction::Rollback;
2327 }
2328 let response = match action {
2329 EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
2330 params: BTreeMap::new(),
2331 }),
2332 EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
2333 params: BTreeMap::new(),
2334 }),
2335 };
2336
2337 let result = self.sequence_end_transaction_inner(&mut ctx, action).await;
2338
2339 let (response, action) = match result {
2340 Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
2341 (response, action)
2342 }
2343 Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
2344 let validated_locks = match write_lock_guards {
2348 None => None,
2349 Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
2350 Ok(locks) => Some(locks),
2351 Err(missing) => {
2352 tracing::error!(?missing, "programming error, missing write locks");
2353 return ctx.retire(Err(AdapterError::WrongSetOfLocks));
2354 }
2355 },
2356 };
2357
2358 let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
2359 for WriteOp { id, rows } in writes {
2360 let total_rows = collected_writes.entry(id).or_default();
2361 total_rows.push(rows);
2362 }
2363
2364 self.submit_write(PendingWriteTxn::User {
2365 span: Span::current(),
2366 writes: collected_writes,
2367 write_locks: validated_locks,
2368 pending_txn: PendingTxn {
2369 ctx,
2370 response,
2371 action,
2372 },
2373 });
2374 return;
2375 }
2376 Ok((
2377 Some(TransactionOps::Peeks {
2378 determination,
2379 requires_linearization: RequireLinearization::Required,
2380 ..
2381 }),
2382 _,
2383 )) if ctx.session().vars().transaction_isolation()
2384 == &IsolationLevel::StrictSerializable =>
2385 {
2386 let conn_id = ctx.session().conn_id().clone();
2387 let pending_read_txn = PendingReadTxn {
2388 txn: PendingRead::Read {
2389 txn: PendingTxn {
2390 ctx,
2391 response,
2392 action,
2393 },
2394 },
2395 timestamp_context: determination.timestamp_context,
2396 created: Instant::now(),
2397 num_requeues: 0,
2398 otel_ctx: OpenTelemetryContext::obtain(),
2399 };
2400 self.strict_serializable_reads_tx
2401 .send((conn_id, pending_read_txn))
2402 .expect("sending to strict_serializable_reads_tx cannot fail");
2403 return;
2404 }
2405 Ok((
2406 Some(TransactionOps::Peeks {
2407 determination,
2408 requires_linearization: RequireLinearization::Required,
2409 ..
2410 }),
2411 _,
2412 )) if ctx.session().vars().transaction_isolation()
2413 == &IsolationLevel::StrongSessionSerializable =>
2414 {
2415 if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
2416 ctx.session_mut()
2417 .ensure_timestamp_oracle(timeline.clone())
2418 .apply_write(*ts);
2419 }
2420 (response, action)
2421 }
2422 Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
2423 self.internal_cmd_tx
2424 .send(Message::ExecuteSingleStatementTransaction {
2425 ctx,
2426 otel_ctx: OpenTelemetryContext::obtain(),
2427 stmt,
2428 params,
2429 })
2430 .expect("must send");
2431 return;
2432 }
2433 Ok((_, _)) => (response, action),
2434 Err(err) => (Err(err), EndTransactionAction::Rollback),
2435 };
2436 let changed = ctx.session_mut().vars_mut().end_transaction(action);
2437 let response = response.map(|mut r| {
2439 r.extend_params(changed);
2440 ExecuteResponse::from(r)
2441 });
2442
2443 ctx.retire(response);
2444 }
2445
2446 #[instrument]
2447 async fn sequence_end_transaction_inner(
2448 &mut self,
2449 ctx: &mut ExecuteContext,
2450 action: EndTransactionAction,
2451 ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2452 let txn = self.clear_transaction(ctx.session_mut()).await;
2453
2454 if let EndTransactionAction::Commit = action {
2455 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2456 match &mut ops {
2457 TransactionOps::Writes(writes) => {
2458 for WriteOp { id, .. } in &mut writes.iter() {
2459 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2461 AdapterError::Catalog(mz_catalog::memory::error::Error {
2462 kind: mz_catalog::memory::error::ErrorKind::Sql(
2463 CatalogError::UnknownItem(id.to_string()),
2464 ),
2465 })
2466 })?;
2467 }
2468
2469 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2471 }
2472 TransactionOps::DDL {
2473 ops,
2474 state: _,
2475 side_effects,
2476 revision,
2477 } => {
2478 if *revision != self.catalog().transient_revision() {
2480 return Err(AdapterError::DDLTransactionRace);
2481 }
2482 let ops = std::mem::take(ops);
2484 let side_effects = std::mem::take(side_effects);
2485 self.catalog_transact_with_side_effects(
2486 Some(ctx),
2487 ops,
2488 move |a, mut ctx| {
2489 Box::pin(async move {
2490 for side_effect in side_effects {
2491 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2492 }
2493 })
2494 },
2495 )
2496 .await?;
2497 }
2498 _ => (),
2499 }
2500 return Ok((Some(ops), write_lock_guards));
2501 }
2502 }
2503
2504 Ok((None, None))
2505 }
2506
2507 pub(super) async fn sequence_side_effecting_func(
2508 &mut self,
2509 ctx: ExecuteContext,
2510 plan: SideEffectingFunc,
2511 ) {
2512 match plan {
2513 SideEffectingFunc::PgCancelBackend { connection_id } => {
2514 if ctx.session().conn_id().unhandled() == connection_id {
2515 ctx.retire(Err(AdapterError::Canceled));
2519 return;
2520 }
2521
2522 let res = if let Some((id_handle, _conn_meta)) =
2523 self.active_conns.get_key_value(&connection_id)
2524 {
2525 self.handle_privileged_cancel(id_handle.clone()).await;
2527 Datum::True
2528 } else {
2529 Datum::False
2530 };
2531 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2532 }
2533 }
2534 }
2535
2536 pub(super) async fn determine_real_time_recent_timestamp(
2539 &self,
2540 session: &Session,
2541 source_ids: impl Iterator<Item = CatalogItemId>,
2542 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2543 {
2544 let vars = session.vars();
2545
2546 let r = if vars.real_time_recency()
2551 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2552 && !session.contains_read_timestamp()
2553 {
2554 let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
2560 if to_visit.is_empty() {
2563 return Ok(None);
2564 }
2565
2566 let mut timestamp_objects = BTreeSet::new();
2567
2568 while let Some(id) = to_visit.pop_front() {
2569 timestamp_objects.insert(id);
2570 to_visit.extend(
2571 self.catalog()
2572 .get_entry(&id)
2573 .uses()
2574 .into_iter()
2575 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2576 );
2577 }
2578 let timestamp_objects = timestamp_objects
2579 .into_iter()
2580 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2581 .collect();
2582
2583 let r = self
2584 .controller
2585 .determine_real_time_recent_timestamp(
2586 timestamp_objects,
2587 *vars.real_time_recency_timeout(),
2588 )
2589 .await?;
2590
2591 Some(r)
2592 } else {
2593 None
2594 };
2595
2596 Ok(r)
2597 }
2598
2599 #[instrument]
2600 pub(super) async fn sequence_explain_plan(
2601 &mut self,
2602 ctx: ExecuteContext,
2603 plan: plan::ExplainPlanPlan,
2604 target_cluster: TargetCluster,
2605 ) {
2606 match &plan.explainee {
2607 plan::Explainee::Statement(stmt) => match stmt {
2608 plan::ExplaineeStatement::CreateView { .. } => {
2609 self.explain_create_view(ctx, plan).await;
2610 }
2611 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2612 self.explain_create_materialized_view(ctx, plan).await;
2613 }
2614 plan::ExplaineeStatement::CreateIndex { .. } => {
2615 self.explain_create_index(ctx, plan).await;
2616 }
2617 plan::ExplaineeStatement::Select { .. } => {
2618 self.explain_peek(ctx, plan, target_cluster).await;
2619 }
2620 },
2621 plan::Explainee::View(_) => {
2622 let result = self.explain_view(&ctx, plan);
2623 ctx.retire(result);
2624 }
2625 plan::Explainee::MaterializedView(_) => {
2626 let result = self.explain_materialized_view(&ctx, plan);
2627 ctx.retire(result);
2628 }
2629 plan::Explainee::Index(_) => {
2630 let result = self.explain_index(&ctx, plan);
2631 ctx.retire(result);
2632 }
2633 plan::Explainee::ReplanView(_) => {
2634 self.explain_replan_view(ctx, plan).await;
2635 }
2636 plan::Explainee::ReplanMaterializedView(_) => {
2637 self.explain_replan_materialized_view(ctx, plan).await;
2638 }
2639 plan::Explainee::ReplanIndex(_) => {
2640 self.explain_replan_index(ctx, plan).await;
2641 }
2642 };
2643 }
2644
2645 pub(super) async fn sequence_explain_pushdown(
2646 &mut self,
2647 ctx: ExecuteContext,
2648 plan: plan::ExplainPushdownPlan,
2649 target_cluster: TargetCluster,
2650 ) {
2651 match plan.explainee {
2652 Explainee::Statement(ExplaineeStatement::Select {
2653 broken: false,
2654 plan,
2655 desc: _,
2656 }) => {
2657 let stage = return_if_err!(
2658 self.peek_validate(
2659 ctx.session(),
2660 plan,
2661 target_cluster,
2662 None,
2663 ExplainContext::Pushdown,
2664 Some(ctx.session().vars().max_query_result_size()),
2665 ),
2666 ctx
2667 );
2668 self.sequence_staged(ctx, Span::current(), stage).await;
2669 }
2670 Explainee::MaterializedView(item_id) => {
2671 self.explain_pushdown_materialized_view(ctx, item_id).await;
2672 }
2673 _ => {
2674 ctx.retire(Err(AdapterError::Unsupported(
2675 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2676 )));
2677 }
2678 };
2679 }
2680
2681 async fn render_explain_pushdown(
2682 &self,
2683 ctx: ExecuteContext,
2684 as_of: Antichain<Timestamp>,
2685 mz_now: ResultSpec<'static>,
2686 read_holds: Option<ReadHolds<Timestamp>>,
2687 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2688 ) {
2689 let fut = self
2690 .render_explain_pushdown_prepare(ctx.session(), as_of, mz_now, imports)
2691 .await;
2692 task::spawn(|| "render explain pushdown", async move {
2693 let _read_holds = read_holds;
2695 let res = fut.await;
2696 ctx.retire(res);
2697 });
2698 }
2699
2700 async fn render_explain_pushdown_prepare<
2701 I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
2702 >(
2703 &self,
2704 session: &Session,
2705 as_of: Antichain<Timestamp>,
2706 mz_now: ResultSpec<'static>,
2707 imports: I,
2708 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2709 let explain_timeout = *session.vars().statement_timeout();
2710 let mut futures = FuturesOrdered::new();
2711 for (id, mfp) in imports {
2712 let catalog_entry = self.catalog.get_entry_by_global_id(&id);
2713 let full_name = self
2714 .catalog
2715 .for_session(session)
2716 .resolve_full_name(&catalog_entry.name);
2717 let name = format!("{}", full_name);
2718 let relation_desc = catalog_entry
2719 .desc_opt()
2720 .expect("source should have a proper desc")
2721 .into_owned();
2722 let stats_future = self
2723 .controller
2724 .storage_collections
2725 .snapshot_parts_stats(id, as_of.clone())
2726 .await;
2727
2728 let mz_now = mz_now.clone();
2729 futures.push_back(async move {
2732 let snapshot_stats = match stats_future.await {
2733 Ok(stats) => stats,
2734 Err(e) => return Err(e),
2735 };
2736 let mut total_bytes = 0;
2737 let mut total_parts = 0;
2738 let mut selected_bytes = 0;
2739 let mut selected_parts = 0;
2740 for SnapshotPartStats {
2741 encoded_size_bytes: bytes,
2742 stats,
2743 } in &snapshot_stats.parts
2744 {
2745 let bytes = u64::cast_from(*bytes);
2746 total_bytes += bytes;
2747 total_parts += 1u64;
2748 let selected = match stats {
2749 None => true,
2750 Some(stats) => {
2751 let stats = stats.decode();
2752 let stats = RelationPartStats::new(
2753 name.as_str(),
2754 &snapshot_stats.metrics.pushdown.part_stats,
2755 &relation_desc,
2756 &stats,
2757 );
2758 stats.may_match_mfp(mz_now.clone(), &mfp)
2759 }
2760 };
2761
2762 if selected {
2763 selected_bytes += bytes;
2764 selected_parts += 1u64;
2765 }
2766 }
2767 Ok(Row::pack_slice(&[
2768 name.as_str().into(),
2769 total_bytes.into(),
2770 selected_bytes.into(),
2771 total_parts.into(),
2772 selected_parts.into(),
2773 ]))
2774 });
2775 }
2776
2777 let fut = async move {
2778 match tokio::time::timeout(
2779 explain_timeout,
2780 futures::TryStreamExt::try_collect::<Vec<_>>(futures),
2781 )
2782 .await
2783 {
2784 Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
2785 rows: Box::new(rows.into_row_iter()),
2786 }),
2787 Ok(Err(err)) => Err(err.into()),
2788 Err(_) => Err(AdapterError::StatementTimeout),
2789 }
2790 };
2791 fut
2792 }
2793
2794 #[instrument]
2795 pub(super) async fn sequence_insert(
2796 &mut self,
2797 mut ctx: ExecuteContext,
2798 plan: plan::InsertPlan,
2799 ) {
2800 if !ctx.session_mut().transaction().allows_writes() {
2808 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2809 return;
2810 }
2811
2812 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2826 let expr = return_if_err!(
2829 plan.values
2830 .clone()
2831 .lower(self.catalog().system_config(), None),
2832 ctx
2833 );
2834 OptimizedMirRelationExpr(expr)
2835 } else {
2836 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2838
2839 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2841
2842 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2844 };
2845
2846 match optimized_mir.into_inner() {
2847 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2848 let catalog = self.owned_catalog();
2849 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2850 let result =
2851 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2852 ctx.retire(result);
2853 });
2854 }
2855 _ => {
2857 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2858 Some(table) => {
2859 let fullname = self
2860 .catalog()
2861 .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2862 table
2864 .desc_latest(&fullname)
2865 .expect("desc called on table")
2866 .arity()
2867 }
2868 None => {
2869 ctx.retire(Err(AdapterError::Catalog(
2870 mz_catalog::memory::error::Error {
2871 kind: mz_catalog::memory::error::ErrorKind::Sql(
2872 CatalogError::UnknownItem(plan.id.to_string()),
2873 ),
2874 },
2875 )));
2876 return;
2877 }
2878 };
2879
2880 let finishing = RowSetFinishing {
2881 order_by: vec![],
2882 limit: None,
2883 offset: 0,
2884 project: (0..desc_arity).collect(),
2885 };
2886
2887 let read_then_write_plan = plan::ReadThenWritePlan {
2888 id: plan.id,
2889 selection: plan.values,
2890 finishing,
2891 assignments: BTreeMap::new(),
2892 kind: MutationKind::Insert,
2893 returning: plan.returning,
2894 };
2895
2896 self.sequence_read_then_write(ctx, read_then_write_plan)
2897 .await;
2898 }
2899 }
2900 }
2901
2902 #[instrument]
2907 pub(super) async fn sequence_read_then_write(
2908 &mut self,
2909 mut ctx: ExecuteContext,
2910 plan: plan::ReadThenWritePlan,
2911 ) {
2912 let mut source_ids: BTreeSet<_> = plan
2913 .selection
2914 .depends_on()
2915 .into_iter()
2916 .map(|gid| self.catalog().resolve_item_id(&gid))
2917 .collect();
2918 source_ids.insert(plan.id);
2919
2920 if ctx.session().transaction().write_locks().is_none() {
2922 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2924
2925 for id in &source_ids {
2927 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2928 write_locks.insert_lock(*id, lock);
2929 }
2930 }
2931
2932 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2934 Ok(locks) => locks,
2935 Err(missing) => {
2936 let role_metadata = ctx.session().role_metadata().clone();
2938 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2939 let plan = DeferredPlan {
2940 ctx,
2941 plan: Plan::ReadThenWrite(plan),
2942 validity: PlanValidity::new(
2943 self.catalog.transient_revision(),
2944 source_ids.clone(),
2945 None,
2946 None,
2947 role_metadata,
2948 ),
2949 requires_locks: source_ids,
2950 };
2951 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2952 }
2953 };
2954
2955 ctx.session_mut()
2956 .try_grant_write_locks(write_locks)
2957 .expect("session has already been granted write locks");
2958 }
2959
2960 let plan::ReadThenWritePlan {
2961 id,
2962 kind,
2963 selection,
2964 mut assignments,
2965 finishing,
2966 mut returning,
2967 } = plan;
2968
2969 let desc = match self.catalog().try_get_entry(&id) {
2971 Some(table) => {
2972 let full_name = self
2973 .catalog()
2974 .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2975 table
2977 .desc_latest(&full_name)
2978 .expect("desc called on table")
2979 .into_owned()
2980 }
2981 None => {
2982 ctx.retire(Err(AdapterError::Catalog(
2983 mz_catalog::memory::error::Error {
2984 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2985 id.to_string(),
2986 )),
2987 },
2988 )));
2989 return;
2990 }
2991 };
2992
2993 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2995 || assignments.values().any(|e| e.contains_temporal())
2996 || returning.iter().any(|e| e.contains_temporal());
2997 if contains_temporal {
2998 ctx.retire(Err(AdapterError::Unsupported(
2999 "calls to mz_now in write statements",
3000 )));
3001 return;
3002 }
3003
3004 fn validate_read_dependencies(
3012 catalog: &Catalog,
3013 id: &CatalogItemId,
3014 ) -> Result<(), AdapterError> {
3015 use CatalogItemType::*;
3016 use mz_catalog::memory::objects;
3017 let mut ids_to_check = Vec::new();
3018 let valid = match catalog.try_get_entry(id) {
3019 Some(entry) => {
3020 if let CatalogItem::View(objects::View { optimized_expr, .. })
3021 | CatalogItem::MaterializedView(objects::MaterializedView {
3022 optimized_expr,
3023 ..
3024 }) = entry.item()
3025 {
3026 if optimized_expr.contains_temporal() {
3027 return Err(AdapterError::Unsupported(
3028 "calls to mz_now in write statements",
3029 ));
3030 }
3031 }
3032 match entry.item().typ() {
3033 typ @ (Func | View | MaterializedView | ContinualTask) => {
3034 ids_to_check.extend(entry.uses());
3035 let valid_id = id.is_user() || matches!(typ, Func);
3036 valid_id
3037 }
3038 Source | Secret | Connection => false,
3039 Sink | Index => unreachable!(),
3041 Table => {
3042 if !id.is_user() {
3043 false
3045 } else {
3046 entry.source_export_details().is_none()
3048 }
3049 }
3050 Type => true,
3051 }
3052 }
3053 None => false,
3054 };
3055 if !valid {
3056 return Err(AdapterError::InvalidTableMutationSelection);
3057 }
3058 for id in ids_to_check {
3059 validate_read_dependencies(catalog, &id)?;
3060 }
3061 Ok(())
3062 }
3063
3064 for gid in selection.depends_on() {
3065 let item_id = self.catalog().resolve_item_id(&gid);
3066 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
3067 ctx.retire(Err(err));
3068 return;
3069 }
3070 }
3071
3072 let (peek_tx, peek_rx) = oneshot::channel();
3073 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
3074 let (tx, _, session, extra) = ctx.into_parts();
3075 let peek_ctx = ExecuteContext::from_parts(
3087 peek_client_tx,
3088 self.internal_cmd_tx.clone(),
3089 session,
3090 Default::default(),
3091 );
3092
3093 self.sequence_peek(
3094 peek_ctx,
3095 plan::SelectPlan {
3096 select: None,
3097 source: selection,
3098 when: QueryWhen::FreshestTableWrite,
3099 finishing,
3100 copy_to: None,
3101 },
3102 TargetCluster::Active,
3103 None,
3104 )
3105 .await;
3106
3107 let internal_cmd_tx = self.internal_cmd_tx.clone();
3108 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
3109 let catalog = self.owned_catalog();
3110 let max_result_size = self.catalog().system_config().max_result_size();
3111
3112 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
3113 let (peek_response, session) = match peek_rx.await {
3114 Ok(Response {
3115 result: Ok(resp),
3116 session,
3117 otel_ctx,
3118 }) => {
3119 otel_ctx.attach_as_parent();
3120 (resp, session)
3121 }
3122 Ok(Response {
3123 result: Err(e),
3124 session,
3125 otel_ctx,
3126 }) => {
3127 let ctx =
3128 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3129 otel_ctx.attach_as_parent();
3130 ctx.retire(Err(e));
3131 return;
3132 }
3133 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
3135 };
3136 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3137 let mut timeout_dur = *ctx.session().vars().statement_timeout();
3138
3139 if timeout_dur == Duration::ZERO {
3141 timeout_dur = Duration::MAX;
3142 }
3143
3144 let style = ExprPrepStyle::OneShot {
3145 logical_time: EvalTime::NotAvailable, session: ctx.session(),
3147 catalog_state: catalog.state(),
3148 };
3149 for expr in assignments.values_mut().chain(returning.iter_mut()) {
3150 return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
3151 }
3152
3153 let make_diffs =
3154 move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
3155 let arena = RowArena::new();
3156 let mut diffs = Vec::new();
3157 let mut datum_vec = mz_repr::DatumVec::new();
3158
3159 while let Some(row) = rows.next() {
3160 if !assignments.is_empty() {
3161 assert!(
3162 matches!(kind, MutationKind::Update),
3163 "only updates support assignments"
3164 );
3165 let mut datums = datum_vec.borrow_with(row);
3166 let mut updates = vec![];
3167 for (idx, expr) in &assignments {
3168 let updated = match expr.eval(&datums, &arena) {
3169 Ok(updated) => updated,
3170 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
3171 };
3172 updates.push((*idx, updated));
3173 }
3174 for (idx, new_value) in updates {
3175 datums[idx] = new_value;
3176 }
3177 let updated = Row::pack_slice(&datums);
3178 diffs.push((updated, Diff::ONE));
3179 }
3180 match kind {
3181 MutationKind::Update | MutationKind::Delete => {
3185 diffs.push((row.to_owned(), Diff::MINUS_ONE))
3186 }
3187 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
3188 }
3189 }
3190
3191 let mut byte_size: u64 = 0;
3194 for (row, diff) in &diffs {
3195 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
3196 if diff.is_positive() {
3197 for (idx, datum) in row.iter().enumerate() {
3198 desc.constraints_met(idx, &datum)?;
3199 }
3200 }
3201 }
3202 Ok((diffs, byte_size))
3203 };
3204
3205 let diffs = match peek_response {
3206 ExecuteResponse::SendingRowsStreaming {
3207 rows: mut rows_stream,
3208 ..
3209 } => {
3210 let mut byte_size: u64 = 0;
3211 let mut diffs = Vec::new();
3212 let result = loop {
3213 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
3214 Ok(Some(res)) => match res {
3215 PeekResponseUnary::Rows(new_rows) => {
3216 match make_diffs(new_rows) {
3217 Ok((mut new_diffs, new_byte_size)) => {
3218 byte_size = byte_size.saturating_add(new_byte_size);
3219 if byte_size > max_result_size {
3220 break Err(AdapterError::ResultSize(format!(
3221 "result exceeds max size of {max_result_size}"
3222 )));
3223 }
3224 diffs.append(&mut new_diffs)
3225 }
3226 Err(e) => break Err(e),
3227 };
3228 }
3229 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
3230 PeekResponseUnary::Error(e) => {
3231 break Err(AdapterError::Unstructured(anyhow!(e)));
3232 }
3233 },
3234 Ok(None) => break Ok(diffs),
3235 Err(_) => {
3236 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
3241 conn_id: ctx.session().conn_id().clone(),
3242 });
3243 if let Err(e) = result {
3244 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3245 }
3246 break Err(AdapterError::StatementTimeout);
3247 }
3248 }
3249 };
3250
3251 result
3252 }
3253 ExecuteResponse::SendingRowsImmediate { rows } => {
3254 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
3255 }
3256 resp => Err(AdapterError::Unstructured(anyhow!(
3257 "unexpected peek response: {resp:?}"
3258 ))),
3259 };
3260
3261 let mut returning_rows = Vec::new();
3262 let mut diff_err: Option<AdapterError> = None;
3263 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
3264 let arena = RowArena::new();
3265 for (row, diff) in diffs {
3266 if !diff.is_positive() {
3267 continue;
3268 }
3269 let mut returning_row = Row::with_capacity(returning.len());
3270 let mut packer = returning_row.packer();
3271 for expr in &returning {
3272 let datums: Vec<_> = row.iter().collect();
3273 match expr.eval(&datums, &arena) {
3274 Ok(datum) => {
3275 packer.push(datum);
3276 }
3277 Err(err) => {
3278 diff_err = Some(err.into());
3279 break;
3280 }
3281 }
3282 }
3283 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3284 let diff = match NonZeroUsize::try_from(diff) {
3285 Ok(diff) => diff,
3286 Err(err) => {
3287 diff_err = Some(err.into());
3288 break;
3289 }
3290 };
3291 returning_rows.push((returning_row, diff));
3292 if diff_err.is_some() {
3293 break;
3294 }
3295 }
3296 }
3297 let diffs = if let Some(err) = diff_err {
3298 Err(err)
3299 } else {
3300 diffs
3301 };
3302
3303 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3306 if let Some(timestamp_context) = timestamp_context {
3315 let (tx, rx) = tokio::sync::oneshot::channel();
3316 let conn_id = ctx.session().conn_id().clone();
3317 let pending_read_txn = PendingReadTxn {
3318 txn: PendingRead::ReadThenWrite { ctx, tx },
3319 timestamp_context,
3320 created: Instant::now(),
3321 num_requeues: 0,
3322 otel_ctx: OpenTelemetryContext::obtain(),
3323 };
3324 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3325 if let Err(e) = result {
3327 warn!(
3328 "strict_serializable_reads_tx dropped before we could send: {:?}",
3329 e
3330 );
3331 return;
3332 }
3333 let result = rx.await;
3334 ctx = match result {
3336 Ok(Some(ctx)) => ctx,
3337 Ok(None) => {
3338 return;
3341 }
3342 Err(e) => {
3343 warn!(
3344 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3345 e
3346 );
3347 return;
3348 }
3349 };
3350 }
3351
3352 match diffs {
3353 Ok(diffs) => {
3354 let result = Self::send_diffs(
3355 ctx.session_mut(),
3356 plan::SendDiffsPlan {
3357 id,
3358 updates: diffs,
3359 kind,
3360 returning: returning_rows,
3361 max_result_size,
3362 },
3363 );
3364 ctx.retire(result);
3365 }
3366 Err(e) => {
3367 ctx.retire(Err(e));
3368 }
3369 }
3370 });
3371 }
3372
3373 #[instrument]
3374 pub(super) async fn sequence_alter_item_rename(
3375 &mut self,
3376 ctx: &mut ExecuteContext,
3377 plan: plan::AlterItemRenamePlan,
3378 ) -> Result<ExecuteResponse, AdapterError> {
3379 let op = catalog::Op::RenameItem {
3380 id: plan.id,
3381 current_full_name: plan.current_full_name,
3382 to_name: plan.to_name,
3383 };
3384 match self
3385 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3386 .await
3387 {
3388 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3389 Err(err) => Err(err),
3390 }
3391 }
3392
3393 #[instrument]
3394 pub(super) async fn sequence_alter_retain_history(
3395 &mut self,
3396 ctx: &mut ExecuteContext,
3397 plan: plan::AlterRetainHistoryPlan,
3398 ) -> Result<ExecuteResponse, AdapterError> {
3399 let ops = vec![catalog::Op::AlterRetainHistory {
3400 id: plan.id,
3401 value: plan.value,
3402 window: plan.window,
3403 }];
3404 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3405 Box::pin(async move {
3406 let catalog_item = coord.catalog().get_entry(&plan.id).item();
3407 let cluster = match catalog_item {
3408 CatalogItem::Table(_)
3409 | CatalogItem::MaterializedView(_)
3410 | CatalogItem::Source(_)
3411 | CatalogItem::ContinualTask(_) => None,
3412 CatalogItem::Index(index) => Some(index.cluster_id),
3413 CatalogItem::Log(_)
3414 | CatalogItem::View(_)
3415 | CatalogItem::Sink(_)
3416 | CatalogItem::Type(_)
3417 | CatalogItem::Func(_)
3418 | CatalogItem::Secret(_)
3419 | CatalogItem::Connection(_) => unreachable!(),
3420 };
3421 match cluster {
3422 Some(cluster) => {
3423 coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3424 }
3425 None => {
3426 coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3427 }
3428 }
3429 })
3430 })
3431 .await?;
3432 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3433 }
3434
3435 #[instrument]
3436 pub(super) async fn sequence_alter_schema_rename(
3437 &mut self,
3438 ctx: &mut ExecuteContext,
3439 plan: plan::AlterSchemaRenamePlan,
3440 ) -> Result<ExecuteResponse, AdapterError> {
3441 let (database_spec, schema_spec) = plan.cur_schema_spec;
3442 let op = catalog::Op::RenameSchema {
3443 database_spec,
3444 schema_spec,
3445 new_name: plan.new_schema_name,
3446 check_reserved_names: true,
3447 };
3448 match self
3449 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3450 .await
3451 {
3452 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3453 Err(err) => Err(err),
3454 }
3455 }
3456
3457 #[instrument]
3458 pub(super) async fn sequence_alter_schema_swap(
3459 &mut self,
3460 ctx: &mut ExecuteContext,
3461 plan: plan::AlterSchemaSwapPlan,
3462 ) -> Result<ExecuteResponse, AdapterError> {
3463 let plan::AlterSchemaSwapPlan {
3464 schema_a_spec: (schema_a_db, schema_a),
3465 schema_a_name,
3466 schema_b_spec: (schema_b_db, schema_b),
3467 schema_b_name,
3468 name_temp,
3469 } = plan;
3470
3471 let op_a = catalog::Op::RenameSchema {
3472 database_spec: schema_a_db,
3473 schema_spec: schema_a,
3474 new_name: name_temp,
3475 check_reserved_names: false,
3476 };
3477 let op_b = catalog::Op::RenameSchema {
3478 database_spec: schema_b_db,
3479 schema_spec: schema_b,
3480 new_name: schema_a_name,
3481 check_reserved_names: false,
3482 };
3483 let op_c = catalog::Op::RenameSchema {
3484 database_spec: schema_a_db,
3485 schema_spec: schema_a,
3486 new_name: schema_b_name,
3487 check_reserved_names: false,
3488 };
3489
3490 match self
3491 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3492 Box::pin(async {})
3493 })
3494 .await
3495 {
3496 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3497 Err(err) => Err(err),
3498 }
3499 }
3500
3501 #[instrument]
3502 pub(super) async fn sequence_alter_role(
3503 &mut self,
3504 session: &Session,
3505 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3506 ) -> Result<ExecuteResponse, AdapterError> {
3507 let catalog = self.catalog().for_session(session);
3508 let role = catalog.get_role(&id);
3509
3510 let mut notices = vec![];
3512
3513 let mut attributes: RoleAttributesRaw = role.attributes().clone().into();
3515 let mut vars = role.vars().clone();
3516
3517 let mut nopassword = false;
3520
3521 match option {
3523 PlannedAlterRoleOption::Attributes(attrs) => {
3524 self.validate_role_attributes(&attrs.clone().into())?;
3525
3526 if let Some(inherit) = attrs.inherit {
3527 attributes.inherit = inherit;
3528 }
3529
3530 if let Some(password) = attrs.password {
3531 attributes.password = Some(password);
3532 }
3533
3534 if let Some(superuser) = attrs.superuser {
3535 attributes.superuser = Some(superuser);
3536 }
3537
3538 if let Some(login) = attrs.login {
3539 attributes.login = Some(login);
3540 }
3541
3542 if attrs.nopassword.unwrap_or(false) {
3543 nopassword = true;
3544 }
3545
3546 if let Some(notice) = self.should_emit_rbac_notice(session) {
3547 notices.push(notice);
3548 }
3549 }
3550 PlannedAlterRoleOption::Variable(variable) => {
3551 let session_var = session.vars().inspect(variable.name())?;
3553 session_var.visible(session.user(), catalog.system_vars())?;
3555
3556 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3559 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3560 } else if let PlannedRoleVariable::Set {
3561 name,
3562 value: VariableValue::Values(vals),
3563 } = &variable
3564 {
3565 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3566 notices.push(AdapterNotice::IntrospectionClusterUsage);
3567 }
3568 }
3569
3570 let var_name = match variable {
3571 PlannedRoleVariable::Set { name, value } => {
3572 match &value {
3574 VariableValue::Default => {
3575 vars.remove(&name);
3576 }
3577 VariableValue::Values(vals) => {
3578 let var = match &vals[..] {
3579 [val] => OwnedVarInput::Flat(val.clone()),
3580 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3581 };
3582 session_var.check(var.borrow())?;
3584
3585 vars.insert(name.clone(), var);
3586 }
3587 };
3588 name
3589 }
3590 PlannedRoleVariable::Reset { name } => {
3591 vars.remove(&name);
3593 name
3594 }
3595 };
3596
3597 notices.push(AdapterNotice::VarDefaultUpdated {
3599 role: Some(name.clone()),
3600 var_name: Some(var_name),
3601 });
3602 }
3603 }
3604
3605 let op = catalog::Op::AlterRole {
3606 id,
3607 name,
3608 attributes,
3609 nopassword,
3610 vars: RoleVars { map: vars },
3611 };
3612 let response = self
3613 .catalog_transact(Some(session), vec![op])
3614 .await
3615 .map(|_| ExecuteResponse::AlteredRole)?;
3616
3617 session.add_notices(notices);
3619
3620 Ok(response)
3621 }
3622
3623 #[instrument]
3624 pub(super) async fn sequence_alter_sink_prepare(
3625 &mut self,
3626 ctx: ExecuteContext,
3627 plan: plan::AlterSinkPlan,
3628 ) {
3629 let id_bundle = crate::CollectionIdBundle {
3631 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3632 compute_ids: BTreeMap::new(),
3633 };
3634 let read_hold = self.acquire_read_holds(&id_bundle);
3635
3636 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3637 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3638 return;
3639 };
3640
3641 let otel_ctx = OpenTelemetryContext::obtain();
3642 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3643
3644 let plan_validity = PlanValidity::new(
3645 self.catalog().transient_revision(),
3646 BTreeSet::from_iter([from_item_id]),
3647 Some(plan.in_cluster),
3648 None,
3649 ctx.session().role_metadata().clone(),
3650 );
3651
3652 let create_sink_stmt = match mz_sql::parse::parse(&plan.sink.create_sql)
3655 .expect("invalid create sink sql")
3656 .into_element()
3657 .ast
3658 {
3659 Statement::CreateSink(stmt) => stmt,
3660 _ => unreachable!("invalid statment kind for sink"),
3661 };
3662 let catalog = self.catalog().for_system_session();
3663 let (_, resolved_ids) = match mz_sql::names::resolve(&catalog, create_sink_stmt) {
3664 Ok(ok) => ok,
3665 Err(e) => {
3666 ctx.retire(Err(AdapterError::internal("ALTER SINK", e)));
3667 return;
3668 }
3669 };
3670
3671 info!(
3672 "preparing alter sink for {}: frontiers={:?} export={:?}",
3673 plan.global_id,
3674 self.controller
3675 .storage_collections
3676 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3677 self.controller.storage.export(plan.global_id)
3678 );
3679
3680 self.install_storage_watch_set(
3683 ctx.session().conn_id().clone(),
3684 BTreeSet::from_iter([plan.global_id]),
3685 read_ts,
3686 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3687 ctx: Some(ctx),
3688 otel_ctx,
3689 plan,
3690 plan_validity,
3691 resolved_ids,
3692 read_hold,
3693 }),
3694 );
3695 }
3696
3697 #[instrument]
3698 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3699 ctx.otel_ctx.attach_as_parent();
3700 match ctx.plan_validity.check(self.catalog()) {
3701 Ok(()) => {}
3702 Err(err) => {
3703 ctx.retire(Err(err));
3704 return;
3705 }
3706 }
3707 {
3708 let plan = &ctx.plan;
3709 info!(
3710 "finishing alter sink for {}: frontiers={:?} export={:?}",
3711 plan.global_id,
3712 self.controller
3713 .storage_collections
3714 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3715 self.controller.storage.export(plan.global_id)
3716 );
3717 }
3718
3719 let plan::AlterSinkPlan {
3720 item_id,
3721 global_id,
3722 sink,
3723 with_snapshot,
3724 in_cluster,
3725 } = ctx.plan.clone();
3726 let write_frontier = &self
3729 .controller
3730 .storage
3731 .export(global_id)
3732 .expect("sink known to exist")
3733 .write_frontier;
3734 let as_of = ctx.read_hold.least_valid_read();
3735 assert!(
3736 write_frontier.iter().all(|t| as_of.less_than(t)),
3737 "{:?} should be strictly less than {:?}",
3738 &*as_of,
3739 &**write_frontier
3740 );
3741
3742 let catalog_sink = Sink {
3743 create_sql: sink.create_sql,
3744 global_id,
3745 from: sink.from,
3746 connection: sink.connection.clone(),
3747 envelope: sink.envelope,
3748 version: sink.version,
3749 with_snapshot,
3750 resolved_ids: ctx.resolved_ids.clone(),
3751 cluster_id: in_cluster,
3752 };
3753
3754 let ops = vec![catalog::Op::UpdateItem {
3755 id: item_id,
3756 name: self.catalog.get_entry(&item_id).name().clone(),
3757 to_item: CatalogItem::Sink(catalog_sink),
3758 }];
3759
3760 match self
3761 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3762 .await
3763 {
3764 Ok(()) => {}
3765 Err(err) => {
3766 ctx.retire(Err(err));
3767 return;
3768 }
3769 }
3770
3771 let from_entry = self.catalog().get_entry_by_global_id(&sink.from);
3772 let storage_sink_desc = StorageSinkDesc {
3773 from: sink.from,
3774 from_desc: from_entry
3775 .desc_opt()
3776 .expect("sinks can only be built on items with descs")
3777 .into_owned(),
3778 connection: sink
3779 .connection
3780 .clone()
3781 .into_inline_connection(self.catalog().state()),
3782 envelope: sink.envelope,
3783 as_of,
3784 with_snapshot,
3785 version: sink.version,
3786 from_storage_metadata: (),
3787 to_storage_metadata: (),
3788 };
3789
3790 self.controller
3791 .storage
3792 .alter_export(
3793 global_id,
3794 ExportDescription {
3795 sink: storage_sink_desc,
3796 instance_id: in_cluster,
3797 },
3798 )
3799 .await
3800 .unwrap_or_terminate("cannot fail to alter source desc");
3801
3802 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3803 }
3804
3805 #[instrument]
3806 pub(super) async fn sequence_alter_connection(
3807 &mut self,
3808 ctx: ExecuteContext,
3809 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3810 ) {
3811 match action {
3812 AlterConnectionAction::RotateKeys => {
3813 self.sequence_rotate_keys(ctx, id).await;
3814 }
3815 AlterConnectionAction::AlterOptions {
3816 set_options,
3817 drop_options,
3818 validate,
3819 } => {
3820 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3821 .await
3822 }
3823 }
3824 }
3825
3826 #[instrument]
3827 async fn sequence_alter_connection_options(
3828 &mut self,
3829 mut ctx: ExecuteContext,
3830 id: CatalogItemId,
3831 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3832 drop_options: BTreeSet<ConnectionOptionName>,
3833 validate: bool,
3834 ) {
3835 let cur_entry = self.catalog().get_entry(&id);
3836 let cur_conn = cur_entry.connection().expect("known to be connection");
3837 let connection_gid = cur_conn.global_id();
3838
3839 let inner = || -> Result<Connection, AdapterError> {
3840 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3842 .expect("invalid create sql persisted to catalog")
3843 .into_element()
3844 .ast
3845 {
3846 Statement::CreateConnection(stmt) => stmt,
3847 _ => unreachable!("proved type is source"),
3848 };
3849
3850 let catalog = self.catalog().for_system_session();
3851
3852 let (mut create_conn_stmt, resolved_ids) =
3854 mz_sql::names::resolve(&catalog, create_conn_stmt)
3855 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3856
3857 create_conn_stmt
3859 .values
3860 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3861
3862 create_conn_stmt.values.extend(
3864 set_options
3865 .into_iter()
3866 .map(|(name, value)| ConnectionOption { name, value }),
3867 );
3868
3869 let mut catalog = self.catalog().for_system_session();
3872 catalog.mark_id_unresolvable_for_replanning(id);
3873
3874 let plan = match mz_sql::plan::plan(
3876 None,
3877 &catalog,
3878 Statement::CreateConnection(create_conn_stmt),
3879 &Params::empty(),
3880 &resolved_ids,
3881 )
3882 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3883 {
3884 Plan::CreateConnection(plan) => plan,
3885 _ => unreachable!("create source plan is only valid response"),
3886 };
3887
3888 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3890 .expect("invalid create sql persisted to catalog")
3891 .into_element()
3892 .ast
3893 {
3894 Statement::CreateConnection(stmt) => stmt,
3895 _ => unreachable!("proved type is source"),
3896 };
3897
3898 let catalog = self.catalog().for_system_session();
3899
3900 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3902 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3903
3904 Ok(Connection {
3905 create_sql: plan.connection.create_sql,
3906 global_id: cur_conn.global_id,
3907 details: plan.connection.details,
3908 resolved_ids: new_deps,
3909 })
3910 };
3911
3912 let conn = match inner() {
3913 Ok(conn) => conn,
3914 Err(e) => {
3915 return ctx.retire(Err(e));
3916 }
3917 };
3918
3919 if validate {
3920 let connection = conn
3921 .details
3922 .to_connection()
3923 .into_inline_connection(self.catalog().state());
3924
3925 let internal_cmd_tx = self.internal_cmd_tx.clone();
3926 let transient_revision = self.catalog().transient_revision();
3927 let conn_id = ctx.session().conn_id().clone();
3928 let otel_ctx = OpenTelemetryContext::obtain();
3929 let role_metadata = ctx.session().role_metadata().clone();
3930 let current_storage_parameters = self.controller.storage.config().clone();
3931
3932 task::spawn(
3933 || format!("validate_alter_connection:{conn_id}"),
3934 async move {
3935 let resolved_ids = conn.resolved_ids.clone();
3936 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3937 let result = match connection.validate(id, ¤t_storage_parameters).await {
3938 Ok(()) => Ok(conn),
3939 Err(err) => Err(err.into()),
3940 };
3941
3942 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3944 AlterConnectionValidationReady {
3945 ctx,
3946 result,
3947 connection_id: id,
3948 connection_gid,
3949 plan_validity: PlanValidity::new(
3950 transient_revision,
3951 dependency_ids.clone(),
3952 None,
3953 None,
3954 role_metadata,
3955 ),
3956 otel_ctx,
3957 resolved_ids,
3958 },
3959 ));
3960 if let Err(e) = result {
3961 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3962 }
3963 },
3964 );
3965 } else {
3966 let result = self
3967 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3968 .await;
3969 ctx.retire(result);
3970 }
3971 }
3972
3973 #[instrument]
3974 pub(crate) async fn sequence_alter_connection_stage_finish(
3975 &mut self,
3976 session: &mut Session,
3977 id: CatalogItemId,
3978 connection: Connection,
3979 ) -> Result<ExecuteResponse, AdapterError> {
3980 match self.catalog.get_entry(&id).item() {
3981 CatalogItem::Connection(curr_conn) => {
3982 curr_conn
3983 .details
3984 .to_connection()
3985 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3986 .map_err(StorageError::from)?;
3987 }
3988 _ => unreachable!("known to be a connection"),
3989 };
3990
3991 let ops = vec![catalog::Op::UpdateItem {
3992 id,
3993 name: self.catalog.get_entry(&id).name().clone(),
3994 to_item: CatalogItem::Connection(connection.clone()),
3995 }];
3996
3997 self.catalog_transact(Some(session), ops).await?;
3998
3999 match connection.details {
4000 ConnectionDetails::AwsPrivatelink(ref privatelink) => {
4001 let spec = VpcEndpointConfig {
4002 aws_service_name: privatelink.service_name.to_owned(),
4003 availability_zone_ids: privatelink.availability_zones.to_owned(),
4004 };
4005 self.cloud_resource_controller
4006 .as_ref()
4007 .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
4008 .ensure_vpc_endpoint(id, spec)
4009 .await?;
4010 }
4011 _ => {}
4012 };
4013
4014 let entry = self.catalog().get_entry(&id);
4015
4016 let mut connections = VecDeque::new();
4017 connections.push_front(entry.id());
4018
4019 let mut source_connections = BTreeMap::new();
4020 let mut sink_connections = BTreeMap::new();
4021 let mut source_export_data_configs = BTreeMap::new();
4022
4023 while let Some(id) = connections.pop_front() {
4024 for id in self.catalog.get_entry(&id).used_by() {
4025 let entry = self.catalog.get_entry(id);
4026 match entry.item() {
4027 CatalogItem::Connection(_) => connections.push_back(*id),
4028 CatalogItem::Source(source) => {
4029 let desc = match &entry.source().expect("known to be source").data_source {
4030 DataSourceDesc::Ingestion { desc, .. }
4031 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4032 desc.clone().into_inline_connection(self.catalog().state())
4033 }
4034 _ => unreachable!("only ingestions reference connections"),
4035 };
4036
4037 source_connections.insert(source.global_id, desc.connection);
4038 }
4039 CatalogItem::Sink(sink) => {
4040 let export = entry.sink().expect("known to be sink");
4041 sink_connections.insert(
4042 sink.global_id,
4043 export
4044 .connection
4045 .clone()
4046 .into_inline_connection(self.catalog().state()),
4047 );
4048 }
4049 CatalogItem::Table(table) => {
4050 if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
4053 let data_config = export_data_config.clone();
4054 source_export_data_configs.insert(
4055 table.global_id_writes(),
4056 data_config.into_inline_connection(self.catalog().state()),
4057 );
4058 }
4059 }
4060 t => unreachable!("connection dependency not expected on {:?}", t),
4061 }
4062 }
4063 }
4064
4065 if !source_connections.is_empty() {
4066 self.controller
4067 .storage
4068 .alter_ingestion_connections(source_connections)
4069 .await
4070 .unwrap_or_terminate("cannot fail to alter ingestion connection");
4071 }
4072
4073 if !sink_connections.is_empty() {
4074 self.controller
4075 .storage
4076 .alter_export_connections(sink_connections)
4077 .await
4078 .unwrap_or_terminate("altering exports after txn must succeed");
4079 }
4080
4081 if !source_export_data_configs.is_empty() {
4082 self.controller
4083 .storage
4084 .alter_ingestion_export_data_configs(source_export_data_configs)
4085 .await
4086 .unwrap_or_terminate("altering source export data configs after txn must succeed");
4087 }
4088
4089 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
4090 }
4091
4092 #[instrument]
4093 pub(super) async fn sequence_alter_source(
4094 &mut self,
4095 session: &Session,
4096 plan::AlterSourcePlan {
4097 item_id,
4098 ingestion_id,
4099 action,
4100 }: plan::AlterSourcePlan,
4101 ) -> Result<ExecuteResponse, AdapterError> {
4102 let cur_entry = self.catalog().get_entry(&item_id);
4103 let cur_source = cur_entry.source().expect("known to be source");
4104
4105 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
4106 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
4108 .expect("invalid create sql persisted to catalog")
4109 .into_element()
4110 .ast
4111 {
4112 Statement::CreateSource(stmt) => stmt,
4113 _ => unreachable!("proved type is source"),
4114 };
4115
4116 let catalog = coord.catalog().for_system_session();
4117
4118 mz_sql::names::resolve(&catalog, create_source_stmt)
4120 .map_err(|e| AdapterError::internal(err_cx, e))
4121 };
4122
4123 match action {
4124 plan::AlterSourceAction::AddSubsourceExports {
4125 subsources,
4126 options,
4127 } => {
4128 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
4129
4130 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
4131 text_columns: mut new_text_columns,
4132 exclude_columns: mut new_exclude_columns,
4133 ..
4134 } = options.try_into()?;
4135
4136 let (mut create_source_stmt, resolved_ids) =
4138 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
4139
4140 let catalog = self.catalog();
4142 let curr_references: BTreeSet<_> = catalog
4143 .get_entry(&item_id)
4144 .used_by()
4145 .into_iter()
4146 .filter_map(|subsource| {
4147 catalog
4148 .get_entry(subsource)
4149 .subsource_details()
4150 .map(|(_id, reference, _details)| reference)
4151 })
4152 .collect();
4153
4154 let purification_err =
4157 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
4158
4159 match &mut create_source_stmt.connection {
4163 CreateSourceConnection::Postgres {
4164 options: curr_options,
4165 ..
4166 } => {
4167 let mz_sql::plan::PgConfigOptionExtracted {
4168 mut text_columns, ..
4169 } = curr_options.clone().try_into()?;
4170
4171 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
4174
4175 text_columns.retain(|column_qualified_reference| {
4177 mz_ore::soft_assert_eq_or_log!(
4178 column_qualified_reference.0.len(),
4179 4,
4180 "all TEXT COLUMNS values must be column-qualified references"
4181 );
4182 let mut table = column_qualified_reference.clone();
4183 table.0.truncate(3);
4184 curr_references.contains(&table)
4185 });
4186
4187 new_text_columns.extend(text_columns);
4189
4190 if !new_text_columns.is_empty() {
4192 new_text_columns.sort();
4193 let new_text_columns = new_text_columns
4194 .into_iter()
4195 .map(WithOptionValue::UnresolvedItemName)
4196 .collect();
4197
4198 curr_options.push(PgConfigOption {
4199 name: PgConfigOptionName::TextColumns,
4200 value: Some(WithOptionValue::Sequence(new_text_columns)),
4201 });
4202 }
4203 }
4204 CreateSourceConnection::MySql {
4205 options: curr_options,
4206 ..
4207 } => {
4208 let mz_sql::plan::MySqlConfigOptionExtracted {
4209 mut text_columns,
4210 mut exclude_columns,
4211 ..
4212 } = curr_options.clone().try_into()?;
4213
4214 curr_options.retain(|o| {
4217 !matches!(
4218 o.name,
4219 MySqlConfigOptionName::TextColumns
4220 | MySqlConfigOptionName::ExcludeColumns
4221 )
4222 });
4223
4224 let column_referenced =
4226 |column_qualified_reference: &UnresolvedItemName| {
4227 mz_ore::soft_assert_eq_or_log!(
4228 column_qualified_reference.0.len(),
4229 3,
4230 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4231 );
4232 let mut table = column_qualified_reference.clone();
4233 table.0.truncate(2);
4234 curr_references.contains(&table)
4235 };
4236 text_columns.retain(column_referenced);
4237 exclude_columns.retain(column_referenced);
4238
4239 new_text_columns.extend(text_columns);
4241 new_exclude_columns.extend(exclude_columns);
4242
4243 if !new_text_columns.is_empty() {
4245 new_text_columns.sort();
4246 let new_text_columns = new_text_columns
4247 .into_iter()
4248 .map(WithOptionValue::UnresolvedItemName)
4249 .collect();
4250
4251 curr_options.push(MySqlConfigOption {
4252 name: MySqlConfigOptionName::TextColumns,
4253 value: Some(WithOptionValue::Sequence(new_text_columns)),
4254 });
4255 }
4256 if !new_exclude_columns.is_empty() {
4258 new_exclude_columns.sort();
4259 let new_exclude_columns = new_exclude_columns
4260 .into_iter()
4261 .map(WithOptionValue::UnresolvedItemName)
4262 .collect();
4263
4264 curr_options.push(MySqlConfigOption {
4265 name: MySqlConfigOptionName::ExcludeColumns,
4266 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4267 });
4268 }
4269 }
4270 CreateSourceConnection::SqlServer {
4271 options: curr_options,
4272 ..
4273 } => {
4274 let mz_sql::plan::SqlServerConfigOptionExtracted {
4275 mut text_columns,
4276 mut exclude_columns,
4277 ..
4278 } = curr_options.clone().try_into()?;
4279
4280 curr_options.retain(|o| {
4283 !matches!(
4284 o.name,
4285 SqlServerConfigOptionName::TextColumns
4286 | SqlServerConfigOptionName::ExcludeColumns
4287 )
4288 });
4289
4290 let column_referenced =
4292 |column_qualified_reference: &UnresolvedItemName| {
4293 mz_ore::soft_assert_eq_or_log!(
4294 column_qualified_reference.0.len(),
4295 3,
4296 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4297 );
4298 let mut table = column_qualified_reference.clone();
4299 table.0.truncate(2);
4300 curr_references.contains(&table)
4301 };
4302 text_columns.retain(column_referenced);
4303 exclude_columns.retain(column_referenced);
4304
4305 new_text_columns.extend(text_columns);
4307 new_exclude_columns.extend(exclude_columns);
4308
4309 if !new_text_columns.is_empty() {
4311 new_text_columns.sort();
4312 let new_text_columns = new_text_columns
4313 .into_iter()
4314 .map(WithOptionValue::UnresolvedItemName)
4315 .collect();
4316
4317 curr_options.push(SqlServerConfigOption {
4318 name: SqlServerConfigOptionName::TextColumns,
4319 value: Some(WithOptionValue::Sequence(new_text_columns)),
4320 });
4321 }
4322 if !new_exclude_columns.is_empty() {
4324 new_exclude_columns.sort();
4325 let new_exclude_columns = new_exclude_columns
4326 .into_iter()
4327 .map(WithOptionValue::UnresolvedItemName)
4328 .collect();
4329
4330 curr_options.push(SqlServerConfigOption {
4331 name: SqlServerConfigOptionName::ExcludeColumns,
4332 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4333 });
4334 }
4335 }
4336 _ => return Err(purification_err()),
4337 };
4338
4339 let mut catalog = self.catalog().for_system_session();
4340 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4341
4342 let plan = match mz_sql::plan::plan(
4344 None,
4345 &catalog,
4346 Statement::CreateSource(create_source_stmt),
4347 &Params::empty(),
4348 &resolved_ids,
4349 )
4350 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4351 {
4352 Plan::CreateSource(plan) => plan,
4353 _ => unreachable!("create source plan is only valid response"),
4354 };
4355
4356 let source = Source::new(
4360 plan,
4361 cur_source.global_id,
4362 resolved_ids,
4363 cur_source.custom_logical_compaction_window,
4364 cur_source.is_retained_metrics_object,
4365 );
4366
4367 let source_compaction_window = source.custom_logical_compaction_window;
4368
4369 let desc = match &source.data_source {
4371 DataSourceDesc::Ingestion { desc, .. }
4372 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
4373 desc.clone().into_inline_connection(self.catalog().state())
4374 }
4375 _ => unreachable!("already verified of type ingestion"),
4376 };
4377
4378 self.controller
4379 .storage
4380 .check_alter_ingestion_source_desc(ingestion_id, &desc)
4381 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4382
4383 let mut ops = vec![catalog::Op::UpdateItem {
4386 id: item_id,
4387 name: self.catalog.get_entry(&item_id).name().clone(),
4390 to_item: CatalogItem::Source(source),
4391 }];
4392
4393 let CreateSourceInner {
4394 ops: new_ops,
4395 sources,
4396 if_not_exists_ids,
4397 } = self.create_source_inner(session, subsources).await?;
4398
4399 ops.extend(new_ops.into_iter());
4400
4401 assert!(
4402 if_not_exists_ids.is_empty(),
4403 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4404 );
4405
4406 self.catalog_transact(Some(session), ops).await?;
4407
4408 let mut item_ids = BTreeSet::new();
4409 let mut collections = Vec::with_capacity(sources.len());
4410 for (item_id, source) in sources {
4411 let status_id = self.catalog().resolve_builtin_storage_collection(
4412 &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
4413 );
4414 let source_status_collection_id =
4415 Some(self.catalog().get_entry(&status_id).latest_global_id());
4416
4417 let (data_source, status_collection_id) = match source.data_source {
4418 DataSourceDesc::IngestionExport {
4420 ingestion_id,
4421 external_reference: _,
4422 details,
4423 data_config,
4424 } => {
4425 let ingestion_id =
4428 self.catalog().get_entry(&ingestion_id).latest_global_id();
4429 (
4430 DataSource::IngestionExport {
4431 ingestion_id,
4432 details,
4433 data_config: data_config
4434 .into_inline_connection(self.catalog().state()),
4435 },
4436 source_status_collection_id,
4437 )
4438 }
4439 o => {
4440 unreachable!(
4441 "ALTER SOURCE...ADD SUBSOURCE only creates SourceExport but got {:?}",
4442 o
4443 )
4444 }
4445 };
4446
4447 collections.push((
4448 source.global_id,
4449 CollectionDescription {
4450 desc: source.desc.clone(),
4451 data_source,
4452 since: None,
4453 status_collection_id,
4454 timeline: Some(source.timeline.clone()),
4455 },
4456 ));
4457
4458 item_ids.insert(item_id);
4459 }
4460
4461 let storage_metadata = self.catalog.state().storage_metadata();
4462
4463 self.controller
4464 .storage
4465 .create_collections(storage_metadata, None, collections)
4466 .await
4467 .unwrap_or_terminate("cannot fail to create collections");
4468
4469 self.initialize_storage_read_policies(
4470 item_ids,
4471 source_compaction_window.unwrap_or(CompactionWindow::Default),
4472 )
4473 .await;
4474 }
4475 plan::AlterSourceAction::RefreshReferences { references } => {
4476 self.catalog_transact(
4477 Some(session),
4478 vec![catalog::Op::UpdateSourceReferences {
4479 source_id: item_id,
4480 references: references.into(),
4481 }],
4482 )
4483 .await?;
4484 }
4485 }
4486
4487 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4488 }
4489
4490 #[instrument]
4491 pub(super) async fn sequence_alter_system_set(
4492 &mut self,
4493 session: &Session,
4494 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4495 ) -> Result<ExecuteResponse, AdapterError> {
4496 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4497 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4499 self.validate_alter_system_network_policy(session, &value)?;
4500 }
4501
4502 let op = match value {
4503 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4504 name: name.clone(),
4505 value: OwnedVarInput::SqlSet(values),
4506 },
4507 plan::VariableValue::Default => {
4508 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4509 }
4510 };
4511 self.catalog_transact(Some(session), vec![op]).await?;
4512
4513 session.add_notice(AdapterNotice::VarDefaultUpdated {
4514 role: None,
4515 var_name: Some(name),
4516 });
4517 Ok(ExecuteResponse::AlteredSystemConfiguration)
4518 }
4519
4520 #[instrument]
4521 pub(super) async fn sequence_alter_system_reset(
4522 &mut self,
4523 session: &Session,
4524 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4525 ) -> Result<ExecuteResponse, AdapterError> {
4526 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4527 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4528 self.catalog_transact(Some(session), vec![op]).await?;
4529 session.add_notice(AdapterNotice::VarDefaultUpdated {
4530 role: None,
4531 var_name: Some(name),
4532 });
4533 Ok(ExecuteResponse::AlteredSystemConfiguration)
4534 }
4535
4536 #[instrument]
4537 pub(super) async fn sequence_alter_system_reset_all(
4538 &mut self,
4539 session: &Session,
4540 _: plan::AlterSystemResetAllPlan,
4541 ) -> Result<ExecuteResponse, AdapterError> {
4542 self.is_user_allowed_to_alter_system(session, None)?;
4543 let op = catalog::Op::ResetAllSystemConfiguration;
4544 self.catalog_transact(Some(session), vec![op]).await?;
4545 session.add_notice(AdapterNotice::VarDefaultUpdated {
4546 role: None,
4547 var_name: None,
4548 });
4549 Ok(ExecuteResponse::AlteredSystemConfiguration)
4550 }
4551
4552 fn is_user_allowed_to_alter_system(
4554 &self,
4555 session: &Session,
4556 var_name: Option<&str>,
4557 ) -> Result<(), AdapterError> {
4558 match (session.user().kind(), var_name) {
4559 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4561 (UserKind::Superuser, Some(name))
4563 if session.user().is_internal()
4564 || self.catalog().system_config().user_modifiable(name) =>
4565 {
4566 let var = self.catalog().system_config().get(name)?;
4569 var.visible(session.user(), self.catalog().system_config())?;
4570 Ok(())
4571 }
4572 (UserKind::Regular, Some(name))
4575 if self.catalog().system_config().user_modifiable(name) =>
4576 {
4577 Err(AdapterError::Unauthorized(
4578 rbac::UnauthorizedError::Superuser {
4579 action: format!("toggle the '{name}' system configuration parameter"),
4580 },
4581 ))
4582 }
4583 _ => Err(AdapterError::Unauthorized(
4584 rbac::UnauthorizedError::MzSystem {
4585 action: "alter system".into(),
4586 },
4587 )),
4588 }
4589 }
4590
4591 fn validate_alter_system_network_policy(
4592 &self,
4593 session: &Session,
4594 policy_value: &plan::VariableValue,
4595 ) -> Result<(), AdapterError> {
4596 let policy_name = match &policy_value {
4597 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4599 plan::VariableValue::Values(values) if values.len() == 1 => {
4600 values.iter().next().cloned()
4601 }
4602 plan::VariableValue::Values(values) => {
4603 tracing::warn!(?values, "can't set multiple network policies at once");
4604 None
4605 }
4606 };
4607 let maybe_network_policy = policy_name
4608 .as_ref()
4609 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4610 let Some(network_policy) = maybe_network_policy else {
4611 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4612 VarError::InvalidParameterValue {
4613 name: NETWORK_POLICY.name(),
4614 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4615 reason: "no network policy with such name exists".to_string(),
4616 },
4617 )));
4618 };
4619 self.validate_alter_network_policy(session, &network_policy.rules)
4620 }
4621
4622 fn validate_alter_network_policy(
4627 &self,
4628 session: &Session,
4629 policy_rules: &Vec<NetworkPolicyRule>,
4630 ) -> Result<(), AdapterError> {
4631 if session.user().is_internal() {
4634 return Ok(());
4635 }
4636 if let Some(ip) = session.meta().client_ip() {
4637 validate_ip_with_policy_rules(ip, policy_rules)
4638 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4639 } else {
4640 return Err(AdapterError::NetworkPolicyDenied(
4643 NetworkPolicyError::MissingIp,
4644 ));
4645 }
4646 Ok(())
4647 }
4648
4649 #[instrument]
4651 pub(super) fn sequence_execute(
4652 &mut self,
4653 session: &mut Session,
4654 plan: plan::ExecutePlan,
4655 ) -> Result<String, AdapterError> {
4656 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4658 let ps = session
4659 .get_prepared_statement_unverified(&plan.name)
4660 .expect("known to exist");
4661 let stmt = ps.stmt().cloned();
4662 let desc = ps.desc().clone();
4663 let state_revision = ps.state_revision;
4664 let logging = Arc::clone(ps.logging());
4665 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4666 }
4667
4668 #[instrument]
4669 pub(super) async fn sequence_grant_privileges(
4670 &mut self,
4671 session: &Session,
4672 plan::GrantPrivilegesPlan {
4673 update_privileges,
4674 grantees,
4675 }: plan::GrantPrivilegesPlan,
4676 ) -> Result<ExecuteResponse, AdapterError> {
4677 self.sequence_update_privileges(
4678 session,
4679 update_privileges,
4680 grantees,
4681 UpdatePrivilegeVariant::Grant,
4682 )
4683 .await
4684 }
4685
4686 #[instrument]
4687 pub(super) async fn sequence_revoke_privileges(
4688 &mut self,
4689 session: &Session,
4690 plan::RevokePrivilegesPlan {
4691 update_privileges,
4692 revokees,
4693 }: plan::RevokePrivilegesPlan,
4694 ) -> Result<ExecuteResponse, AdapterError> {
4695 self.sequence_update_privileges(
4696 session,
4697 update_privileges,
4698 revokees,
4699 UpdatePrivilegeVariant::Revoke,
4700 )
4701 .await
4702 }
4703
4704 #[instrument]
4705 async fn sequence_update_privileges(
4706 &mut self,
4707 session: &Session,
4708 update_privileges: Vec<UpdatePrivilege>,
4709 grantees: Vec<RoleId>,
4710 variant: UpdatePrivilegeVariant,
4711 ) -> Result<ExecuteResponse, AdapterError> {
4712 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4713 let mut warnings = Vec::new();
4714 let catalog = self.catalog().for_session(session);
4715
4716 for UpdatePrivilege {
4717 acl_mode,
4718 target_id,
4719 grantor,
4720 } in update_privileges
4721 {
4722 let actual_object_type = catalog.get_system_object_type(&target_id);
4723 if actual_object_type.is_relation() {
4726 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4727 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4728 if !non_applicable_privileges.is_empty() {
4729 let object_description =
4730 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4731 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4732 non_applicable_privileges,
4733 object_description,
4734 })
4735 }
4736 }
4737
4738 if let SystemObjectId::Object(object_id) = &target_id {
4739 self.catalog()
4740 .ensure_not_reserved_object(object_id, session.conn_id())?;
4741 }
4742
4743 let privileges = self
4744 .catalog()
4745 .get_privileges(&target_id, session.conn_id())
4746 .ok_or(AdapterError::Unsupported(
4749 "GRANTs/REVOKEs on an object type with no privileges",
4750 ))?;
4751
4752 for grantee in &grantees {
4753 self.catalog().ensure_not_system_role(grantee)?;
4754 self.catalog().ensure_not_predefined_role(grantee)?;
4755 let existing_privilege = privileges
4756 .get_acl_item(grantee, &grantor)
4757 .map(Cow::Borrowed)
4758 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4759
4760 match variant {
4761 UpdatePrivilegeVariant::Grant
4762 if !existing_privilege.acl_mode.contains(acl_mode) =>
4763 {
4764 ops.push(catalog::Op::UpdatePrivilege {
4765 target_id: target_id.clone(),
4766 privilege: MzAclItem {
4767 grantee: *grantee,
4768 grantor,
4769 acl_mode,
4770 },
4771 variant,
4772 });
4773 }
4774 UpdatePrivilegeVariant::Revoke
4775 if !existing_privilege
4776 .acl_mode
4777 .intersection(acl_mode)
4778 .is_empty() =>
4779 {
4780 ops.push(catalog::Op::UpdatePrivilege {
4781 target_id: target_id.clone(),
4782 privilege: MzAclItem {
4783 grantee: *grantee,
4784 grantor,
4785 acl_mode,
4786 },
4787 variant,
4788 });
4789 }
4790 _ => {}
4792 }
4793 }
4794 }
4795
4796 if ops.is_empty() {
4797 session.add_notices(warnings);
4798 return Ok(variant.into());
4799 }
4800
4801 let res = self
4802 .catalog_transact(Some(session), ops)
4803 .await
4804 .map(|_| match variant {
4805 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4806 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4807 });
4808 if res.is_ok() {
4809 session.add_notices(warnings);
4810 }
4811 res
4812 }
4813
4814 #[instrument]
4815 pub(super) async fn sequence_alter_default_privileges(
4816 &mut self,
4817 session: &Session,
4818 plan::AlterDefaultPrivilegesPlan {
4819 privilege_objects,
4820 privilege_acl_items,
4821 is_grant,
4822 }: plan::AlterDefaultPrivilegesPlan,
4823 ) -> Result<ExecuteResponse, AdapterError> {
4824 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4825 let variant = if is_grant {
4826 UpdatePrivilegeVariant::Grant
4827 } else {
4828 UpdatePrivilegeVariant::Revoke
4829 };
4830 for privilege_object in &privilege_objects {
4831 self.catalog()
4832 .ensure_not_system_role(&privilege_object.role_id)?;
4833 self.catalog()
4834 .ensure_not_predefined_role(&privilege_object.role_id)?;
4835 if let Some(database_id) = privilege_object.database_id {
4836 self.catalog()
4837 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4838 }
4839 if let Some(schema_id) = privilege_object.schema_id {
4840 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4841 let schema_spec: SchemaSpecifier = schema_id.into();
4842
4843 self.catalog().ensure_not_reserved_object(
4844 &(database_spec, schema_spec).into(),
4845 session.conn_id(),
4846 )?;
4847 }
4848 for privilege_acl_item in &privilege_acl_items {
4849 self.catalog()
4850 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4851 self.catalog()
4852 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4853 ops.push(catalog::Op::UpdateDefaultPrivilege {
4854 privilege_object: privilege_object.clone(),
4855 privilege_acl_item: privilege_acl_item.clone(),
4856 variant,
4857 })
4858 }
4859 }
4860
4861 self.catalog_transact(Some(session), ops).await?;
4862 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4863 }
4864
4865 #[instrument]
4866 pub(super) async fn sequence_grant_role(
4867 &mut self,
4868 session: &Session,
4869 plan::GrantRolePlan {
4870 role_ids,
4871 member_ids,
4872 grantor_id,
4873 }: plan::GrantRolePlan,
4874 ) -> Result<ExecuteResponse, AdapterError> {
4875 let catalog = self.catalog();
4876 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4877 for role_id in role_ids {
4878 for member_id in &member_ids {
4879 let member_membership: BTreeSet<_> =
4880 catalog.get_role(member_id).membership().keys().collect();
4881 if member_membership.contains(&role_id) {
4882 let role_name = catalog.get_role(&role_id).name().to_string();
4883 let member_name = catalog.get_role(member_id).name().to_string();
4884 catalog.ensure_not_reserved_role(member_id)?;
4886 catalog.ensure_grantable_role(&role_id)?;
4887 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4888 role_name,
4889 member_name,
4890 });
4891 } else {
4892 ops.push(catalog::Op::GrantRole {
4893 role_id,
4894 member_id: *member_id,
4895 grantor_id,
4896 });
4897 }
4898 }
4899 }
4900
4901 if ops.is_empty() {
4902 return Ok(ExecuteResponse::GrantedRole);
4903 }
4904
4905 self.catalog_transact(Some(session), ops)
4906 .await
4907 .map(|_| ExecuteResponse::GrantedRole)
4908 }
4909
4910 #[instrument]
4911 pub(super) async fn sequence_revoke_role(
4912 &mut self,
4913 session: &Session,
4914 plan::RevokeRolePlan {
4915 role_ids,
4916 member_ids,
4917 grantor_id,
4918 }: plan::RevokeRolePlan,
4919 ) -> Result<ExecuteResponse, AdapterError> {
4920 let catalog = self.catalog();
4921 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4922 for role_id in role_ids {
4923 for member_id in &member_ids {
4924 let member_membership: BTreeSet<_> =
4925 catalog.get_role(member_id).membership().keys().collect();
4926 if !member_membership.contains(&role_id) {
4927 let role_name = catalog.get_role(&role_id).name().to_string();
4928 let member_name = catalog.get_role(member_id).name().to_string();
4929 catalog.ensure_not_reserved_role(member_id)?;
4931 catalog.ensure_grantable_role(&role_id)?;
4932 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4933 role_name,
4934 member_name,
4935 });
4936 } else {
4937 ops.push(catalog::Op::RevokeRole {
4938 role_id,
4939 member_id: *member_id,
4940 grantor_id,
4941 });
4942 }
4943 }
4944 }
4945
4946 if ops.is_empty() {
4947 return Ok(ExecuteResponse::RevokedRole);
4948 }
4949
4950 self.catalog_transact(Some(session), ops)
4951 .await
4952 .map(|_| ExecuteResponse::RevokedRole)
4953 }
4954
4955 #[instrument]
4956 pub(super) async fn sequence_alter_owner(
4957 &mut self,
4958 session: &Session,
4959 plan::AlterOwnerPlan {
4960 id,
4961 object_type,
4962 new_owner,
4963 }: plan::AlterOwnerPlan,
4964 ) -> Result<ExecuteResponse, AdapterError> {
4965 let mut ops = vec![catalog::Op::UpdateOwner {
4966 id: id.clone(),
4967 new_owner,
4968 }];
4969
4970 match &id {
4971 ObjectId::Item(global_id) => {
4972 let entry = self.catalog().get_entry(global_id);
4973
4974 if entry.is_index() {
4976 let name = self
4977 .catalog()
4978 .resolve_full_name(entry.name(), Some(session.conn_id()))
4979 .to_string();
4980 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4981 return Ok(ExecuteResponse::AlteredObject(object_type));
4982 }
4983
4984 let dependent_index_ops = entry
4986 .used_by()
4987 .into_iter()
4988 .filter(|id| self.catalog().get_entry(id).is_index())
4989 .map(|id| catalog::Op::UpdateOwner {
4990 id: ObjectId::Item(*id),
4991 new_owner,
4992 });
4993 ops.extend(dependent_index_ops);
4994
4995 let dependent_subsources =
4997 entry
4998 .progress_id()
4999 .into_iter()
5000 .map(|item_id| catalog::Op::UpdateOwner {
5001 id: ObjectId::Item(item_id),
5002 new_owner,
5003 });
5004 ops.extend(dependent_subsources);
5005 }
5006 ObjectId::Cluster(cluster_id) => {
5007 let cluster = self.catalog().get_cluster(*cluster_id);
5008 let managed_cluster_replica_ops =
5010 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
5011 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
5012 new_owner,
5013 });
5014 ops.extend(managed_cluster_replica_ops);
5015 }
5016 _ => {}
5017 }
5018
5019 self.catalog_transact(Some(session), ops)
5020 .await
5021 .map(|_| ExecuteResponse::AlteredObject(object_type))
5022 }
5023
5024 #[instrument]
5025 pub(super) async fn sequence_reassign_owned(
5026 &mut self,
5027 session: &Session,
5028 plan::ReassignOwnedPlan {
5029 old_roles,
5030 new_role,
5031 reassign_ids,
5032 }: plan::ReassignOwnedPlan,
5033 ) -> Result<ExecuteResponse, AdapterError> {
5034 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
5035 self.catalog().ensure_not_reserved_role(role_id)?;
5036 }
5037
5038 let ops = reassign_ids
5039 .into_iter()
5040 .map(|id| catalog::Op::UpdateOwner {
5041 id,
5042 new_owner: new_role,
5043 })
5044 .collect();
5045
5046 self.catalog_transact(Some(session), ops)
5047 .await
5048 .map(|_| ExecuteResponse::ReassignOwned)
5049 }
5050
5051 #[instrument]
5052 pub(crate) async fn handle_deferred_statement(&mut self) {
5053 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
5057 return;
5058 };
5059 match ps {
5060 crate::coord::PlanStatement::Statement { stmt, params } => {
5061 self.handle_execute_inner(stmt, params, ctx).await;
5062 }
5063 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
5064 self.sequence_plan(ctx, plan, resolved_ids).await;
5065 }
5066 }
5067 }
5068
5069 #[instrument]
5070 #[allow(clippy::unused_async)]
5072 pub(super) async fn sequence_alter_table(
5073 &mut self,
5074 ctx: &mut ExecuteContext,
5075 plan: plan::AlterTablePlan,
5076 ) -> Result<ExecuteResponse, AdapterError> {
5077 let plan::AlterTablePlan {
5078 relation_id,
5079 column_name,
5080 column_type,
5081 raw_sql_type,
5082 } = plan;
5083
5084 let id_ts = self.get_catalog_write_ts().await;
5086 let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
5087 let ops = vec![catalog::Op::AlterAddColumn {
5088 id: relation_id,
5089 new_global_id,
5090 name: column_name,
5091 typ: column_type,
5092 sql: raw_sql_type,
5093 }];
5094
5095 let entry = self.catalog().get_entry(&relation_id);
5096 let CatalogItem::Table(table) = &entry.item else {
5097 let err = format!("expected table, found {:?}", entry.item);
5098 return Err(AdapterError::Internal(err));
5099 };
5100 let expected_version = table.desc.latest_version();
5102 let existing_global_id = table.global_id_writes();
5103
5104 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
5105 Box::pin(async move {
5106 let entry = coord.catalog().get_entry(&relation_id);
5107 let CatalogItem::Table(table) = &entry.item else {
5108 panic!("programming error, expected table found {:?}", entry.item);
5109 };
5110 let table = table.clone();
5111
5112 let existing_table = crate::CollectionIdBundle {
5116 storage_ids: btreeset![existing_global_id],
5117 compute_ids: BTreeMap::new(),
5118 };
5119 let existing_table_read_hold = coord.acquire_read_holds(&existing_table);
5120
5121 let new_version = table.desc.latest_version();
5122 let new_desc = table
5123 .desc
5124 .at_version(RelationVersionSelector::Specific(new_version));
5125 let register_ts = coord.get_local_write_ts().await.timestamp;
5126
5127 coord
5129 .controller
5130 .storage
5131 .alter_table_desc(
5132 existing_global_id,
5133 new_global_id,
5134 new_desc,
5135 expected_version,
5136 register_ts,
5137 )
5138 .await
5139 .expect("failed to alter desc of table");
5140
5141 let compaction_window = table
5143 .custom_logical_compaction_window
5144 .unwrap_or(CompactionWindow::Default);
5145 coord
5146 .initialize_read_policies(
5147 &crate::CollectionIdBundle {
5148 storage_ids: btreeset![new_global_id],
5149 compute_ids: BTreeMap::new(),
5150 },
5151 compaction_window,
5152 )
5153 .await;
5154 coord.apply_local_write(register_ts).await;
5155
5156 drop(existing_table_read_hold);
5158 })
5159 })
5160 .await?;
5161
5162 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
5163 }
5164}
5165
5166#[derive(Debug)]
5167struct CachedStatisticsOracle {
5168 cache: BTreeMap<GlobalId, usize>,
5169}
5170
5171impl CachedStatisticsOracle {
5172 pub async fn new<T: TimelyTimestamp>(
5173 ids: &BTreeSet<GlobalId>,
5174 as_of: &Antichain<T>,
5175 storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
5176 ) -> Result<Self, StorageError<T>> {
5177 let mut cache = BTreeMap::new();
5178
5179 for id in ids {
5180 let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
5181
5182 match stats {
5183 Ok(stats) => {
5184 cache.insert(*id, stats.num_updates);
5185 }
5186 Err(StorageError::IdentifierMissing(id)) => {
5187 ::tracing::debug!("no statistics for {id}")
5188 }
5189 Err(e) => return Err(e),
5190 }
5191 }
5192
5193 Ok(Self { cache })
5194 }
5195}
5196
5197impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
5198 fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
5199 self.cache.get(&id).map(|estimate| *estimate)
5200 }
5201
5202 fn as_map(&self) -> BTreeMap<GlobalId, usize> {
5203 self.cache.clone()
5204 }
5205}
5206
5207impl Coordinator {
5208 pub(super) async fn statistics_oracle(
5209 &self,
5210 session: &Session,
5211 source_ids: &BTreeSet<GlobalId>,
5212 query_as_of: &Antichain<Timestamp>,
5213 is_oneshot: bool,
5214 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
5215 if !session.vars().enable_session_cardinality_estimates() {
5216 return Ok(Box::new(EmptyStatisticsOracle));
5217 }
5218
5219 let timeout = if is_oneshot {
5220 self.catalog()
5222 .system_config()
5223 .optimizer_oneshot_stats_timeout()
5224 } else {
5225 self.catalog().system_config().optimizer_stats_timeout()
5226 };
5227
5228 let cached_stats = mz_ore::future::timeout(
5229 timeout,
5230 CachedStatisticsOracle::new(
5231 source_ids,
5232 query_as_of,
5233 self.controller.storage_collections.as_ref(),
5234 ),
5235 )
5236 .await;
5237
5238 match cached_stats {
5239 Ok(stats) => Ok(Box::new(stats)),
5240 Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
5241 warn!(
5242 is_oneshot = is_oneshot,
5243 "optimizer statistics collection timed out after {}ms",
5244 timeout.as_millis()
5245 );
5246
5247 Ok(Box::new(EmptyStatisticsOracle))
5248 }
5249 Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
5250 }
5251 }
5252}
5253
5254pub(super) fn check_log_reads(
5263 catalog: &Catalog,
5264 cluster: &Cluster,
5265 source_ids: &BTreeSet<GlobalId>,
5266 target_replica: &mut Option<ReplicaId>,
5267 vars: &SessionVars,
5268) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
5269where
5270{
5271 let log_names = source_ids
5272 .iter()
5273 .map(|gid| catalog.resolve_item_id(gid))
5274 .flat_map(|item_id| catalog.introspection_dependencies(item_id))
5275 .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
5276 .collect::<Vec<_>>();
5277
5278 if log_names.is_empty() {
5279 return Ok(None);
5280 }
5281
5282 let num_replicas = cluster.replicas().count();
5286 if target_replica.is_none() {
5287 if num_replicas == 1 {
5288 *target_replica = cluster.replicas().map(|r| r.replica_id).next();
5289 } else {
5290 return Err(AdapterError::UntargetedLogRead { log_names });
5291 }
5292 }
5293
5294 let replica_id = target_replica.expect("set to `Some` above");
5297 let replica = &cluster.replica(replica_id).expect("Replica must exist");
5298 if !replica.config.compute.logging.enabled() {
5299 return Err(AdapterError::IntrospectionDisabled { log_names });
5300 }
5301
5302 Ok(vars
5303 .emit_introspection_query_notice()
5304 .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
5305}
5306
5307impl Coordinator {
5308 fn emit_optimizer_notices(&self, session: &Session, notices: &Vec<RawOptimizerNotice>) {
5310 if notices.is_empty() {
5312 return;
5313 }
5314 let humanizer = self.catalog.for_session(session);
5315 let system_vars = self.catalog.system_config();
5316 for notice in notices {
5317 let kind = OptimizerNoticeKind::from(notice);
5318 let notice_enabled = match kind {
5319 OptimizerNoticeKind::IndexAlreadyExists => {
5320 system_vars.enable_notices_for_index_already_exists()
5321 }
5322 OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
5323 system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
5324 }
5325 OptimizerNoticeKind::IndexKeyEmpty => {
5326 system_vars.enable_notices_for_index_empty_key()
5327 }
5328 };
5329 if notice_enabled {
5330 session.add_notice(AdapterNotice::OptimizerNotice {
5334 notice: notice.message(&humanizer, false).to_string(),
5335 hint: notice.hint(&humanizer, false).to_string(),
5336 });
5337 }
5338 self.metrics
5339 .optimization_notices
5340 .with_label_values(&[kind.metric_label()])
5341 .inc_by(1);
5342 }
5343 }
5344
5345 async fn process_dataflow_metainfo(
5347 &mut self,
5348 df_meta: DataflowMetainfo,
5349 export_id: GlobalId,
5350 ctx: Option<&mut ExecuteContext>,
5351 notice_ids: Vec<GlobalId>,
5352 ) -> Option<BuiltinTableAppendNotify> {
5353 if let Some(ctx) = ctx {
5355 self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
5356 }
5357
5358 let df_meta = self
5360 .catalog()
5361 .render_notices(df_meta, notice_ids, Some(export_id));
5362
5363 if self.catalog().state().system_config().enable_mz_notices()
5366 && !df_meta.optimizer_notices.is_empty()
5367 {
5368 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
5369 self.catalog().state().pack_optimizer_notices(
5370 &mut builtin_table_updates,
5371 df_meta.optimizer_notices.iter(),
5372 Diff::ONE,
5373 );
5374
5375 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
5377
5378 Some(
5379 self.builtin_table_update()
5380 .execute(builtin_table_updates)
5381 .await
5382 .0,
5383 )
5384 } else {
5385 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
5387
5388 None
5389 }
5390 }
5391}